Real-time augmented reality (AR) using Python-based Computer Vision

Hi Team,

Today, I’m going to discuss another Computer Vision installment. I’ll discuss how to implement Augmented Reality using Open-CV Computer Vision with full audio. We will be using part of a Bengali OTT Series called “Feludar Goendagiri” entirely for educational purposes & also as a tribute to the great legendary director, late Satyajit Roy. To know more about him, please click the following link.

Why don’t we see the demo first before jumping into the technical details?


Demo

Architecture:

Let us understand the architecture –

Process Flow

The above diagram shows that the application, which uses the Open-CV, analyzes individual frames from the source & blends that with the video trailer. Finally, it creates another video by correctly mixing the source audio.

Python Packages:

Following are the python packages that are necessary to develop this brilliant use case –

pip install opencv-python
pip install pygame

CODE:

Let us now understand the code. For this use case, we will only discuss three python scripts. However, we need more than these three. However, we have already discussed them in some of the early posts. Hence, we will skip them here.

  • clsAugmentedReality.py (This is the main class of python script that will embed the source video with the WebCAM streams in real-time.)


##################################################
#### Written By: SATYAKI DE ####
#### Written On: 20-Jun-2022 ####
#### Modified On 25-Jun-2022 ####
#### ####
#### Objective: This is the main class of ####
#### python script that will embed the source ####
#### video with the WebCAM streams in ####
#### real-time. ####
##################################################
# Importing necessary packages
import numpy as np
import cv2
from clsConfig import clsConfig as cf
# Initialize our cached reference points
CACHED_REF_PTS = None
class clsAugmentedReality:
def __init__(self):
self.TOP_LEFT_X = int(cf.conf['TOP_LEFT_X'])
self.TOP_LEFT_Y = int(cf.conf['TOP_LEFT_Y'])
self.TOP_RIGHT_X = int(cf.conf['TOP_RIGHT_X'])
self.TOP_RIGHT_Y = int(cf.conf['TOP_RIGHT_Y'])
self.BOTTOM_RIGHT_X = int(cf.conf['BOTTOM_RIGHT_X'])
self.BOTTOM_RIGHT_Y = int(cf.conf['BOTTOM_RIGHT_Y'])
self.BOTTOM_LEFT_X = int(cf.conf['BOTTOM_LEFT_X'])
self.BOTTOM_LEFT_Y = int(cf.conf['BOTTOM_LEFT_Y'])
def getWarpImages(self, frame, source, cornerIDs, arucoDict, arucoParams, zoomFlag, useCache=False):
try:
# Assigning values
TOP_LEFT_X = self.TOP_LEFT_X
TOP_LEFT_Y = self.TOP_LEFT_Y
TOP_RIGHT_X = self.TOP_RIGHT_X
TOP_RIGHT_Y = self.TOP_RIGHT_Y
BOTTOM_RIGHT_X = self.BOTTOM_RIGHT_X
BOTTOM_RIGHT_Y = self.BOTTOM_RIGHT_Y
BOTTOM_LEFT_X = self.BOTTOM_LEFT_X
BOTTOM_LEFT_Y = self.BOTTOM_LEFT_Y
# Grab a reference to our cached reference points
global CACHED_REF_PTS
if source is None:
raise
# Grab the width and height of the frame and source image,
# respectively
# Extracting Frame from Camera
# Exracting Source from Video
(imgH, imgW) = frame.shape[:2]
(srcH, srcW) = source.shape[:2]
# Detect Aruco markers in the input frame
(corners, ids, rejected) = cv2.aruco.detectMarkers(frame, arucoDict, parameters=arucoParams)
print('Ids: ', str(ids))
print('Rejected: ', str(rejected))
# if we *did not* find our four ArUco markers, initialize an
# empty IDs list, otherwise flatten the ID list
print('Detecting Corners: ', str(len(corners)))
ids = np.array([]) if len(corners) != 4 else ids.flatten()
# Initialize our list of reference points
refPts = []
refPtTL1 = []
# Loop over the IDs of the ArUco markers in Top-Left, Top-Right,
# Bottom-Right, and Bottom-Left order
for i in cornerIDs:
# Grab the index of the corner with the current ID
j = np.squeeze(np.where(ids == i))
# If we receive an empty list instead of an integer index,
# then we could not find the marker with the current ID
if j.size == 0:
continue
# Otherwise, append the corner (x, y)-coordinates to our list
# of reference points
corner = np.squeeze(corners[j])
refPts.append(corner)
# Check to see if we failed to find the four ArUco markers
if len(refPts) != 4:
# If we are allowed to use cached reference points, fall
# back on them
if useCache and CACHED_REF_PTS is not None:
refPts = CACHED_REF_PTS
# Otherwise, we cannot use the cache and/or there are no
# previous cached reference points, so return early
else:
return None
# If we are allowed to use cached reference points, then update
# the cache with the current set
if useCache:
CACHED_REF_PTS = refPts
# Unpack our Aruco reference points and use the reference points
# to define the Destination transform matrix, making sure the
# points are specified in Top-Left, Top-Right, Bottom-Right, and
# Bottom-Left order
(refPtTL, refPtTR, refPtBR, refPtBL) = refPts
dstMat = [refPtTL[0], refPtTR[1], refPtBR[2], refPtBL[3]]
dstMat = np.array(dstMat)
# For zoom option recalculating all the 4 points
refPtTL1_L_X = refPtTL[0][0]TOP_LEFT_X
refPtTL1_L_Y = refPtTL[0][1]TOP_LEFT_Y
refPtTL1.append((refPtTL1_L_X,refPtTL1_L_Y))
refPtTL1_R_X = refPtTL[1][0]+TOP_RIGHT_X
refPtTL1_R_Y = refPtTL[1][1]+TOP_RIGHT_Y
refPtTL1.append((refPtTL1_R_X,refPtTL1_R_Y))
refPtTD1_L_X = refPtTL[2][0]+BOTTOM_RIGHT_X
refPtTD1_L_Y = refPtTL[2][1]+BOTTOM_RIGHT_Y
refPtTL1.append((refPtTD1_L_X,refPtTD1_L_Y))
refPtTD1_R_X = refPtTL[3][0]BOTTOM_LEFT_X
refPtTD1_R_Y = refPtTL[3][1]+BOTTOM_LEFT_Y
refPtTL1.append((refPtTD1_R_X,refPtTD1_R_Y))
dstMatMod = [refPtTL1[0], refPtTL1[1], refPtTL1[2], refPtTL1[3]]
dstMatMod = np.array(dstMatMod)
# Define the transform matrix for the *source* image in Top-Left,
# Top-Right, Bottom-Right, and Bottom-Left order
srcMat = np.array([[0, 0], [srcW, 0], [srcW, srcH], [0, srcH]])
# Compute the homography matrix and then warp the source image to
# the destination based on the homography depending upon the
# zoom flag
if zoomFlag == 1:
(H, _) = cv2.findHomography(srcMat, dstMat)
else:
(H, _) = cv2.findHomography(srcMat, dstMatMod)
warped = cv2.warpPerspective(source, H, (imgW, imgH))
# Construct a mask for the source image now that the perspective
# warp has taken place (we'll need this mask to copy the source
# image into the destination)
mask = np.zeros((imgH, imgW), dtype="uint8")
if zoomFlag == 1:
cv2.fillConvexPoly(mask, dstMat.astype("int32"), (255, 255, 255), cv2.LINE_AA)
else:
cv2.fillConvexPoly(mask, dstMatMod.astype("int32"), (255, 255, 255), cv2.LINE_AA)
# This optional step will give the source image a black
# border surrounding it when applied to the source image, you
# can apply a dilation operation
rect = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
mask = cv2.dilate(mask, rect, iterations=2)
# Create a three channel version of the mask by stacking it
# depth-wise, such that we can copy the warped source image
# into the input image
maskScaled = mask.copy() / 255.0
maskScaled = np.dstack([maskScaled] * 3)
# Copy the warped source image into the input image by
# (1) Multiplying the warped image and masked together,
# (2) Then multiplying the original input image with the
# mask (giving more weight to the input where there
# are not masked pixels), and
# (3) Adding the resulting multiplications together
warpedMultiplied = cv2.multiply(warped.astype("float"), maskScaled)
imageMultiplied = cv2.multiply(frame.astype(float), 1.0 maskScaled)
output = cv2.add(warpedMultiplied, imageMultiplied)
output = output.astype("uint8")
# Return the output frame to the calling function
return output
except Exception as e:
# Delibarately raising the issue
# That way the control goes to main calling methods
# exception section
raise

Please find the key snippet from the above script –

(imgH, imgW) = frame.shape[:2]
(srcH, srcW) = source.shape[:2]

# Detect Aruco markers in the input frame
(corners, ids, rejected) = cv2.aruco.detectMarkers(frame, arucoDict, parameters=arucoParams)

Identifying the Aruco markers are key here. The above lines help the program detect all four corners.

However, let us discuss more on the Aruco markers & strategies that I’ve used for several different surfaces.

As you can see, the right-hand side Aruco marker is tiny compared to the left one. Hence, that one will be ideal for a curve surface like Coffee Mug, Bottle rather than a flat surface.

Also, we’ve demonstrated the zoom capability with the smaller Aruco marker that will Augment almost double the original surface area.

Let us understand why we need that; as you know, any spherical surface like a bottle is round-shaped. Hence, detecting relatively more significant Aruco markers in four corners will be difficult for any camera to identify.

Hence, we need a process where close four corners can be extrapolated mathematically to relatively larger projected areas easily detectable by any WebCAM.

Let’s observe the following figure –

Simulated Extrapolated corners

As you can see that the original position of the four corners is represented using the following points, i.e., (x1, y1), (x2, y2), (x3, y3) & (x4, y4).

And these positions are very close to each other. Hence, it will be easier for the camera to detect all the points (like a plain surface) without many retries.

And later, you can add specific values of x & y to them to get the derived four corners as shown in the above figures through the following points, i.e. (x1.1, y1.1), (x2.1, y2.1), (x3.1, y3.1) & (x4.1, y4.1).

# Loop over the IDs of the ArUco markers in Top-Left, Top-Right,
# Bottom-Right, and Bottom-Left order
for i in cornerIDs:
  # Grab the index of the corner with the current ID
  j = np.squeeze(np.where(ids == i))

  # If we receive an empty list instead of an integer index,
  # then we could not find the marker with the current ID
  if j.size == 0:
    continue

  # Otherwise, append the corner (x, y)-coordinates to our list
  # of reference points
  corner = np.squeeze(corners[j])
  refPts.append(corner)

# Check to see if we failed to find the four ArUco markers
if len(refPts) != 4:
  # If we are allowed to use cached reference points, fall
  # back on them
  if useCache and CACHED_REF_PTS is not None:
    refPts = CACHED_REF_PTS

  # Otherwise, we cannot use the cache and/or there are no
  # previous cached reference points, so return early
  else:
    return None

# If we are allowed to use cached reference points, then update
# the cache with the current set
if useCache:
  CACHED_REF_PTS = refPts

# Unpack our Aruco reference points and use the reference points
# to define the Destination transform matrix, making sure the
# points are specified in Top-Left, Top-Right, Bottom-Right, and
# Bottom-Left order
(refPtTL, refPtTR, refPtBR, refPtBL) = refPts
dstMat = [refPtTL[0], refPtTR[1], refPtBR[2], refPtBL[3]]
dstMat = np.array(dstMat)

In the above snippet, the application will scan through all the points & try to detect Aruco markers & then create a list of reference points, which will later be used to define the destination transformation matrix.

# For zoom option recalculating all the 4 points
refPtTL1_L_X = refPtTL[0][0]-TOP_LEFT_X
refPtTL1_L_Y = refPtTL[0][1]-TOP_LEFT_Y

refPtTL1.append((refPtTL1_L_X,refPtTL1_L_Y))

refPtTL1_R_X = refPtTL[1][0]+TOP_RIGHT_X
refPtTL1_R_Y = refPtTL[1][1]+TOP_RIGHT_Y

refPtTL1.append((refPtTL1_R_X,refPtTL1_R_Y))

refPtTD1_L_X = refPtTL[2][0]+BOTTOM_RIGHT_X
refPtTD1_L_Y = refPtTL[2][1]+BOTTOM_RIGHT_Y

refPtTL1.append((refPtTD1_L_X,refPtTD1_L_Y))

refPtTD1_R_X = refPtTL[3][0]-BOTTOM_LEFT_X
refPtTD1_R_Y = refPtTL[3][1]+BOTTOM_LEFT_Y

refPtTL1.append((refPtTD1_R_X,refPtTD1_R_Y))

dstMatMod = [refPtTL1[0], refPtTL1[1], refPtTL1[2], refPtTL1[3]]
dstMatMod = np.array(dstMatMod)

The above snippets calculate the revised points for the zoom-out capabilities as discussed in one of the earlier figures.

# Define the transform matrix for the *source* image in Top-Left,
# Top-Right, Bottom-Right, and Bottom-Left order
srcMat = np.array([[0, 0], [srcW, 0], [srcW, srcH], [0, srcH]])

The above snippet will create a transformation matrix for the video trailer.

# Compute the homography matrix and then warp the source image to
# the destination based on the homography depending upon the
# zoom flag
if zoomFlag == 1:
  (H, _) = cv2.findHomography(srcMat, dstMat)
else:
  (H, _) = cv2.findHomography(srcMat, dstMatMod)

warped = cv2.warpPerspective(source, H, (imgW, imgH))

# Construct a mask for the source image now that the perspective
# warp has taken place (we'll need this mask to copy the source
# image into the destination)
mask = np.zeros((imgH, imgW), dtype="uint8")
if zoomFlag == 1:
  cv2.fillConvexPoly(mask, dstMat.astype("int32"), (255, 255, 255), cv2.LINE_AA)
else:
  cv2.fillConvexPoly(mask, dstMatMod.astype("int32"), (255, 255, 255), cv2.LINE_AA)

# This optional step will give the source image a black
# border surrounding it when applied to the source image, you
# can apply a dilation operation
rect = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
mask = cv2.dilate(mask, rect, iterations=2)

# Create a three channel version of the mask by stacking it
# depth-wise, such that we can copy the warped source image
# into the input image
maskScaled = mask.copy() / 255.0
maskScaled = np.dstack([maskScaled] * 3)

# Copy the warped source image into the input image by
# (1) Multiplying the warped image and masked together,
# (2) Then multiplying the original input image with the
#     mask (giving more weight to the input where there
#     are not masked pixels), and
# (3) Adding the resulting multiplications together
warpedMultiplied = cv2.multiply(warped.astype("float"), maskScaled)
imageMultiplied = cv2.multiply(frame.astype(float), 1.0 - maskScaled)
output = cv2.add(warpedMultiplied, imageMultiplied)
output = output.astype("uint8")

Finally, depending upon the zoom flag, the application will create a warped image surrounded by an optionally black border.

  • clsEmbedVideoWithStream.py (This is the main class of python script that will invoke the clsAugmentedReality class to initiate augment reality after splitting the audio & video & then project them via the Web-CAM with a seamless broadcast.)


##################################################
#### Written By: SATYAKI DE ####
#### Written On: 22-Jun-2022 ####
#### Modified On 25-Jun-2022 ####
#### ####
#### Objective: This is the main class of ####
#### python script that will invoke the ####
#### clsAugmentedReality class to initiate ####
#### augment reality after splitting the ####
#### audio & video & then project them via ####
#### the Web-CAM with a seamless broadcast. ####
##################################################
# Importing necessary packages
import clsAugmentedReality as ar
from clsConfig import clsConfig as cf
from imutils.video import VideoStream
from collections import deque
import imutils
import time
import cv2
import subprocess
import os
import pygame
import time
import threading
import sys
###############################################
### Global Section ###
###############################################
# Instantiating the dependant class
x1 = ar.clsAugmentedReality()
###############################################
### End of Global Section ###
###############################################
class BreakLoop(Exception):
pass
class clsEmbedVideoWithStream:
def __init__(self):
self.sep = str(cf.conf['SEP'])
self.Curr_Path = str(cf.conf['INIT_PATH'])
self.FileName = str(cf.conf['FILE_NAME'])
self.CacheL = int(cf.conf['CACHE_LIM'])
self.FileName_1 = str(cf.conf['FILE_NAME_1'])
self.audioLen = int(cf.conf['audioLen'])
self.audioFreq = float(cf.conf['audioFreq'])
self.videoFrame = float(cf.conf['videoFrame'])
self.stopFlag=cf.conf['stopFlag']
self.zFlag=int(cf.conf['zoomFlag'])
self.title = str(cf.conf['TITLE'])
def playAudio(self, audioFile, audioLen, freq, stopFlag=False):
try:
pygame.mixer.init()
pygame.init()
pygame.mixer.music.load(audioFile)
pygame.mixer.music.set_volume(10)
val = int(audioLen)
i = 0
while i < val:
pygame.mixer.music.play(loops=0, start=float(i))
time.sleep(freq)
i = i + 1
if (i >= val):
raise BreakLoop
if (stopFlag==True):
raise BreakLoop
return 0
except BreakLoop as s:
return 0
except Exception as e:
x = str(e)
print(x)
return 1
def extractAudio(self, video_file, output_ext="mp3"):
try:
"""Converts video to audio directly using `ffmpeg` command
with the help of subprocess module"""
filename, ext = os.path.splitext(video_file)
subprocess.call(["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"],
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT)
return 0
except Exception as e:
x = str(e)
print('Error: ', x)
return 1
def processStream(self, debugInd, var):
try:
sep = self.sep
Curr_Path = self.Curr_Path
FileName = self.FileName
CacheL = self.CacheL
FileName_1 = self.FileName_1
audioLen = self.audioLen
audioFreq = self.audioFreq
videoFrame = self.videoFrame
stopFlag = self.stopFlag
zFlag = self.zFlag
title = self.title
print('audioFreq:')
print(str(audioFreq))
print('videoFrame:')
print(str(videoFrame))
# Construct the source for Video & Temporary Audio
videoFile = Curr_Path + sep + 'Video' + sep + FileName
audioFile = Curr_Path + sep + 'Video' + sep + FileName_1
# Load the Aruco dictionary and grab the Aruco parameters
print("[INFO] initializing marker detector…")
arucoDict = cv2.aruco.Dictionary_get(cv2.aruco.DICT_ARUCO_ORIGINAL)
arucoParams = cv2.aruco.DetectorParameters_create()
# Initialize the video file stream
print("[INFO] accessing video stream…")
vf = cv2.VideoCapture(videoFile)
x = self.extractAudio(videoFile)
if x == 0:
print('Successfully Audio extracted from the source file!')
else:
print('Failed to extract the source audio!')
# Initialize a queue to maintain the next frame from the video stream
Q = deque(maxlen=128)
# We need to have a frame in our queue to start our augmented reality
# pipeline, so read the next frame from our video file source and add
# it to our queue
(grabbed, source) = vf.read()
Q.appendleft(source)
# Initialize the video stream and allow the camera sensor to warm up
print("[INFO] starting video stream…")
vs = VideoStream(src=0).start()
time.sleep(2.0)
flg = 0
t = threading.Thread(target=self.playAudio, args=(audioFile, audioLen, audioFreq, stopFlag,))
t.daemon = True
try:
# Loop over the frames from the video stream
while len(Q) > 0:
try:
# Grab the frame from our video stream and resize it
frame = vs.read()
frame = imutils.resize(frame, width=1020)
# Attempt to find the ArUCo markers in the frame, and provided
# they are found, take the current source image and warp it onto
# input frame using our augmented reality technique
warped = x1.getWarpImages(
frame, source,
cornerIDs=(923, 1001, 241, 1007),
arucoDict=arucoDict,
arucoParams=arucoParams,
zoomFlag=zFlag,
useCache=CacheL > 0)
# If the warped frame is not None, then we know (1) we found the
# four ArUCo markers and (2) the perspective warp was successfully
# applied
if warped is not None:
# Set the frame to the output augment reality frame and then
# grab the next video file frame from our queue
frame = warped
source = Q.popleft()
if flg == 0:
t.start()
flg = flg + 1
# For speed/efficiency, we can use a queue to keep the next video
# frame queue ready for us — the trick is to ensure the queue is
# always (or nearly full)
if len(Q) != Q.maxlen:
# Read the next frame from the video file stream
(grabbed, nextFrame) = vf.read()
# If the frame was read (meaning we are not at the end of the
# video file stream), add the frame to our queue
if grabbed:
Q.append(nextFrame)
# Show the output frame
cv2.imshow(title, frame)
time.sleep(videoFrame)
# If the `q` key was pressed, break from the loop
if cv2.waitKey(2) & 0xFF == ord('q'):
stopFlag = True
break
except BreakLoop:
raise BreakLoop
except Exception as e:
pass
if (len(Q) == Q.maxlen):
time.sleep(2)
break
except BreakLoop as s:
print('Processed completed!')
# Performing cleanup at the end
cv2.destroyAllWindows()
vs.stop()
except Exception as e:
x = str(e)
print(x)
# Performing cleanup at the end
cv2.destroyAllWindows()
vs.stop()
return 0
except Exception as e:
x = str(e)
print('Error:', x)
return 1

Please find the key snippet from the above script –

def playAudio(self, audioFile, audioLen, freq, stopFlag=False):
  try:
    pygame.mixer.init()
    pygame.init()
    pygame.mixer.music.load(audioFile)

    pygame.mixer.music.set_volume(10)

    val = int(audioLen)
    i = 0

    while i < val:
      pygame.mixer.music.play(loops=0, start=float(i))
      time.sleep(freq)

      i = i + 1

      if (i >= val):
        raise BreakLoop

      if (stopFlag==True):
        raise BreakLoop

    return 0
  except BreakLoop as s:
    return 0
  except Exception as e:
    x = str(e)
    print(x)

    return 1

The above function will initiate the pygame library to run the sound of the video file that has been extracted as part of a separate process.

def extractAudio(self, video_file, output_ext="mp3"):
    try:
        """Converts video to audio directly using `ffmpeg` command
        with the help of subprocess module"""
        filename, ext = os.path.splitext(video_file)
        subprocess.call(["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"],
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.STDOUT)

        return 0
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

The above function temporarily extracts the audio file from the source trailer video.

# Initialize the video file stream
print("[INFO] accessing video stream...")
vf = cv2.VideoCapture(videoFile)

x = self.extractAudio(videoFile)

if x == 0:
    print('Successfully Audio extracted from the source file!')
else:
    print('Failed to extract the source audio!')

# Initialize a queue to maintain the next frame from the video stream
Q = deque(maxlen=128)

# We need to have a frame in our queue to start our augmented reality
# pipeline, so read the next frame from our video file source and add
# it to our queue
(grabbed, source) = vf.read()
Q.appendleft(source)

# Initialize the video stream and allow the camera sensor to warm up
print("[INFO] starting video stream...")
vs = VideoStream(src=0).start()

time.sleep(2.0)
flg = 0

The above snippets read the frames from the video file after invoking the audio extraction. Then, it uses a Queue method to store all the video frames for better performance. And finally, it starts consuming the standard streaming video from the WebCAM to augment the trailer video on top of it.

t = threading.Thread(target=self.playAudio, args=(audioFile, audioLen, audioFreq, stopFlag,))
t.daemon = True

Now, the application has instantiated an orphan thread to spin off the audio play function. The reason is to void the performance & video frame frequency impact on top of it.

while len(Q) > 0:
  try:
    # Grab the frame from our video stream and resize it
    frame = vs.read()
    frame = imutils.resize(frame, width=1020)

    # Attempt to find the ArUCo markers in the frame, and provided
    # they are found, take the current source image and warp it onto
    # input frame using our augmented reality technique
    warped = x1.getWarpImages(
      frame, source,
      cornerIDs=(923, 1001, 241, 1007),
      arucoDict=arucoDict,
      arucoParams=arucoParams,
      zoomFlag=zFlag,
      useCache=CacheL > 0)

    # If the warped frame is not None, then we know (1) we found the
    # four ArUCo markers and (2) the perspective warp was successfully
    # applied
    if warped is not None:
      # Set the frame to the output augment reality frame and then
      # grab the next video file frame from our queue
      frame = warped
      source = Q.popleft()

      if flg == 0:

        t.start()
        flg = flg + 1

    # For speed/efficiency, we can use a queue to keep the next video
    # frame queue ready for us -- the trick is to ensure the queue is
    # always (or nearly full)
    if len(Q) != Q.maxlen:
      # Read the next frame from the video file stream
      (grabbed, nextFrame) = vf.read()

      # If the frame was read (meaning we are not at the end of the
      # video file stream), add the frame to our queue
      if grabbed:
        Q.append(nextFrame)

    # Show the output frame
    cv2.imshow(title, frame)
    time.sleep(videoFrame)

    # If the `q` key was pressed, break from the loop
    if cv2.waitKey(2) & 0xFF == ord('q'):
      stopFlag = True
      break

  except BreakLoop:
    raise BreakLoop
  except Exception as e:
    pass

  if (len(Q) == Q.maxlen):
    time.sleep(2)
    break

The final segment will call the getWarpImages function to get the Augmented image on top of the video. It also checks for the upcoming frames & whether the source video is finished or not. In case of the end, the application will initiate a break method to come out from the infinite WebCAM read. Also, there is a provision for manual exit by pressing the ‘Q’ from the MacBook keyboard.

# Performing cleanup at the end
cv2.destroyAllWindows()
vs.stop()

It is always advisable to close your camera & remove any temporarily available windows that are still left once the application finishes the process.

  • augmentedMovieTrailer.py (Main calling script)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 22-Jun-2022 ####
#### Modified On 25-Jun-2022 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsEmbedVideoWithStream class to initiate ####
#### the augmented reality in real-time ####
#### & display a trailer on top of any surface ####
#### via Web-CAM. ####
#####################################################
# We keep the setup code in a different class as shown below.
import clsEmbedVideoWithStream as evws
from clsConfig import clsConfig as cf
import datetime
import logging
###############################################
### Global Section ###
###############################################
# Instantiating all the main class
x1 = evws.clsEmbedVideoWithStream()
###############################################
### End of Global Section ###
###############################################
def main():
try:
# Other useful variables
debugInd = 'Y'
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
var1 = datetime.datetime.now()
print('Start Time: ', str(var))
# End of useful variables
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'augmentedMovieTrailer.log', level=logging.INFO)
print('Started augmenting videos!')
# Execute all the pass
r1 = x1.processStream(debugInd, var)
if (r1 == 0):
print('Successfully identified human emotions!')
else:
print('Failed to identify the human emotions!')
var2 = datetime.datetime.now()
c = var2 var1
minutes = c.total_seconds() / 60
print('Total difference in minutes: ', str(minutes))
print('End Time: ', str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

The above script will initially instantiate the main calling class & then invoke the processStream function to create the Augmented Reality.


FOLDER STRUCTURE:

Here is the folder structure that contains all the files & directories in MAC O/S –

Directory Structure

You will get the complete codebase in the following Github link.

If you want to know more about this legendary director & his famous work, please visit the following link.

I’ll bring some more exciting topic in the coming days from the Python verse. Please share & subscribe my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim the ownership of these images. There is an always room for improvement & especially the prediction quality.

Live visual reading using Convolutional Neural Network (CNN) through Python-based machine-learning application.

This week we’re planning to touch on one of the exciting posts of visually reading characters from WebCAM & predict the letters using CNN methods. Before we dig deep, why don’t we see the demo run first?

Demo

Isn’t it fascinating? As we can see, the computer can record events and read like humans. And, thanks to the brilliant packages available in Python, which can help us predict the correct letter out of an Image.


What do we need to test it out?

  1. Preferably an external WebCAM.
  2. A moderate or good Laptop to test out this.
  3. Python 
  4. And a few other packages that we’ll mention next block.

What Python packages do we need?

Some of the critical packages that we must need to test out this application are –

cmake==3.22.1
dlib==19.19.0
face-recognition==1.3.0
face-recognition-models==0.3.0
imutils==0.5.3
jsonschema==4.4.0
keras==2.7.0
Keras-Preprocessing==1.1.2
matplotlib==3.5.1
matplotlib-inline==0.1.3
oauthlib==3.1.1
opencv-contrib-python==4.1.2.30
opencv-contrib-python-headless==4.4.0.46
opencv-python==4.5.5.62
opencv-python-headless==4.5.5.62
pickleshare==0.7.5
Pillow==9.0.0
python-dateutil==2.8.2
requests==2.27.1
requests-oauthlib==1.3.0
scikit-image==0.19.1
scikit-learn==1.0.2
tensorboard==2.7.0
tensorboard-data-server==0.6.1
tensorboard-plugin-wit==1.8.1
tensorflow==2.7.0
tensorflow-estimator==2.7.0
tensorflow-io-gcs-filesystem==0.23.1
tqdm==4.62.3

What is CNN?

In deep learning, a convolutional neural network (CNN/ConvNet) is a class of deep neural networks most commonly applied to analyze visual imagery.

Different Steps of CNN

We can understand from the above picture that a CNN generally takes an image as input. The neural network analyzes each pixel separately. The weights and biases of the model are then tweaked to detect the desired letters (In our use case) from the image. Like other algorithms, the data also has to pass through pre-processing stage. However, a CNN needs relatively less pre-processing than most other Deep Learning algorithms.

If you want to know more about this, there is an excellent article on CNN with some on-point animations explaining this concept. Please read it here.

Where do we get the data sets for our testing?

For testing, we are fortunate enough to have Kaggle with us. We have received a wide variety of sample data, which you can get from here.


Our use-case:

Architecture

From the above diagram, one can see that the python application will consume a live video feed of any random letters (both printed & handwritten) & predict the character as part of the machine learning model that we trained.


Code:

  1. clsConfig.py (Configuration file for the entire application.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 28-Dec-2021 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning & streaming dashboard.####
#### ####
################################################
import os
import platform as pl
class clsConfig(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'REPORT_PATH': Curr_Path + sep + 'report',
'FILE_NAME': Curr_Path + sep + 'Data' + sep + 'A_Z_Handwritten_Data.csv',
'SRC_PATH': Curr_Path + sep + 'data' + sep,
'APP_DESC_1': 'Old Video Enhancement!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'SUBDIR': 'data',
'SEP': sep,
'testRatio':0.2,
'valRatio':0.2,
'epochsVal':8,
'activationType':'relu',
'activationType2':'softmax',
'numOfClasses':26,
'kernelSize'🙁3, 3),
'poolSize'🙁2, 2),
'filterVal1':32,
'filterVal2':64,
'filterVal3':128,
'stridesVal':2,
'monitorVal':'val_loss',
'paddingVal1':'same',
'paddingVal2':'valid',
'reshapeVal':28,
'reshapeVal1'🙁28,28),
'patienceVal1':1,
'patienceVal2':2,
'sleepTime':3,
'sleepTime1':6,
'factorVal':0.2,
'learningRateVal':0.001,
'minDeltaVal':0,
'minLrVal':0.0001,
'verboseFlag':0,
'modeInd':'auto',
'shuffleVal':100,
'DenkseVal1':26,
'DenkseVal2':64,
'DenkseVal3':128,
'predParam':9,
'word_dict':{0:'A',1:'B',2:'C',3:'D',4:'E',5:'F',6:'G',7:'H',8:'I',9:'J',10:'K',11:'L',12:'M',13:'N',14:'O',15:'P',16:'Q',17:'R',18:'S',19:'T',20:'U',21:'V',22:'W',23:'X', 24:'Y',25:'Z'},
'width':640,
'height':480,
'imgSize': (32,32),
'threshold': 0.45,
'imgDimension': (400, 440),
'imgSmallDim': (7, 7),
'imgMidDim': (28, 28),
'reshapeParam1':1,
'reshapeParam2':28,
'colorFeed'🙁0,0,130),
'colorPredict'🙁0,25,255)
}

view raw

clsConfig.py

hosted with ❤ by GitHub

Important parameters that we need to follow from the above snippets are –

'testRatio':0.2,
'valRatio':0.2,
'epochsVal':8,
'activationType':'relu',
'activationType2':'softmax',
'numOfClasses':26,
'kernelSize':(3, 3),
'poolSize':(2, 2),
'word_dict':{0:'A',1:'B',2:'C',3:'D',4:'E',5:'F',6:'G',7:'H',8:'I',9:'J',10:'K',11:'L',12:'M',13:'N',14:'O',15:'P',16:'Q',17:'R',18:'S',19:'T',20:'U',21:'V',22:'W',23:'X', 24:'Y',25:'Z'},

Since we have 26 letters, we have classified it as 26 in the numOfClasses.

Since we are talking about characters, we had to come up with a process of identifying each character as numbers & then processing our entire logic. Hence, the above parameter named word_dict captured all the characters in a python dictionary & stored them. Moreover, the application translates the final number output to more appropriate characters as the prediction.

2. clsAlphabetReading.py (Main training class to teach the model to predict alphabets from visual reader.)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 17-Jan-2022 ####
#### Modified On 17-Jan-2022 ####
#### ####
#### Objective: This python script will ####
#### teach & perfect the model to read ####
#### visual alphabets using Convolutional ####
#### Neural Network (CNN). ####
###############################################
from keras.datasets import mnist
import matplotlib.pyplot as plt
import cv2
import numpy as np
from keras.models import Sequential
from keras.layers import Dense, Flatten, Conv2D, MaxPool2D, Dropout
from tensorflow.keras.optimizers import SGD, Adam
from keras.callbacks import ReduceLROnPlateau, EarlyStopping
from keras.utils.np_utils import to_categorical
import pandas as p
import numpy as np
from sklearn.model_selection import train_test_split
from keras.utils import np_utils
import matplotlib.pyplot as plt
from tqdm import tqdm_notebook
from sklearn.utils import shuffle
import pickle
import os
import platform as pl
from clsConfig import clsConfig as cf
class clsAlphabetReading:
def __init__(self):
self.sep = str(cf.conf['SEP'])
self.Curr_Path = str(cf.conf['INIT_PATH'])
self.fileName = str(cf.conf['FILE_NAME'])
self.testRatio = float(cf.conf['testRatio'])
self.valRatio = float(cf.conf['valRatio'])
self.epochsVal = int(cf.conf['epochsVal'])
self.activationType = str(cf.conf['activationType'])
self.activationType2 = str(cf.conf['activationType2'])
self.numOfClasses = int(cf.conf['numOfClasses'])
self.kernelSize = cf.conf['kernelSize']
self.poolSize = cf.conf['poolSize']
self.filterVal1 = int(cf.conf['filterVal1'])
self.filterVal2 = int(cf.conf['filterVal2'])
self.filterVal3 = int(cf.conf['filterVal3'])
self.stridesVal = int(cf.conf['stridesVal'])
self.monitorVal = str(cf.conf['monitorVal'])
self.paddingVal1 = str(cf.conf['paddingVal1'])
self.paddingVal2 = str(cf.conf['paddingVal2'])
self.reshapeVal = int(cf.conf['reshapeVal'])
self.reshapeVal1 = cf.conf['reshapeVal1']
self.patienceVal1 = int(cf.conf['patienceVal1'])
self.patienceVal2 = int(cf.conf['patienceVal2'])
self.sleepTime = int(cf.conf['sleepTime'])
self.sleepTime1 = int(cf.conf['sleepTime1'])
self.factorVal = float(cf.conf['factorVal'])
self.learningRateVal = float(cf.conf['learningRateVal'])
self.minDeltaVal = int(cf.conf['minDeltaVal'])
self.minLrVal = float(cf.conf['minLrVal'])
self.verboseFlag = int(cf.conf['verboseFlag'])
self.modeInd = str(cf.conf['modeInd'])
self.shuffleVal = int(cf.conf['shuffleVal'])
self.DenkseVal1 = int(cf.conf['DenkseVal1'])
self.DenkseVal2 = int(cf.conf['DenkseVal2'])
self.DenkseVal3 = int(cf.conf['DenkseVal3'])
self.predParam = int(cf.conf['predParam'])
self.word_dict = cf.conf['word_dict']
def applyCNN(self, X_Train, Y_Train_Catg, X_Validation, Y_Validation_Catg):
try:
testRatio = self.testRatio
epochsVal = self.epochsVal
activationType = self.activationType
activationType2 = self.activationType2
numOfClasses = self.numOfClasses
kernelSize = self.kernelSize
poolSize = self.poolSize
filterVal1 = self.filterVal1
filterVal2 = self.filterVal2
filterVal3 = self.filterVal3
stridesVal = self.stridesVal
monitorVal = self.monitorVal
paddingVal1 = self.paddingVal1
paddingVal2 = self.paddingVal2
reshapeVal = self.reshapeVal
patienceVal1 = self.patienceVal1
patienceVal2 = self.patienceVal2
sleepTime = self.sleepTime
sleepTime1 = self.sleepTime1
factorVal = self.factorVal
learningRateVal = self.learningRateVal
minDeltaVal = self.minDeltaVal
minLrVal = self.minLrVal
verboseFlag = self.verboseFlag
modeInd = self.modeInd
shuffleVal = self.shuffleVal
DenkseVal1 = self.DenkseVal1
DenkseVal2 = self.DenkseVal2
DenkseVal3 = self.DenkseVal3
model = Sequential()
model.add(Conv2D(filters=filterVal1, kernel_size=kernelSize, activation=activationType, input_shape=(28,28,1)))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))
model.add(Conv2D(filters=filterVal2, kernel_size=kernelSize, activation=activationType, padding = paddingVal1))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))
model.add(Conv2D(filters=filterVal3, kernel_size=kernelSize, activation=activationType, padding = paddingVal2))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))
model.add(Flatten())
model.add(Dense(DenkseVal2,activation = activationType))
model.add(Dense(DenkseVal3,activation = activationType))
model.add(Dense(DenkseVal1,activation = activationType2))
model.compile(optimizer = Adam(learning_rate=learningRateVal), loss='categorical_crossentropy', metrics=['accuracy'])
reduce_lr = ReduceLROnPlateau(monitor=monitorVal, factor=factorVal, patience=patienceVal1, min_lr=minLrVal)
early_stop = EarlyStopping(monitor=monitorVal, min_delta=minDeltaVal, patience=patienceVal2, verbose=verboseFlag, mode=modeInd)
fittedModel = model.fit(X_Train, Y_Train_Catg, epochs=epochsVal, callbacks=[reduce_lr, early_stop], validation_data = (X_Validation,Y_Validation_Catg))
return (model, fittedModel)
except Exception as e:
x = str(e)
model = Sequential()
print('Error: ', x)
return (model, model)
def trainModel(self, debugInd, var):
try:
sep = self.sep
Curr_Path = self.Curr_Path
fileName = self.fileName
epochsVal = self.epochsVal
valRatio = self.valRatio
predParam = self.predParam
testRatio = self.testRatio
reshapeVal = self.reshapeVal
numOfClasses = self.numOfClasses
sleepTime = self.sleepTime
sleepTime1 = self.sleepTime1
shuffleVal = self.shuffleVal
reshapeVal1 = self.reshapeVal1
# Dictionary for getting characters from index values
word_dict = self.word_dict
print('File Name: ', str(fileName))
# Read the data
df_HW_Alphabet = p.read_csv(fileName).astype('float32')
# Sample Data
print('Sample Data: ')
print(df_HW_Alphabet.head())
# Split data the (x – Our data) & (y – the prdict label)
x = df_HW_Alphabet.drop('0',axis = 1)
y = df_HW_Alphabet['0']
# Reshaping the data in csv file to display as an image
X_Train, X_Test, Y_Train, Y_Test = train_test_split(x, y, test_size = testRatio)
X_Train, X_Validation, Y_Train, Y_Validation = train_test_split(X_Train, Y_Train, test_size = valRatio)
X_Train = np.reshape(X_Train.values, (X_Train.shape[0], reshapeVal, reshapeVal))
X_Test = np.reshape(X_Test.values, (X_Test.shape[0], reshapeVal, reshapeVal))
X_Validation = np.reshape(X_Validation.values, (X_Validation.shape[0], reshapeVal, reshapeVal))
print("Train Data Shape: ", X_Train.shape)
print("Test Data Shape: ", X_Test.shape)
print("Validation Data shape: ", X_Validation.shape)
# Plotting the number of alphabets in the dataset
Y_Train_Num = np.int0(y)
count = np.zeros(numOfClasses, dtype='int')
for i in Y_Train_Num:
count[i] +=1
alphabets = []
for i in word_dict.values():
alphabets.append(i)
fig, ax = plt.subplots(1,1, figsize=(7,7))
ax.barh(alphabets, count)
plt.xlabel("Number of elements ")
plt.ylabel("Alphabets")
plt.grid()
plt.show(block=False)
plt.pause(sleepTime)
plt.close()
# Shuffling the data
shuff = shuffle(X_Train[:shuffleVal])
# Model reshaping the training & test dataset
X_Train = X_Train.reshape(X_Train.shape[0],X_Train.shape[1],X_Train.shape[2],1)
print("Shape of Train Data: ", X_Train.shape)
X_Test = X_Test.reshape(X_Test.shape[0], X_Test.shape[1], X_Test.shape[2],1)
print("Shape of Test Data: ", X_Test.shape)
X_Validation = X_Validation.reshape(X_Validation.shape[0], X_Validation.shape[1], X_Validation.shape[2],1)
print("Shape of Validation data: ", X_Validation.shape)
# Converting the labels to categorical values
Y_Train_Catg = to_categorical(Y_Train, num_classes = numOfClasses, dtype='int')
print("Shape of Train Labels: ", Y_Train_Catg.shape)
Y_Test_Catg = to_categorical(Y_Test, num_classes = numOfClasses, dtype='int')
print("Shape of Test Labels: ", Y_Test_Catg.shape)
Y_Validation_Catg = to_categorical(Y_Validation, num_classes = numOfClasses, dtype='int')
print("Shape of validation labels: ", Y_Validation_Catg.shape)
model, history = self.applyCNN(X_Train, Y_Train_Catg, X_Validation, Y_Validation_Catg)
print('Model Summary: ')
print(model.summary())
# Displaying the accuracies & losses for train & validation set
print("Validation Accuracy :", history.history['val_accuracy'])
print("Training Accuracy :", history.history['accuracy'])
print("Validation Loss :", history.history['val_loss'])
print("Training Loss :", history.history['loss'])
# Displaying the Loss Graph
plt.figure(1)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.legend(['training','validation'])
plt.title('Loss')
plt.xlabel('epoch')
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()
# Dsiplaying the Accuracy Graph
plt.figure(2)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.legend(['training','validation'])
plt.title('Accuracy')
plt.xlabel('epoch')
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()
# Making the model to predict
pred = model.predict(X_Test[:predParam])
print('Test Details::')
print('X_Test: ', X_Test.shape)
print('Y_Test_Catg: ', Y_Test_Catg.shape)
try:
score = model.evaluate(X_Test, Y_Test_Catg, verbose=0)
print('Test Score = ', score[0])
print('Test Accuracy = ', score[1])
except Exception as e:
x = str(e)
print('Error: ', x)
# Displaying some of the test images & their predicted labels
fig, ax = plt.subplots(3,3, figsize=(8,9))
axes = ax.flatten()
for i in range(9):
axes[i].imshow(np.reshape(X_Test[i], reshapeVal1), cmap="Greys")
pred = word_dict[np.argmax(Y_Test_Catg[i])]
print('Prediction: ', pred)
axes[i].set_title("Test Prediction: " + pred)
axes[i].grid()
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()
fileName = Curr_Path + sep + 'Model' + sep + 'model_trained_' + str(epochsVal) + '.p'
print('Model Name: ', str(fileName))
pickle_out = open(fileName, 'wb')
pickle.dump(model, pickle_out)
pickle_out.close()
return 0
except Exception as e:
x = str(e)
print('Error: ', x)
return 1

Some of the key snippets from the above scripts are –

x = df_HW_Alphabet.drop('0',axis = 1)
y = df_HW_Alphabet['0']

In the above snippet, we have split the data into images & their corresponding labels.

X_Train, X_Test, Y_Train, Y_Test = train_test_split(x, y, test_size = testRatio)
X_Train, X_Validation, Y_Train, Y_Validation = train_test_split(X_Train, Y_Train, test_size = valRatio)

X_Train = np.reshape(X_Train.values, (X_Train.shape[0], reshapeVal, reshapeVal))
X_Test = np.reshape(X_Test.values, (X_Test.shape[0], reshapeVal, reshapeVal))
X_Validation = np.reshape(X_Validation.values, (X_Validation.shape[0], reshapeVal, reshapeVal))


print("Train Data Shape: ", X_Train.shape)
print("Test Data Shape: ", X_Test.shape)
print("Validation Data shape: ", X_Validation.shape)

We are splitting the data into Train, Test & Validation sets to get more accurate predictions and reshaping the raw data into the image by consuming the 784 data columns to 28×28 pixel images.

Since we are talking about characters, we had to come up with a process of identifying The following snippet will plot the character equivalent number into a matplotlib chart & showcase the overall distribution trend after splitting.

Y_Train_Num = np.int0(y)
count = np.zeros(numOfClasses, dtype='int')
for i in Y_Train_Num:
    count[i] +=1

alphabets = []
for i in word_dict.values():
    alphabets.append(i)

fig, ax = plt.subplots(1,1, figsize=(7,7))
ax.barh(alphabets, count)

plt.xlabel("Number of elements ")
plt.ylabel("Alphabets")
plt.grid()
plt.show(block=False)
plt.pause(sleepTime)
plt.close()

Note that we have tweaked the plt.show property with (block=False). This property will enable us to continue execution without human interventions after the initial pause.

# Model reshaping the training & test dataset
X_Train = X_Train.reshape(X_Train.shape[0],X_Train.shape[1],X_Train.shape[2],1)
print("Shape of Train Data: ", X_Train.shape)

X_Test = X_Test.reshape(X_Test.shape[0], X_Test.shape[1], X_Test.shape[2],1)
print("Shape of Test Data: ", X_Test.shape)

X_Validation = X_Validation.reshape(X_Validation.shape[0], X_Validation.shape[1], X_Validation.shape[2],1)
print("Shape of Validation data: ", X_Validation.shape)

# Converting the labels to categorical values
Y_Train_Catg = to_categorical(Y_Train, num_classes = numOfClasses, dtype='int')
print("Shape of Train Labels: ", Y_Train_Catg.shape)

Y_Test_Catg = to_categorical(Y_Test, num_classes = numOfClasses, dtype='int')
print("Shape of Test Labels: ", Y_Test_Catg.shape)

Y_Validation_Catg = to_categorical(Y_Validation, num_classes = numOfClasses, dtype='int')
print("Shape of validation labels: ", Y_Validation_Catg.shape)

In the above diagram, the application did reshape all three categories of data before calling the primary CNN function.

model = Sequential()

model.add(Conv2D(filters=filterVal1, kernel_size=kernelSize, activation=activationType, input_shape=(28,28,1)))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))

model.add(Conv2D(filters=filterVal2, kernel_size=kernelSize, activation=activationType, padding = paddingVal1))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))

model.add(Conv2D(filters=filterVal3, kernel_size=kernelSize, activation=activationType, padding = paddingVal2))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))

model.add(Flatten())

model.add(Dense(DenkseVal2,activation = activationType))
model.add(Dense(DenkseVal3,activation = activationType))

model.add(Dense(DenkseVal1,activation = activationType2))

model.compile(optimizer = Adam(learning_rate=learningRateVal), loss='categorical_crossentropy', metrics=['accuracy'])
reduce_lr = ReduceLROnPlateau(monitor=monitorVal, factor=factorVal, patience=patienceVal1, min_lr=minLrVal)
early_stop = EarlyStopping(monitor=monitorVal, min_delta=minDeltaVal, patience=patienceVal2, verbose=verboseFlag, mode=modeInd)


fittedModel = model.fit(X_Train, Y_Train_Catg, epochs=epochsVal, callbacks=[reduce_lr, early_stop],  validation_data = (X_Validation,Y_Validation_Catg))

return (model, fittedModel)

In the above snippet, the convolution layers are followed by maxpool layers, which reduce the number of features extracted. The output of the maxpool layers and convolution layers are flattened into a vector of a single dimension and supplied as an input to the Dense layer—the CNN model prepared for training the model using the training dataset.

We have used optimization parameters like Adam, RMSProp & the application we trained for eight epochs for better accuracy & predictions.

# Displaying the accuracies & losses for train & validation set
print("Validation Accuracy :", history.history['val_accuracy'])
print("Training Accuracy :", history.history['accuracy'])
print("Validation Loss :", history.history['val_loss'])
print("Training Loss :", history.history['loss'])

# Displaying the Loss Graph
plt.figure(1)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.legend(['training','validation'])
plt.title('Loss')
plt.xlabel('epoch')
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()

# Dsiplaying the Accuracy Graph
plt.figure(2)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.legend(['training','validation'])
plt.title('Accuracy')
plt.xlabel('epoch')
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()

Also, we have captured the validation Accuracy & Loss & plot them into two separate graphs for better understanding.

try:
    score = model.evaluate(X_Test, Y_Test_Catg, verbose=0)
    print('Test Score = ', score[0])
    print('Test Accuracy = ', score[1])
except Exception as e:
    x = str(e)
    print('Error: ', x)

Also, the application is trying to get the accuracy of the model that we trained & validated with the training & validation data. This time we have used test data to predict the confidence score.

# Displaying some of the test images & their predicted labels
fig, ax = plt.subplots(3,3, figsize=(8,9))
axes = ax.flatten()

for i in range(9):
    axes[i].imshow(np.reshape(X_Test[i], reshapeVal1), cmap="Greys")
    pred = word_dict[np.argmax(Y_Test_Catg[i])]
    print('Prediction: ', pred)
    axes[i].set_title("Test Prediction: " + pred)
    axes[i].grid()
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()

Finally, the application testing with some random test data & tried to plot the output & prediction assessment.

Testing with Random Test Data
fileName = Curr_Path + sep + 'Model' + sep + 'model_trained_' + str(epochsVal) + '.p'
print('Model Name: ', str(fileName))

pickle_out = open(fileName, 'wb')
pickle.dump(model, pickle_out)
pickle_out.close()

As a part of the last step, the application will generate the models using a pickle package & save them under a specific location, which the reader application will use.

3. trainingVisualDataRead.py (Main application that will invoke the training class to predict alphabet through WebCam using Convolutional Neural Network (CNN).)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 17-Jan-2022 ####
#### Modified On 17-Jan-2022 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsAlhpabetReading class to initiate ####
#### teach & perfect the model to read ####
#### visual alphabets using Convolutional ####
#### Neural Network (CNN). ####
###############################################
# We keep the setup code in a different class as shown below.
import clsAlphabetReading as ar
from clsConfig import clsConfig as cf
import datetime
import logging
###############################################
### Global Section ###
###############################################
# Instantiating all the three classes
x1 = ar.clsAlphabetReading()
###############################################
### End of Global Section ###
###############################################
def main():
try:
# Other useful variables
debugInd = 'Y'
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
var1 = datetime.datetime.now()
print('Start Time: ', str(var))
# End of useful variables
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'restoreVideo.log', level=logging.INFO)
print('Started Transformation!')
# Execute all the pass
r1 = x1.trainModel(debugInd, var)
if (r1 == 0):
print('Successfully Visual Alphabet Training Completed!')
else:
print('Failed to complete the Visual Alphabet Training!')
var2 = datetime.datetime.now()
c = var2 var1
minutes = c.total_seconds() / 60
print('Total difference in minutes: ', str(minutes))
print('End Time: ', str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

And the core snippet from the above script is –

x1 = ar.clsAlphabetReading()

Instantiate the main class.

r1 = x1.trainModel(debugInd, var)

The python application will invoke the class & capture the returned value inside the r1 variable.

4. readingVisualData.py (Reading the model to predict Alphabet using WebCAM.)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 18-Jan-2022 ####
#### Modified On 18-Jan-2022 ####
#### ####
#### Objective: This python script will ####
#### scan the live video feed from the ####
#### web-cam & predict the alphabet that ####
#### read it. ####
###############################################
# We keep the setup code in a different class as shown below.
from clsConfig import clsConfig as cf
import datetime
import logging
import cv2
import pickle
import numpy as np
###############################################
### Global Section ###
###############################################
sep = str(cf.conf['SEP'])
Curr_Path = str(cf.conf['INIT_PATH'])
fileName = str(cf.conf['FILE_NAME'])
epochsVal = int(cf.conf['epochsVal'])
numOfClasses = int(cf.conf['numOfClasses'])
word_dict = cf.conf['word_dict']
width = int(cf.conf['width'])
height = int(cf.conf['height'])
imgSize = cf.conf['imgSize']
threshold = float(cf.conf['threshold'])
imgDimension = cf.conf['imgDimension']
imgSmallDim = cf.conf['imgSmallDim']
imgMidDim = cf.conf['imgMidDim']
reshapeParam1 = int(cf.conf['reshapeParam1'])
reshapeParam2 = int(cf.conf['reshapeParam2'])
colorFeed = cf.conf['colorFeed']
colorPredict = cf.conf['colorPredict']
###############################################
### End of Global Section ###
###############################################
def main():
try:
# Other useful variables
debugInd = 'Y'
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
var1 = datetime.datetime.now()
print('Start Time: ', str(var))
# End of useful variables
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'restoreVideo.log', level=logging.INFO)
print('Started Live Streaming!')
cap = cv2.VideoCapture(0)
cap.set(3, width)
cap.set(4, height)
fileName = Curr_Path + sep + 'Model' + sep + 'model_trained_' + str(epochsVal) + '.p'
print('Model Name: ', str(fileName))
pickle_in = open(fileName, 'rb')
model = pickle.load(pickle_in)
while True:
status, img = cap.read()
if status == False:
break
img_copy = img.copy()
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = cv2.resize(img, imgDimension)
img_copy = cv2.GaussianBlur(img_copy, imgSmallDim, 0)
img_gray = cv2.cvtColor(img_copy, cv2.COLOR_BGR2GRAY)
bin, img_thresh = cv2.threshold(img_gray, 100, 255, cv2.THRESH_BINARY_INV)
img_final = cv2.resize(img_thresh, imgMidDim)
img_final = np.reshape(img_final, (reshapeParam1,reshapeParam2,reshapeParam2,reshapeParam1))
img_pred = word_dict[np.argmax(model.predict(img_final))]
# Extracting Probability Values
Predict_X = model.predict(img_final)
probVal = round(np.amax(Predict_X) * 100)
cv2.putText(img, "Live Feed : (" + str(probVal) + "%) ", (20,25), cv2.FONT_HERSHEY_TRIPLEX, 0.7, color = colorFeed)
cv2.putText(img, "Prediction: " + img_pred, (20,410), cv2.FONT_HERSHEY_DUPLEX, 1.3, color = colorPredict)
cv2.imshow("Original Image", img)
if cv2.waitKey(1) & 0xFF == ord('q'):
r1=0
break
if (r1 == 0):
print('Successfully Alphabets predicted!')
else:
print('Failed to predict alphabet!')
var2 = datetime.datetime.now()
c = var2 var1
minutes = c.total_seconds() / 60
print('Total Run Time in minutes: ', str(minutes))
print('End Time: ', str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

And the key snippet from the above code is –

cap = cv2.VideoCapture(0)
cap.set(3, width)
cap.set(4, height)

The application is reading the live video data from WebCAM. Also, set out the height & width for the video output.

fileName = Curr_Path + sep + 'Model' + sep + 'model_trained_' + str(epochsVal) + '.p'
print('Model Name: ', str(fileName))

pickle_in = open(fileName, 'rb')
model = pickle.load(pickle_in)

The application reads the model output generated as part of the previous script using the pickle package.

while True:
    status, img = cap.read()

    if status == False:
        break

The application will read the WebCAM & it exits if there is an end of video transmission or some kind of corrupt video frame.

img_copy = img.copy()

img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = cv2.resize(img, imgDimension)

img_copy = cv2.GaussianBlur(img_copy, imgSmallDim, 0)
img_gray = cv2.cvtColor(img_copy, cv2.COLOR_BGR2GRAY)
bin, img_thresh = cv2.threshold(img_gray, 100, 255, cv2.THRESH_BINARY_INV)

img_final = cv2.resize(img_thresh, imgMidDim)
img_final = np.reshape(img_final, (reshapeParam1,reshapeParam2,reshapeParam2,reshapeParam1))


img_pred = word_dict[np.argmax(model.predict(img_final))]

We have initially cloned the original video frame & then it converted from BGR2GRAYSCALE while applying the threshold on it doe better prediction outcomes. Then the image has resized & reshaped for model input. Finally, the np.argmax function extracted the class index with the highest predicted probability. Furthermore, it is translated using the word_dict dictionary to an Alphabet & displayed on top of the Live View.

# Extracting Probability Values
Predict_X = model.predict(img_final)
probVal = round(np.amax(Predict_X) * 100)

Also, derive the confidence score of that probability & display that on top of the Live View.

if cv2.waitKey(1) & 0xFF == ord('q'):
    r1=0
    break

The above code will let the developer exit from this application by pressing the “Esc” or “q”-key from the keyboard & the program will terminate.


So, we’ve done it.

You will get the complete codebase in the following Github link.

I’ll bring some more exciting topic in the coming days from the Python verse. Please share & subscribe my post & let me know your feedback.

Till then, Happy Avenging! 😀

Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim the ownership of these images. There is an always room for improvement & especially the prediction quality of Alphabet.

Pandas with Encryption/Decryption along with the JSON – (Client API Access) along with Data Queue (A crossover between Space stone, Reality Stone & Power Stone)

Today, we’ll be discussing a new cross-over between API, JSON, Encryption along with data distribution through Queue.

The primary objective here is to distribute one csv file through API service & access our previously deployed Encryption/Decryption methods by accessing the parallel call through Queue. In this case, our primary objective is to allow asynchronous calls to Queue for data distributions & at this point we’re not really looking for performance improvement. Instead, our goal to achieve the target.

My upcoming posts will discuss the improvement of performance using Parallel calls.

Let’s discuss it now.

Please find the structure of our Windows & MAC directory are as follows –

Win_Vs_MAC

We’re not going to discuss any scripts, which we’ve already discussed in my previous posts. Please refer the relevant earlier posts from my blogs.

1. clsL.py (This script will create the split csv files or final merge file after the corresponding process. However, this can be used as usual verbose debug logging as well. Hence, the name comes into the picture.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 25-Jan-2019       ########
####                               ########
#### Objective: Log File           ########
###########################################
import pandas as p
import platform as pl
from clsParam import clsParam as cf

class clsL(object):
    def __init__(self):
        self.path = cf.config['PATH']

    def logr(self, Filename, Ind, df, subdir=None):
        try:
            x = p.DataFrame()
            x = df
            sd = subdir

            os_det = pl.system()

            if sd == None:
                if os_det == "Windows":
                    fullFileName = self.path + '\\' + Filename
                else:
                    fullFileName = self.path + '/' + Filename
            else:
                if os_det == "Windows":
                    fullFileName = self.path + '\\' + sd + "\\" + Filename
                else:
                    fullFileName = self.path + '/' + sd + "/" + Filename

            if Ind == 'Y':
                x.to_csv(fullFileName, index=False)

            return 0

        except Exception as e:
            y = str(e)
            print(y)
            return 3

2. callRunServer.py (This script will create an instance of a server. Once, it is running – it will emulate the Server API functionalities. Hence, the name comes into the picture.)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
############################################
#### Written By: SATYAKI DE             ####
#### Written On: 10-Feb-2019            ####
#### Package Flask package needs to     ####
#### install in order to run this       ####
#### script.                            ####
####                                    ####
#### Objective: This script will        ####
#### initiate the encrypt/decrypt class ####
#### based on client supplied data.     ####
#### Also, this will create an instance ####
#### of the server & create an endpoint ####
#### or API using flask framework.      ####
############################################

from flask import Flask
from flask import jsonify
from flask import request
from flask import abort
from clsConfigServer import clsConfigServer as csf
import clsFlask as clf

app = Flask(__name__)

@app.route('/process/getEncrypt', methods=['POST'])
def getEncrypt():
    try:
        # If the server application doesn't have
        # valid json, it will throw 400 error
        if not request.get_json:
            abort(400)

        # Capturing the individual element
        content = request.get_json()

        dGroup = content['dataGroup']
        input_data = content['data']
        dTemplate = content['dataTemplate']

        # For debug purpose only
        print("-" * 157)
        print("Group: ", dGroup)
        print("Data: ", input_data)
        print("Template: ", dTemplate)
        print("-" * 157)

        ret_val = ''

        if ((dGroup != '') & (dTemplate != '')):
            y = clf.clsFlask()
            ret_val = y.getEncryptProcess(dGroup, input_data, dTemplate)
        else:
            abort(500)

        return jsonify({'status': 'success', 'encrypt_val': ret_val})
    except Exception as e:
        x = str(e)
        return jsonify({'status': 'error', 'detail': x})


@app.route('/process/getDecrypt', methods=['POST'])
def getDecrypt():
    try:
        # If the server application doesn't have
        # valid json, it will throw 400 error
        if not request.get_json:
            abort(400)

        # Capturing the individual element
        content = request.get_json()

        dGroup = content['dataGroup']
        input_data = content['data']
        dTemplate = content['dataTemplate']

        # For debug purpose only
        print("-" * 157)
        print("Group: ", dGroup)
        print("Data: ", input_data)
        print("Template: ", dTemplate)
        print("-" * 157)

        ret_val = ''

        if ((dGroup != '') & (dTemplate != '')):
            y = clf.clsFlask()
            ret_val = y.getDecryptProcess(dGroup, input_data, dTemplate)
        else:
            abort(500)

        return jsonify({'status': 'success', 'decrypt_val': ret_val})
    except Exception as e:
        x = str(e)
        return jsonify({'status': 'error', 'detail': x})


def main():
    try:
        print('Starting Encrypt/Decrypt Application!')

        # Calling Server Start-Up Script
        app.run(debug=True, host=str(csf.config['HOST_IP_ADDR']))
        ret_val = 0

        if ret_val == 0:
            print("Finished Returning Message!")
        else:
            raise IOError
    except Exception as e:
        print("Server Failed To Start!")

if __name__ == '__main__':
    main()

 

3. clsFlask.py (This script is part of the server process, which will categorize the encryption logic based on different groups. Hence, the name comes into the picture.)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
###########################################
#### Written By: SATYAKI DE            ####
#### Written On: 25-Jan-2019           ####
#### Package Flask package needs to    ####
#### install in order to run this      ####
#### script.                           ####
####                                   ####
#### Objective: This script will       ####
#### encrypt/decrypt based on the      ####
#### supplied salt value. Also,        ####
#### this will capture the individual  ####
#### element & stored them into JSON   ####
#### variables using flask framework.  ####
###########################################

from clsConfigServer import clsConfigServer as csf
import clsEnDecAuth as cen

class clsFlask(object):
    def __init__(self):
        self.xtoken = str(csf.config['DEF_SALT'])

    def getEncryptProcess(self, dGroup, input_data, dTemplate):
        try:
            # It is sending default salt value
            xtoken = self.xtoken

            # Capturing the individual element
            dGroup = dGroup
            input_data = input_data
            dTemplate = dTemplate

            # This will check the mandatory json elements
            if ((dGroup != '') & (dTemplate != '')):

                # Based on the Group & Element it will fetch the salt
                # Based on the specific salt it will encrypt the data
                if ((dGroup == 'GrDet') & (dTemplate == 'subGrAcct_Nbr')):
                    xtoken = str(csf.config['ACCT_NBR_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.encrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrName')):
                    xtoken = str(csf.config['NAME_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.encrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrPhone')):
                    xtoken = str(csf.config['PHONE_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.encrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrEmail')):
                    xtoken = str(csf.config['EMAIL_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.encrypt_str(input_data)
                else:
                    ret_val = ''
            else:
                ret_val = ''

            # Return value
            return ret_val

        except Exception as e:
            ret_val = ''
            # Return the valid json Error Response
            return ret_val

    def getDecryptProcess(self, dGroup, input_data, dTemplate):
        try:
            xtoken = self.xtoken

            # Capturing the individual element
            dGroup = dGroup
            input_data = input_data
            dTemplate = dTemplate

            # This will check the mandatory json elements
            if ((dGroup != '') & (dTemplate != '')):

                # Based on the Group & Element it will fetch the salt
                # Based on the specific salt it will decrypt the data
                if ((dGroup == 'GrDet') & (dTemplate == 'subGrAcct_Nbr')):
                    xtoken = str(csf.config['ACCT_NBR_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.decrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrName')):
                    xtoken = str(csf.config['NAME_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.decrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrPhone')):
                    xtoken = str(csf.config['PHONE_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.decrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrEmail')):
                    xtoken = str(csf.config['EMAIL_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.decrypt_str(input_data)
                else:
                    ret_val = ''
            else:
                ret_val = ''

            # Return value
            return ret_val

        except Exception as e:
            ret_val = ''
            # Return the valid Error Response
            return ret_val

 

4. clsEnDec.py (This script will convert the string to encryption or decryption from its previous states based on the supplied group. Hence, the name comes into the picture.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 25-Jan-2019       ########
#### Package Cryptography needs to ########
#### install in order to run this  ########
#### script.                       ########
####                               ########
#### Objective: This script will   ########
#### encrypt/decrypt based on the  ########
#### hidden supplied salt value.   ########
###########################################

from cryptography.fernet import Fernet

class clsEnDec(object):

    def __init__(self, token):
        # Calculating Key
        self.token = token

    def encrypt_str(self, data):
        try:
            # Capturing the Salt Information
            salt = self.token

            # Checking Individual Types inside the Dataframe
            cipher = Fernet(salt)
            encr_val = str(cipher.encrypt(bytes(data,'utf8'))).replace("b'","").replace("'","")

            return encr_val

        except Exception as e:
            x = str(e)
            print(x)
            encr_val = ''

            return encr_val

    def decrypt_str(self, data):
        try:
            # Capturing the Salt Information
            salt = self.token

            # Checking Individual Types inside the Dataframe
            cipher = Fernet(salt)
            decr_val = str(cipher.decrypt(bytes(data,'utf8'))).replace("b'","").replace("'","")

            return decr_val

        except Exception as e:
            x = str(e)
            print(x)
            decr_val = ''

            return decr_val

 

5. clsConfigServer.py (This script contains all the main parameter details of your emulated API server. Hence, the name comes into the picture.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 10-Feb-2019       ########
####                               ########
#### Objective: Parameter File     ########
###########################################

import os
import platform as pl

# Checking with O/S system
os_det = pl.system()

class clsConfigServer(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    if os_det == "Windows":
        config = {
            'FILE': 'acct_addr_20180112.csv',
            'SRC_FILE_PATH': Curr_Path + '\\' + 'src_file\\',
            'PROFILE_FILE_PATH': Curr_Path + '\\' + 'profile\\',
            'HOST_IP_ADDR': '0.0.0.0',
            'DEF_SALT': 'iooquzKtqLwUwXG3rModqj_fIl409vemWg9PekcKh2o=',
            'ACCT_NBR_SALT': 'iooquzKtqLwUwXG3rModqj_fIlpp1vemWg9PekcKh2o=',
            'NAME_SALT': 'iooquzKtqLwUwXG3rModqj_fIlpp1026Wg9PekcKh2o=',
            'PHONE_SALT': 'iooquzKtqLwUwXG3rMM0F5_fIlpp1026Wg9PekcKh2o=',
            'EMAIL_SALT': 'iooquzKtqLwU0653rMM0F5_fIlpp1026Wg9PekcKh2o='
        }
    else:
        config = {
            'FILE': 'acct_addr_20180112.csv',
            'SRC_FILE_PATH': Curr_Path + '/' + 'src_file/',
            'PROFILE_FILE_PATH': Curr_Path + '/' + 'profile/',
            'HOST_IP_ADDR': '0.0.0.0',
            'DEF_SALT': 'iooquzKtqLwUwXG3rModqj_fIl409vemWg9PekcKh2o=',
            'ACCT_NBR_SALT': 'iooquzKtqLwUwXG3rModqj_fIlpp1vemWg9PekcKh2o=',
            'NAME_SALT': 'iooquzKtqLwUwXG3rModqj_fIlpp1026Wg9PekcKh2o=',
            'PHONE_SALT': 'iooquzKtqLwUwXG3rMM0F5_fIlpp1026Wg9PekcKh2o=',
            'EMAIL_SALT': 'iooquzKtqLwU0653rMM0F5_fIlpp1026Wg9PekcKh2o='
        }

 

6. clsWeb.py (This script will receive the input Pandas dataframe & then convert it to JSON & then send it back to our Flask API Server for encryption/decryption. Hence, the name comes into the picture.)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
############################################
#### Written By: SATYAKI DE             ####
#### Written On: 09-Mar-2019            ####
#### Package Flask package needs to     ####
#### install in order to run this       ####
#### script.                            ####
####                                    ####
#### Objective: This script will        ####
#### initiate API based JSON requests   ####
#### at the server & receive the        ####
#### response from it & transform it    ####
#### back to the data-frame.            ####
############################################

import json
import requests
import datetime
import time
import ssl
import os
from clsParam import clsParam as cf

class clsWeb(object):
    def __init__(self, payload):
        self.payload = payload
        self.path = str(cf.config['PATH'])
        self.max_retries = int(cf.config['MAX_RETRY'])
        self.encrypt_ulr = str(cf.config['ENCRYPT_URL'])
        self.decrypt_ulr = str(cf.config['DECRYPT_URL'])

    def getResponse(self, mode):

        # Assigning Logging Info
        max_retries = self.max_retries
        encrypt_ulr = self.encrypt_ulr
        decrypt_ulr = self.decrypt_ulr
        En_Dec_Mode = mode

        try:

            # Bypassing SSL Authentication
            try:
                _create_unverified_https_context = ssl._create_unverified_context
            except AttributeError:
                # Legacy python that doesn't verify HTTPS certificates by default
                pass
            else:
                # Handle target environment that doesn't support HTTPS verification
                ssl._create_default_https_context = _create_unverified_https_context

            # Providing the url
            if En_Dec_Mode == 'En':
                url = encrypt_ulr
            else:
                url = decrypt_ulr

            print("URL::", url)

            # Capturing the payload
            data = self.payload

            # Converting String to Json
            # json_data = json.loads(data)
            json_data = json.loads(data)

            print("JSON:::::::", str(json_data))

            headers = {"Content-type": "application/json"}
            param = headers

            var1 = datetime.datetime.now().strftime("%H:%M:%S")
            print('Json Fetch Start Time:', var1)

            retries = 1
            success = False

            while not success:
                # Getting response from web service
                # response = requests.post(url, params=param, json=data, auth=(login, password), verify=False)
                response = requests.post(url, params=param, json=json_data, verify=False)
                print("Complete Return Code:: ", str(response.status_code))
                print("Return Code Initial::", str(response.status_code)[:1])

                if str(response.status_code)[:1] == '2':
                    # response = s.post(url, params=param, json=json_data, verify=False)
                    success = True
                else:
                    wait = retries * 2
                    print("Retry fails! Waiting " + str(wait) + " seconds and retrying.")
                    time.sleep(wait)
                    retries += 1
                    # print('Return Service::')

                # Checking Maximum Retries
                if retries == max_retries:
                    success = True
                    raise ValueError

                print("JSON RESPONSE:::", response.text)

                var2 = datetime.datetime.now().strftime("%H:%M:%S")
                print('Json Fetch End Time:', var2)

                # Capturing the response json from Web Service
                response_json = response.text
                load_val = json.loads(response_json)

                # Based on the mode application will send the return value
                if En_Dec_Mode == 'En':
                    encrypt_ele = str(load_val['encrypt_val'])
                    return_ele = encrypt_ele
                else:
                    decrypt_ele = str(load_val['decrypt_val'])
                    return_ele = decrypt_ele

            return return_ele

        except ValueError as v:
            raise ValueError

        except Exception as e:
            x = str(e)
            print(x)

            return 'Error'

Let’s discuss the key lines –

try:
    _create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
    # Legacy python that doesn't verify HTTPS certificates by default
    pass
else:
    # Handle target environment that doesn't support HTTPS verification
    ssl._create_default_https_context = _create_unverified_https_context

If you are running in a secure environment. Sometimes, your proxy or firewall blocks you from accessing the API server – if they are using different networks. Hence, we need to bypass that. However, it is advisable not to use this in Prod environment for obvious reasons.

# Capturing the payload
data = self.payload

# Converting String to Json
json_data = json.loads(data)

This snippet will convert your data frame into a JSON object.

response = requests.post(url, params=param, json=json_data, verify=False)
print("Complete Return Code:: ", str(response.status_code))
print("Return Code Initial::", str(response.status_code)[:1])

if str(response.status_code)[:1] == '2':
    # response = s.post(url, params=param, json=json_data, verify=False)
    success = True
else:
    wait = retries * 2
    print("Retry fails! Waiting " + str(wait) + " seconds and retrying.")
    time.sleep(wait)
    retries += 1
    # print('Return Service::')

# Checking Maximum Retries
if retries == max_retries:
    success = True
    raise ValueError

In the first 3 lines, the application is building a JSON response, which will be sent to the API Server. And, it will capture the response from the server.

Next 8 lines will check the status code. And, based on the status code, it will continue or retry the requests in case if there is any failure or lousy response from the server.

Last 3 lines say if the application crosses the maximum allowable error limit, it will terminate the process by raising it as an error.

# Capturing the response json from Web Service
response_json = response.text
load_val = json.loads(response_json)

Once, it receives the valid response, the application will convert it back to the dataframe & send it to the calling methods.

7. clsParam.py (This script contains the fundamental parameter values to run your client application. Hence, the name comes into the picture.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 20-Jan-2019       ########
###########################################

import os

class clsParam(object):

    config = {
        'MAX_RETRY' : 5,
        'ENCRYPT_MODE' : 'En',
        'DECRYPT_MODE': 'De',
        'PATH' : os.path.dirname(os.path.realpath(__file__)),
        'SRC_DIR' : os.path.dirname(os.path.realpath(__file__)) + '/' + 'src_files/',
        'FIN_DIR': os.path.dirname(os.path.realpath(__file__)) + '/' + 'finished/',
        'ENCRYPT_URL': "http://192.168.0.13:5000/process/getEncrypt",
        'DECRYPT_URL': "http://192.168.0.13:5000/process/getDecrypt",
        'NUM_OF_THREAD': 20
    }

 

8. clsSerial.py (This script will show the usual or serial way to convert your data into encryption & then to decrypts & store the result into two separate csv files. Hence, the name comes into the picture.)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
############################################
#### Written By: SATYAKI DE             ####
#### Written On: 10-Feb-2019            ####
#### Package Flask package needs to     ####
#### install in order to run this       ####
#### script.                            ####
####                                    ####
#### Objective: This script will        ####
#### initiate the encrypt/decrypt class ####
#### based on client supplied data      ####
#### using serial mode operation.       ####
############################################

import pandas as p
import clsWeb as cw
import datetime
from clsParam import clsParam as cf

# Disbling Warnings
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn

class clsSerial(object):
    def __init__(self):
        self.path = cf.config['PATH']
        self.EncryptMode = str(cf.config['ENCRYPT_MODE'])
        self.DecryptMode = str(cf.config['DECRYPT_MODE'])

    # Lookup Methods for Encryption
    def encrypt_acctNbr(self, row):
        # Declaring Local Variable
        en_AcctNbr = ''
        json_source_str = ''

        # Capturing essential values
        EncryptMode = self.EncryptMode
        lkp_acctNbr = row['Acct_Nbr']
        str_acct_nbr = str(lkp_acctNbr)
        fil_acct_nbr = str_acct_nbr.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_nbr + '","dataTemplate":"subGrAcct_Nbr"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_acct_nbr)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            en_AcctNbr = x.getResponse(EncryptMode)
        else:
            en_AcctNbr = ''

        fil_acct_nbr = ''
        fil_acct_nbr = ''

        return en_AcctNbr

    def encrypt_Name(self, row):
        # Declaring Local Variable
        en_AcctName = ''

        # Capturing essential values
        EncryptMode = self.EncryptMode
        lkp_acctName = row['Name']
        str_acct_name = str(lkp_acctName)
        fil_acct_name = str_acct_name.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_name + '","dataTemplate":"subGrName"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_acct_name)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            en_AcctName = x.getResponse(EncryptMode)
        else:
            en_AcctName = ''

        return en_AcctName

    def encrypt_Phone(self, row):
        # Declaring Local Variable
        en_Phone = ''

        # Capturing essential values
        EncryptMode = self.EncryptMode
        lkp_phone = row['Phone']
        str_phone = str(lkp_phone)
        fil_phone = str_phone.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_phone + '","dataTemplate":"subGrPhone"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_phone)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            en_Phone = x.getResponse(EncryptMode)
        else:
            en_Phone = ''

        return en_Phone

    def encrypt_Email(self, row):
        # Declaring Local Variable
        en_Email = ''

        # Capturing essential values
        EncryptMode = self.EncryptMode
        lkp_email = row['Email']
        str_email = str(lkp_email)
        fil_email = str_email.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_email + '","dataTemplate":"subGrEmail"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_email)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            en_Email = x.getResponse(EncryptMode)
        else:
            en_Email = ''

        return en_Email

    # Lookup Methods for Decryption
    def decrypt_acctNbr(self, row):
        # Declaring Local Variable
        de_AcctNbr = ''
        json_source_str = ''

        # Capturing essential values
        EncryptMode = self.DecryptMode
        lkp_acctNbr = row['Acct_Nbr']
        str_acct_nbr = str(lkp_acctNbr)
        fil_acct_nbr = str_acct_nbr.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_nbr + '","dataTemplate":"subGrAcct_Nbr"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_acct_nbr)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            de_AcctNbr = x.getResponse(EncryptMode)
        else:
            de_AcctNbr = ''

        return de_AcctNbr

    def decrypt_Name(self, row):
        # Declaring Local Variable
        de_AcctName = ''

        # Capturing essential values
        EncryptMode = self.DecryptMode
        lkp_acctName = row['Name']
        str_acct_name = str(lkp_acctName)
        fil_acct_name = str_acct_name.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_name + '","dataTemplate":"subGrName"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_acct_name)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            de_AcctName = x.getResponse(EncryptMode)
        else:
            de_AcctName = ''

        return de_AcctName

    def decrypt_Phone(self, row):
        # Declaring Local Variable
        de_Phone = ''

        # Capturing essential values
        EncryptMode = self.DecryptMode
        lkp_phone = row['Phone']
        str_phone = str(lkp_phone)
        fil_phone = str_phone.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_phone + '","dataTemplate":"subGrPhone"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_phone)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            de_Phone = x.getResponse(EncryptMode)
        else:
            de_Phone = ''

        return de_Phone

    def decrypt_Email(self, row):
        # Declaring Local Variable
        de_Email = ''

        # Capturing essential values
        EncryptMode = self.DecryptMode
        lkp_email = row['Email']
        str_email = str(lkp_email)
        fil_email = str_email.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_email + '","dataTemplate":"subGrEmail"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_email)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            de_Email = x.getResponse(EncryptMode)
        else:
            de_Email = ''

        return de_Email

    def getEncrypt(self, df_payload):
        try:
            df_input = p.DataFrame()
            df_fin = p.DataFrame()

            # Assigning Target File Basic Name
            df_input = df_payload

            # Checking total count of rows
            count_row = df_input.shape[0]
            print('Total number of records to process:: ', count_row)

            # Deriving rows
            df_input['Encrypt_Acct_Nbr'] = df_input.apply(lambda row: self.encrypt_acctNbr(row), axis=1)
            df_input['Encrypt_Name'] = df_input.apply(lambda row: self.encrypt_Name(row), axis=1)
            df_input['Encrypt_Phone'] = df_input.apply(lambda row: self.encrypt_Phone(row), axis=1)
            df_input['Encrypt_Email'] = df_input.apply(lambda row: self.encrypt_Email(row), axis=1)

            # Dropping original columns
            df_input.drop(['Acct_Nbr', 'Name', 'Phone', 'Email'], axis=1, inplace=True)

            # Renaming new columns with the old column names
            df_input.rename(columns={'Encrypt_Acct_Nbr':'Acct_Nbr'}, inplace=True)
            df_input.rename(columns={'Encrypt_Name': 'Name'}, inplace=True)
            df_input.rename(columns={'Encrypt_Phone': 'Phone'}, inplace=True)
            df_input.rename(columns={'Encrypt_Email': 'Email'}, inplace=True)

            # New Column List Orders
            column_order = ['Acct_Nbr', 'Name', 'Acct_Addr_1', 'Acct_Addr_2', 'Phone', 'Email', 'Serial_No']
            df_fin = df_input.reindex(column_order, axis=1)

            return df_fin
        except Exception as e:
            df_error = p.DataFrame({'Acct_Nbr':str(e), 'Name':'', 'Acct_Addr_1':'', 'Acct_Addr_2':'', 'Phone':'', 'Email':'', 'Serial_No':''})

            return df_error


    def getDecrypt(self, df_encrypted_payload):
        try:
            df_input = p.DataFrame()
            df_fin = p.DataFrame()

            # Assigning Target File Basic Name
            df_input = df_encrypted_payload

            # Checking total count of rows
            count_row = df_input.shape[0]
            print('Total number of records to process:: ', count_row)


            # Deriving rows
            df_input['Decrypt_Acct_Nbr'] = df_input.apply(lambda row: self.decrypt_acctNbr(row), axis=1)
            df_input['Decrypt_Name'] = df_input.apply(lambda row: self.decrypt_Name(row), axis=1)
            df_input['Decrypt_Phone'] = df_input.apply(lambda row: self.decrypt_Phone(row), axis=1)
            df_input['Decrypt_Email'] = df_input.apply(lambda row: self.decrypt_Email(row), axis=1)

            # Dropping original columns
            df_input.drop(['Acct_Nbr', 'Name', 'Phone', 'Email'], axis=1, inplace=True)

            # Renaming new columns with the old column names
            df_input.rename(columns={'Decrypt_Acct_Nbr':'Acct_Nbr'}, inplace=True)
            df_input.rename(columns={'Decrypt_Name': 'Name'}, inplace=True)
            df_input.rename(columns={'Decrypt_Phone': 'Phone'}, inplace=True)
            df_input.rename(columns={'Decrypt_Email': 'Email'}, inplace=True)

            # New Column List Orders
            column_order = ['Acct_Nbr', 'Name', 'Acct_Addr_1', 'Acct_Addr_2', 'Phone', 'Email']
            df_fin = df_input.reindex(column_order, axis=1)

            return df_fin
        except Exception as e:
            df_error = p.DataFrame({'Acct_Nbr':str(e), 'Name':'', 'Acct_Addr_1':'', 'Acct_Addr_2':'', 'Phone':'', 'Email':''})

            return df_error

Key lines to discuss –

Main two methods, we’ll be looking into & they are –

a. getEncrypt

b. getDecrypt

However, these two functions constructions are identical in nature. One is for encryption & the other one is decryption.

# Deriving rows
df_input['Encrypt_Acct_Nbr'] = df_input.apply(lambda row: self.encrypt_acctNbr(row), axis=1)
df_input['Encrypt_Name'] = df_input.apply(lambda row: self.encrypt_Name(row), axis=1)
df_input['Encrypt_Phone'] = df_input.apply(lambda row: self.encrypt_Phone(row), axis=1)
df_input['Encrypt_Email'] = df_input.apply(lambda row: self.encrypt_Email(row), axis=1)

As you can see, the application is processing row-by-row & column-by-column data transformations using look-up functions.

# Dropping original columns
df_input.drop(['Acct_Nbr', 'Name', 'Phone', 'Email'], axis=1, inplace=True)

As the comment suggested, the application is dropping all the unencrypted source columns.

# Renaming new columns with the old column names
df_input.rename(columns={'Encrypt_Acct_Nbr':'Acct_Nbr'}, inplace=True)
df_input.rename(columns={'Encrypt_Name': 'Name'}, inplace=True)
df_input.rename(columns={'Encrypt_Phone': 'Phone'}, inplace=True)
df_input.rename(columns={'Encrypt_Email': 'Email'}, inplace=True)

Once, the application drops all the source columns, it will rename the new column names back to old columns & based on this data will be merged with the rest of the data from the source csv.

# New Column List Orders
column_order = ['Acct_Nbr', 'Name', 'Acct_Addr_1', 'Acct_Addr_2', 'Phone', 'Email', 'Serial_No']
df_fin = df_input.reindex(column_order, axis=1)

Once, the application finished doing all these transformations, it will now re-sequence the order of the columns, which will create the same column order as it’s source csv files.

Similar logic is applicable for the decryption as well.

As we know, there are many look-up methods take part as part of this drive.

encrypt_acctNbr, encrypt_Name, encrypt_Phone, encrypt_Email
decrypt_acctNbr, decrypt_Name, decrypt_Phone, decrypt_Email

We’ll discuss only one method as these are completely identical.

# Capturing essential values
EncryptMode = self.EncryptMode
lkp_acctNbr = row['Acct_Nbr']
str_acct_nbr = str(lkp_acctNbr)
fil_acct_nbr = str_acct_nbr.strip()

From the row, our application is extracting the relevant column. In this case, it is Acct_Nbr. And, then converts it to string & remove any unnecessary white space from it.

# Forming JSON String for this field
json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_nbr + '","dataTemplate":"subGrAcct_Nbr"}'

Once extracted, the application will build the target JON string as per column data.

# Identifying Length of the field
len_acct_nbr = len(fil_acct_nbr)

# This will trigger the service if it has valid data
if len_acct_nbr > 0:
    x = cw.clsWeb(json_source_str)
    en_AcctNbr = x.getResponse(EncryptMode)
else:
    en_AcctNbr = ''

Based on the length of the extracted value, our application will trigger the individual JSON requests & will receive the data frame in response.

9. clsParallel.py (This script will use the queue to make asynchronous calls & perform the same encryption & decryption. Hence, the name comes into the picture.)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
############################################
#### Written By: SATYAKI DE             ####
#### Written On: 10-Feb-2019            ####
#### Package Flask package needs to     ####
#### install in order to run this       ####
#### script.                            ####
####                                    ####
#### Objective: This script will        ####
#### initiate the encrypt/decrypt class ####
#### based on client supplied data.     ####
#### This script will use the advance   ####
#### queue & asynchronus calls to the   ####
#### API Server to process Encryption & ####
#### Decryption on our csv files.       ####
############################################
import pandas as p
import clsWebService as cw
import datetime
from clsParam import clsParam as cf
from multiprocessing import Lock, Process, Queue, freeze_support, JoinableQueue
import gc
import signal
import time
import os
import queue
import asyncio

# Declaring Global Variable
q = Queue()
lock = Lock()

finished_task = JoinableQueue()
pending_task = JoinableQueue()

sp_fin_dict = {}
dp_fin_dict = {}

# Disbling Warnings
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn

class clsParallel(object):
    def __init__(self):
        self.path = cf.config['PATH']
        self.EncryptMode = str(cf.config['ENCRYPT_MODE'])
        self.DecryptMode = str(cf.config['DECRYPT_MODE'])
        self.num_worker_process = int(cf.config['NUM_OF_THREAD'])
        self.lock = Lock()

    # Lookup Methods for Encryption
    def encrypt_acctNbr(self, row):
        # Declaring Local Variable
        en_AcctNbr = ''
        json_source_str = ''

        # Capturing essential values
        EncryptMode = self.EncryptMode
        lkp_acctNbr = row['Acct_Nbr']
        str_acct_nbr = str(lkp_acctNbr)
        fil_acct_nbr = str_acct_nbr.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_nbr + '","dataTemplate":"subGrAcct_Nbr"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_acct_nbr)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            en_AcctNbr = x.getResponse(EncryptMode)
        else:
            en_AcctNbr = ''

        fil_acct_nbr = ''

        return en_AcctNbr

    def encrypt_Name(self, row):
        # Declaring Local Variable
        en_AcctName = ''

        # Capturing essential values
        EncryptMode = self.EncryptMode
        lkp_acctName = row['Name']
        str_acct_name = str(lkp_acctName)
        fil_acct_name = str_acct_name.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_name + '","dataTemplate":"subGrName"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_acct_name)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            en_AcctName = x.getResponse(EncryptMode)
        else:
            en_AcctName = ''

        return en_AcctName

    def encrypt_Phone(self, row):
        # Declaring Local Variable
        en_Phone = ''

        # Capturing essential values
        EncryptMode = self.EncryptMode
        lkp_phone = row['Phone']
        str_phone = str(lkp_phone)
        fil_phone = str_phone.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_phone + '","dataTemplate":"subGrPhone"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_phone)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            en_Phone = x.getResponse(EncryptMode)
        else:
            en_Phone = ''

        return en_Phone

    def encrypt_Email(self, row):
        # Declaring Local Variable
        en_Email = ''

        # Capturing essential values
        EncryptMode = self.EncryptMode
        lkp_email = row['Email']
        str_email = str(lkp_email)
        fil_email = str_email.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_email + '","dataTemplate":"subGrEmail"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_email)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            en_Email = x.getResponse(EncryptMode)
        else:
            en_Email = ''

        return en_Email

    # Lookup Methods for Decryption
    def decrypt_acctNbr(self, row):
        # Declaring Local Variable
        de_AcctNbr = ''
        json_source_str = ''

        # Capturing essential values
        EncryptMode = self.DecryptMode
        lkp_acctNbr = row['Acct_Nbr']
        str_acct_nbr = str(lkp_acctNbr)
        fil_acct_nbr = str_acct_nbr.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_nbr + '","dataTemplate":"subGrAcct_Nbr"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_acct_nbr)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            de_AcctNbr = x.getResponse(EncryptMode)
        else:
            de_AcctNbr = ''

        return de_AcctNbr

    def decrypt_Name(self, row):
        # Declaring Local Variable
        de_AcctName = ''

        # Capturing essential values
        EncryptMode = self.DecryptMode
        lkp_acctName = row['Name']
        str_acct_name = str(lkp_acctName)
        fil_acct_name = str_acct_name.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_name + '","dataTemplate":"subGrName"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_acct_name)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            de_AcctName = x.getResponse(EncryptMode)
        else:
            de_AcctName = ''

        return de_AcctName

    def decrypt_Phone(self, row):
        # Declaring Local Variable
        de_Phone = ''

        # Capturing essential values
        EncryptMode = self.DecryptMode
        lkp_phone = row['Phone']
        str_phone = str(lkp_phone)
        fil_phone = str_phone.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_phone + '","dataTemplate":"subGrPhone"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_phone)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            de_Phone = x.getResponse(EncryptMode)
        else:
            de_Phone = ''

        return de_Phone

    def decrypt_Email(self, row):
        # Declaring Local Variable
        de_Email = ''

        # Capturing essential values
        EncryptMode = self.DecryptMode
        lkp_email = row['Email']
        str_email = str(lkp_email)
        fil_email = str_email.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_email + '","dataTemplate":"subGrEmail"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_email)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            de_Email = x.getResponse(EncryptMode)
        else:
            de_Email = ''

        return de_Email

    def getEncrypt(self, df_dict):
        try:
            en_fin_dict = {}

            df_input = p.DataFrame()
            df_fin = p.DataFrame()

            # Assigning Target File Basic Name
            for k, v in df_dict.items():
                Process_Name = k
                df_input = v

            # Checking total count of rows
            count_row = int(df_input.shape[0])
            print('Part number of records to process:: ', count_row)

            if count_row > 0:

                # Deriving rows
                df_input['Encrypt_Acct_Nbr'] = df_input.apply(lambda row: self.encrypt_acctNbr(row), axis=1)
                df_input['Encrypt_Name'] = df_input.apply(lambda row: self.encrypt_Name(row), axis=1)
                df_input['Encrypt_Phone'] = df_input.apply(lambda row: self.encrypt_Phone(row), axis=1)
                df_input['Encrypt_Email'] = df_input.apply(lambda row: self.encrypt_Email(row), axis=1)

                # Dropping original columns
                df_input.drop(['Acct_Nbr', 'Name', 'Phone', 'Email'], axis=1, inplace=True)

                # Renaming new columns with the old column names
                df_input.rename(columns={'Encrypt_Acct_Nbr':'Acct_Nbr'}, inplace=True)
                df_input.rename(columns={'Encrypt_Name': 'Name'}, inplace=True)
                df_input.rename(columns={'Encrypt_Phone': 'Phone'}, inplace=True)
                df_input.rename(columns={'Encrypt_Email': 'Email'}, inplace=True)

                # New Column List Orders
                column_order = ['Acct_Nbr', 'Name', 'Acct_Addr_1', 'Acct_Addr_2', 'Phone', 'Email', 'Serial_No']
                df_fin = df_input.reindex(column_order, axis=1)

                sp_fin_dict[Process_Name] = df_fin

            return sp_fin_dict
        except Exception as e:
            df_error = p.DataFrame({'Acct_Nbr':str(e), 'Name':'', 'Acct_Addr_1':'', 'Acct_Addr_2':'', 'Phone':'', 'Email':'', 'Serial_No':''})
            sp_fin_dict[Process_Name] = df_error

            return sp_fin_dict

    async def produceEncr(self, queue, l_dict):

        m_dict = {}

        m_dict = self.getEncrypt(l_dict)

        for k, v in m_dict.items():
            item = k
            print('producing {}...'.format(item))

        await queue.put(m_dict)


    async def consumeEncr(self, queue):
        result_dict = {}

        while True:
            # wait for an item from the producer
            sp_fin_dict.update(await queue.get())

            # process the item
            for k, v in sp_fin_dict.items():
                item = k
                print('consuming {}...'.format(item))

            # Notify the queue that the item has been processed
            queue.task_done()


    async def runEncrypt(self, n, df_input):
        l_dict = {}

        queue = asyncio.Queue()
        # schedule the consumer
        consumer = asyncio.ensure_future(self.consumeEncr(queue))

        start_pos = 0
        end_pos = 0

        num_worker_process = n

        count_row = df_input.shape[0]
        print('Total number of records to process:: ', count_row)

        interval = int(count_row / num_worker_process) + 1
        actual_worker_task = int(count_row / interval) + 1

        for i in range(actual_worker_task):
            name = 'Task-' + str(i)

            if ((start_pos + interval) < count_row):
                end_pos = start_pos + interval
            else:
                end_pos = start_pos + (count_row - start_pos)

            print("start_pos: ", start_pos)
            print("end_pos: ", end_pos)

            split_df = df_input.iloc[start_pos:end_pos]
            l_dict[name] = split_df

            if ((start_pos > count_row) | (start_pos == count_row)):
                break
            else:
                start_pos = start_pos + interval

            # run the producer and wait for completion
            await self.produceEncr(queue, l_dict)
            # wait until the consumer has processed all items
            await queue.join()

        # the consumer is still awaiting for an item, cancel it
        consumer.cancel()

        return sp_fin_dict


    def getEncryptParallel(self, df_payload):

        l_dict = {}
        data_dict = {}
        min_val_list = {}
        cnt = 1
        num_worker_process = self.num_worker_process
        actual_worker_task = 0
        number_of_processes = 4

        processes = []

        split_df = p.DataFrame()
        df_ret = p.DataFrame()
        dummy_df = p.DataFrame()

        # Assigning Target File Basic Name
        df_input = df_payload

        # Checking total count of rows
        count_row = df_input.shape[0]
        print('Total number of records to process:: ', count_row)

        interval = int(count_row / num_worker_process) + 1
        actual_worker_task = int(count_row/interval) + 1

        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.runEncrypt(actual_worker_task, df_input))
        loop.close()

        for k, v in sp_fin_dict.items():
            min_val_list[int(k.replace('Task-', ''))] = v

        min_val = min(min_val_list, key=int)
        print("Minimum Index Value: ", min_val)

        for k, v in sorted(sp_fin_dict.items(), key=lambda k: int(k[0].replace('Task-', ''))):
            if int(k.replace('Task-', '')) == min_val:
                df_ret = sp_fin_dict[k]
            else:
                d_frames = [df_ret, sp_fin_dict[k]]
                df_ret = p.concat(d_frames)

        return df_ret

    def getDecrypt(self, df_encrypted_dict):
        try:
            de_fin_dict = {}

            df_input = p.DataFrame()
            df_fin = p.DataFrame()

            # Assigning Target File Basic Name
            for k, v in df_encrypted_dict.items():
                Process_Name = k
                df_input = v

            # Checking total count of rows
            count_row = int(df_input.shape[0])
            print('Part number of records to process:: ', count_row)

            if count_row > 0:

                # Deriving rows
                df_input['Decrypt_Acct_Nbr'] = df_input.apply(lambda row: self.decrypt_acctNbr(row), axis=1)
                df_input['Decrypt_Name'] = df_input.apply(lambda row: self.decrypt_Name(row), axis=1)
                df_input['Decrypt_Phone'] = df_input.apply(lambda row: self.decrypt_Phone(row), axis=1)
                df_input['Decrypt_Email'] = df_input.apply(lambda row: self.decrypt_Email(row), axis=1)

                # Dropping original columns
                df_input.drop(['Acct_Nbr', 'Name', 'Phone', 'Email'], axis=1, inplace=True)

                # Renaming new columns with the old column names
                df_input.rename(columns={'Decrypt_Acct_Nbr':'Acct_Nbr'}, inplace=True)
                df_input.rename(columns={'Decrypt_Name': 'Name'}, inplace=True)
                df_input.rename(columns={'Decrypt_Phone': 'Phone'}, inplace=True)
                df_input.rename(columns={'Decrypt_Email': 'Email'}, inplace=True)

                # New Column List Orders
                column_order = ['Acct_Nbr', 'Name', 'Acct_Addr_1', 'Acct_Addr_2', 'Phone', 'Email', 'Serial_No']
                df_fin = df_input.reindex(column_order, axis=1)

                de_fin_dict[Process_Name] = df_fin

            return de_fin_dict

        except Exception as e:
            df_error = p.DataFrame({'Acct_Nbr': str(e), 'Name': '', 'Acct_Addr_1': '', 'Acct_Addr_2': '', 'Phone': '', 'Email': '', 'Serial_No': ''})
            de_fin_dict[Process_Name] = df_error

            return de_fin_dict

    async def produceDecr(self, queue, l_dict):

        m_dict = {}

        m_dict = self.getDecrypt(l_dict)

        for k, v in m_dict.items():
            item = k
            print('producing {}...'.format(item))

        await queue.put(m_dict)


    async def consumeDecr(self, queue):
        result_dict = {}

        while True:
            # wait for an item from the producer
            dp_fin_dict.update(await queue.get())

            # process the item
            for k, v in dp_fin_dict.items():
                item = k
                print('consuming {}...'.format(item))

            # Notify the queue that the item has been processed
            queue.task_done()


    async def runDecrypt(self, n, df_input):
        l_dict = {}

        queue = asyncio.Queue()
        # schedule the consumer
        consumerDe = asyncio.ensure_future(self.consumeDecr(queue))

        start_pos = 0
        end_pos = 0

        num_worker_process = n

        count_row = df_input.shape[0]
        print('Total number of records to process:: ', count_row)

        interval = int(count_row / num_worker_process) + 1
        actual_worker_task = int(count_row / interval) + 1

        for i in range(actual_worker_task):
            name = 'Task-' + str(i)

            if ((start_pos + interval) < count_row):
                end_pos = start_pos + interval
            else:
                end_pos = start_pos + (count_row - start_pos)

            print("start_pos: ", start_pos)
            print("end_pos: ", end_pos)

            split_df = df_input.iloc[start_pos:end_pos]
            l_dict[name] = split_df

            if ((start_pos > count_row) | (start_pos == count_row)):
                break
            else:
                start_pos = start_pos + interval

            # run the producer and wait for completion
            await self.produceDecr(queue, l_dict)
            # wait until the consumer has processed all items
            await queue.join()

        # the consumer is still awaiting for an item, cancel it
        consumerDe.cancel()

        return dp_fin_dict


    def getDecryptParallel(self, df_payload):

        l_dict = {}
        data_dict = {}
        min_val_list = {}
        cnt = 1
        num_worker_process = self.num_worker_process
        actual_worker_task = 0
        number_of_processes = 4

        processes = []

        split_df = p.DataFrame()
        df_ret_1 = p.DataFrame()
        dummy_df = p.DataFrame()

        # Assigning Target File Basic Name
        df_input = df_payload

        # Checking total count of rows
        count_row = df_input.shape[0]
        print('Total number of records to process:: ', count_row)

        interval = int(count_row / num_worker_process) + 1
        actual_worker_task = int(count_row/interval) + 1

        loop_1 = asyncio.new_event_loop()
        asyncio.set_event_loop(asyncio.new_event_loop())
        loop_2 = asyncio.get_event_loop()
        loop_2.run_until_complete(self.runDecrypt(actual_worker_task, df_input))
        loop_2.close()

        for k, v in dp_fin_dict.items():
            min_val_list[int(k.replace('Task-', ''))] = v

        min_val = min(min_val_list, key=int)
        print("Minimum Index Value: ", min_val)

        for k, v in sorted(dp_fin_dict.items(), key=lambda k: int(k[0].replace('Task-', ''))):
            if int(k.replace('Task-', '')) == min_val:
                df_ret_1 = dp_fin_dict[k]
            else:
                d_frames = [df_ret_1, dp_fin_dict[k]]
                df_ret_1 = p.concat(d_frames)

        return df_ret_1

I don’t want to discuss any more look-up methods as the post is already pretty big. Only address a few critical lines

Under getEncryptParallel, the following lines are essential –

# Checking total count of rows
count_row = df_input.shape[0]
print('Total number of records to process:: ', count_row)

interval = int(count_row / num_worker_process) + 1
actual_worker_task = int(count_row/interval) + 1

Based on the dataframe total number of records, our application will split that main dataframe into parts of sub dataframe & then pass them using queue by asynchronous queue calls.

loop = asyncio.get_event_loop()
loop.run_until_complete(self.runEncrypt(actual_worker_task, df_input))
loop.close()

Initiating our queue methods & passing our dataframe to it.

for k, v in sorted(sp_fin_dict.items(), key=lambda k: int(k[0].replace('Task-', ''))):
    if int(k.replace('Task-', '')) == min_val:
        df_ret = sp_fin_dict[k]
    else:
        d_frames = [df_ret, sp_fin_dict[k]]
        df_ret = p.concat(d_frames)

Our application is sending & receiving data using the dictionary. The reason is – we’re not expecting data that we may get it from our server in sequence. Instead, we’re hoping the data will be random. Hence, using keys, we’re maintaining our final sequence & that will ensure our application to joining back to the correct sets of source data, which won’t be the candidate for any encryption/decryption.

Let’s discuss runEncrypt method.

for i in range(actual_worker_task):
    name = 'Task-' + str(i)

    if ((start_pos + interval) < count_row):
        end_pos = start_pos + interval
    else:
        end_pos = start_pos + (count_row - start_pos)

    print("start_pos: ", start_pos)
    print("end_pos: ", end_pos)

    split_df = df_input.iloc[start_pos:end_pos]
    l_dict[name] = split_df

    if ((start_pos > count_row) | (start_pos == count_row)):
        break
    else:
        start_pos = start_pos + interval

Here, our application is splitting our source data frame into multiple sub dataframe & then it can be processed in parallel using queues.

# run the producer and wait for completion
await self.produceEncr(queue, l_dict)
# wait until the consumer has processed all items
await queue.join()

Invoking the encryption-decryption process using queues. The last line is significant. The queue will not destroy until all the item produced/place into the queue are not consumed. Hence, your main program will wait until it processes all the records of your dataframe.

Two methods named produceEncr & consumeEncr mainly used for placing an item inside the queue & then after encryption/decryption it will retrieve it from the queue.

Few important lines from both the methods are –

#produceEncr
await queue.put(m_dict)

#consumeEncr
# wait for an item from the producer
sp_fin_dict.update(await queue.get())
# Notify the queue that the item has been processed
queue.task_done()

From the first two lines, one can see that the application will place its item into the queue. Rests are the lines from the other methods. Our application is pouring the data into the dictionary, which will be returned to our calling methods. The last line is significantly essential. Without the task_done process, the queue will continue to wait for upcoming items. Hence, that will trigger infinite wait or sometimes deadlock.

10. callClient.py (This script will trigger both the serial & parallel process of encryption one by one & finally capture some statistics. Hence, the name comes into the picture.)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
############################################
#### Written By: SATYAKI DE             ####
#### Written On: 10-Feb-2019            ####
#### Package Flask package needs to     ####
#### install in order to run this       ####
#### script.                            ####
####                                    ####
#### Objective: This script will        ####
#### initiate the encrypt/decrypt class ####
#### based on client supplied data.     ####
############################################
import pandas as p
import clsSerial as cs
import time
import datetime
from clsParam import clsParam as cf
import clsParallel as cp
import sys

def main():
    source_df = p.DataFrame()
    encrypted_df = p.DataFrame()
    source_encrypted_df = p.DataFrame()
    decrypted_df = p.DataFrame()
    encrypted_parallel_df = p.DataFrame()
    source_encrypted_parallel_df = p.DataFrame()
    decrypted_parallel_df = p.DataFrame()

    ###############################################################################
    #####                Start Of Serial Encryption Methods                  ######
    ###############################################################################

    print("-" * 157)

    startEnTime = time.time()
    srcFile = 'acct_addr_20180106'
    srcFileWithPath = str(cf.config['SRC_DIR']) + srcFile + '.csv'

    print("Calling Serial Process to Encrypt!")

    # Reading source file
    source_df = p.read_csv(srcFileWithPath, index_col=False)

    # Calling Encrypt Methods
    x = cs.clsSerial()
    encrypted_df = x.getEncrypt(source_df)

    # Handling Multiple source files
    var = datetime.datetime.now().strftime("%H.%M.%S")
    print('Target File Extension will contain the following:: ', var)

    targetFile = srcFile + '_Serial_'
    taregetFileWithPath = str(cf.config['FIN_DIR']) + targetFile + var + '.csv'

    # Finally Storing them into csv
    encrypted_df.to_csv(taregetFileWithPath, index=False)

    endEnTime = time.time()
    z1 = str(endEnTime - startEnTime)
    print("Over All Encrypt Process Time:", z1)

    time.sleep(20)

    ###############################################################################
    #####                Start Of Serial Decryption Methods                  ######
    ###############################################################################

    print("-" * 157)

    startDeTime = time.time()
    srcFileWithPath = taregetFileWithPath

    print("Calling Serial Process to Decrypt!")

    # Reading source file
    source_encrypted_df = p.read_csv(srcFileWithPath, index_col=False)

    # Calling Encrypt Methods
    x = cs.clsSerial()
    decrypted_df = x.getDecrypt(source_encrypted_df)

    targetFile = srcFile + '_restored_'
    taregetFileWithPath = str(cf.config['FIN_DIR']) + targetFile + var + '.csv'

    # Finally Storing them into csv
    decrypted_df.to_csv(taregetFileWithPath, index=False)

    endDeTime = time.time()
    z2 = str(endDeTime - startDeTime)
    print("Over All Decrypt Process Time:", z2)

    print("-" * 157)

    ###############################################################################
    #####        End Of Serial Encryption/Decryption Methods                 ######
    ###############################################################################

    time.sleep(20)

    ###############################################################################
    #####                Start Of Parallel Encryption Methods                ######
    ###############################################################################

    print("-" * 157)

    startEnTime = time.time()
    srcFileWithPath = str(cf.config['SRC_DIR']) + srcFile + '.csv'

    print("Calling Serial Process to Encrypt!")

    # Reading source file
    source_df = p.read_csv(srcFileWithPath, index_col=False)

    # Calling Encrypt Methods
    x = cp.clsParallel()
    encrypted_parallel_df = x.getEncryptParallel(source_df)

    # Handling Multiple source files
    var = datetime.datetime.now().strftime("%H.%M.%S")
    print('Target File Extension will contain the following:: ', var)

    targetFile = srcFile + '_Parallel_'
    taregetFileWithPath = str(cf.config['FIN_DIR']) + targetFile + var + '.csv'

    # Finally Storing them into csv
    encrypted_parallel_df.to_csv(taregetFileWithPath, index=False)

    endEnTime = time.time()
    z3 = str(endEnTime - startEnTime)
    print("Over All Encrypt Process Time:", z3)

    time.sleep(20)

    ###############################################################################
    #####                Start Of Serial Decryption Methods                  ######
    ###############################################################################

    print("-" * 157)

    startDeTime = time.time()
    srcFileWithPath = taregetFileWithPath

    print("Calling Parallel Process to Decrypt!")

    # Reading source file
    source_encrypted_parallel_df = p.read_csv(srcFileWithPath, index_col=False)

    # Calling Encrypt Methods
    x = cp.clsParallel()
    decrypted_parallel_df = x.getDecryptParallel(source_encrypted_parallel_df)

    targetFile = srcFile + '_restored_'
    taregetFileWithPath = str(cf.config['FIN_DIR']) + targetFile + var + '.csv'

    # Finally Storing them into csv
    decrypted_parallel_df.to_csv(taregetFileWithPath, index=False)

    endDeTime = time.time()
    z4 = str(endDeTime - startDeTime)
    print("Over All Decrypt Process Time:", z4)

    print("-" * 157)

    ###############################################################################
    #####        End Of Parallel Encryption/Decryption Methods               ######
    ###############################################################################

    ###############################################################################
    #####    Final Statistics between Serial & Parallel loading.             ######
    ###############################################################################

    print("-" * 157)
    print("Serial Encryption:: ", z1)
    print("Serial Decryption:: ", z2)
    print("-" * 157)
    print("Parallel Encryption:: ", z3)
    print("Parallel Decryption:: ", z4)
    print("-" * 157)


if __name__ == '__main__':
    main()

As you can see, we’ve triggered both the application using the main callable scripts.

Let’s explore the output –

Windows:

Win_Files

Mac:

MAC_Files

Note that you have to open two different windows or MAC terminal. One will trigger the server & others will trigger the client to simulate this.

Server:

Win_Server

Clients:

Win:

Win_Run

MAC:

MAC_Run

So, finally, we’ve achieved our goal. So, today we’ve done a bit long but beneficial & advanced concepts of crossover stones from our python verse. 🙂

Lot more innovative posts are coming.

Till then – Happy Avenging!

Pandas, Numpy, Encryption/Decryption, Hidden Files In Python (Crossover between Space Stone, Reality Stone & Mind Stone of Python-Verse)

So, here we come up with another crossover of Space Stone, Reality Stone & Mind Stone of Python-Verse. It is indeed exciting & I cannot wait to explore that part further. Today, in this post, we’ll see how one application can integrate all these key ingredients in Python to serve the purpose. Our key focus will be involving popular packages like Pandas, Numpy & Popular Encryption-Decryption techniques, which include some hidden files as well.

So, our objective here is to proceed with the encryption & decryption technique. But, there is a catch. We need to store some salt or tokenized value inside a hidden file. Our application will extract the salt value from it & then based on that it will perform Encrypt/Decrypt on the data.

Why do we need this approach?

The answer is simple. On many occasions, we don’t want to store our right credentials in configuration files. Also, we don’t want to keep our keys to open to other developers. There are many ways you can achieve this kind of security.  Today, I’ll be showing a different approach to make the same.

Let’s explore.

As usual, I’ll provide the solution, which is tested in Windows & MAC & provide the script. Also, I’ll explain the critical lines of those scripts to understand it from a layman point of view. And, I won’t explain any script, which I’ve already explained in my earlier post. So, you have to refer my old post for that.

To encrypt & decrypt, we need the following files, which contains credentials in a csv. Please find the sample data –

Config_orig.csv

Orig_File

Please see the file, which will be hidden by the application process.

Token_Salt_File

As you can see, this column contains the salt, which will be used in our Encryption/Decryption.

1. clsL.py (This script will create the csv files or any intermediate debug csv file after the corresponding process. Hence, the name comes into the picture.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 25-Jan-2019       ########
####                               ########
#### Objective: Log File           ########
###########################################
import pandas as p
import platform as pl
from clsParam import clsParam as cf

class clsL(object):
    def __init__(self):
        self.path = cf.config['PATH']

    def logr(self, Filename, Ind, df, subdir=None):
        try:
            x = p.DataFrame()
            x = df
            sd = subdir

            os_det = pl.system()

            if sd == None:
                if os_det == "Windows":
                    fullFileName = self.path + '\\' + Filename
                else:
                    fullFileName = self.path + '/' + Filename
            else:
                if os_det == "Windows":
                    fullFileName = self.path + '\\' + sd + "\\" + Filename
                else:
                    fullFileName = self.path + '/' + sd + "/" + Filename

            if Ind == 'Y':
                x.to_csv(fullFileName, index=False)

            return 0

        except Exception as e:
            y = str(e)
            print(y)
            return 3

2. clsParam.py (This is the script that will be used as a parameter file & will be used in other python scripts.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 25-Jan-2019       ########
#### Objective: Parameter File     ########
###########################################

import os
import platform as pl

class clsParam(object):

    config = {
        'FILENAME' : 'test.amca',
        'OSX_MOD_FILE_NM': '.test.amca',
        'CURR_PATH': os.path.dirname(os.path.realpath(__file__)),
        'NORMAL_FLAG': 32,
        'HIDDEN_FLAG': 34,
        'OS_DET': pl.system()
    }

 

3. clsWinHide.py (This script contains the core logic of hiding/unhiding a file under Windows OS. Hence, the name comes into the picture.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
###########################################
#### Written By: SATYAKI DE          ######
#### Written On: 25-Jan-2019         ######
####                                 ######
#### This script will hide or Unhide ######
#### Files in Windows.               ######
###########################################

import win32file
import win32con
from clsParam import clsParam as cp

class clsWinHide(object):
    def __init__(self):
        self.path = cp.config['CURR_PATH']
        self.FileName = cp.config['FILENAME']
        self.normal_file_flag = cp.config['NORMAL_FLAG']

    def doit(self):
        try:
            path = self.path
            FileName = self.FileName

            FileNameWithPath = path + '\\' + FileName
            flags = win32file.GetFileAttributesW(FileNameWithPath)
            win32file.SetFileAttributes(FileNameWithPath,win32con.FILE_ATTRIBUTE_HIDDEN | flags)

            return 0
        except Exception as e:
            x = str(e)
            print(x)

            return 1

    def undoit(self):
        try:
            path = self.path
            FileName = self.FileName
            normal_file_flag = self.normal_file_flag

            FileNameWithPath = path + '\\' + FileName
            win32file.SetFileAttributes(FileNameWithPath,win32con.FILE_ATTRIBUTE_NORMAL | int(normal_file_flag))

            return 0
        except Exception as e:
            x = str(e)
            print(x)

            return 1

Key lines that we would like to explore are as follows –

def doit()

flags = win32file.GetFileAttributesW(FileNameWithPath)
win32file.SetFileAttributes(FileNameWithPath,win32con.FILE_ATTRIBUTE_HIDDEN | flags)

The above two lines under doit() functions are changing the file attributes in Windows OS to the hidden mode by assigning the FILE_ATTRIBUTE_HIDDEN property.

def undoit()

normal_file_flag = self.normal_file_flag

FileNameWithPath = path + '\\' + FileName
win32file.SetFileAttributes(FileNameWithPath,win32con.FILE_ATTRIBUTE_NORMAL | int(normal_file_flag))

As the script suggested, the application is setting the file attribute of a hidden file to FILE_ATTRIBUTE_NORMAL & set the correct flag from parameters, which leads to the file appears as a normal windows file.

4. clsOSXHide.py (This script contains the core logic of hiding/unhiding a file under OSX, i.e., MAC OS. Hence, the name comes into the picture.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
###########################################
#### Written By: SATYAKI DE           #####
#### Written On: 25-Jan-2019          #####
####                                  #####
#### Objective: This script will hide #####
#### or Unhide the file in OSX.       #####
###########################################

import os
from clsParam import clsParam as cp

class clsOSXHide(object):
    def __init__(self):
        self.path = cp.config['CURR_PATH']
        self.FileName = cp.config['FILENAME']
        self.OSX_Mod_FileName = cp.config['OSX_MOD_FILE_NM']
        self.normal_file_flag = cp.config['NORMAL_FLAG']

    def doit(self):
        try:
            path = self.path
            FileName = self.FileName

            FileNameWithPath = path + '/' + FileName
            os.rename(FileNameWithPath, os.path.join(os.path.dirname(FileNameWithPath),'.'
                                                     + os.path.basename(FileNameWithPath)))

            return 0
        except Exception as e:
            x = str(e)
            print(x)

            return 1

    def undoit(self):
        try:
            path = self.path
            FileName = self.FileName
            OSX_Mod_FileName = self.OSX_Mod_FileName

            FileNameWithPath = path + '/' + FileName
            os.rename(OSX_Mod_FileName, FileNameWithPath)

            return 0
        except Exception as e:
            x = str(e)
            print(x)

            return 1

The key lines that we’ll be exploring here are as follows –

def doit()

FileNameWithPath = path + '/' + FileName
os.rename(FileNameWithPath, os.path.join(os.path.dirname(FileNameWithPath),'.'
                                         + os.path.basename(FileNameWithPath)))

In MAC or Linux, any file starts with ‘.’ will be considered as a hidden file. Hence, we’re changing the file type by doing this manipulation.

def undoit()

OSX_Mod_FileName = self.OSX_Mod_FileName

FileNameWithPath = path + '/' + FileName
os.rename(OSX_Mod_FileName, FileNameWithPath)

In this case, our application simply renaming a file with its the original file to get the file as a normal file.

Let’s understand that in Linux or MAC, you have a lot of other ways to restrict any files as it has much more granular level access control.  But, I thought, why not take a slightly different & fun way to achieve the same. After all, we’re building an Infinity War for Python verse. A little bit of fun will certainly make some sense. 🙂

5. clsProcess.py (This script will invoke any of the hide scripts, i.e. clsWinHide.py or clsOSXHide.py based on the OS platform. Hence, the name comes into the picture.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
###########################################
#### Written By: SATYAKI DE          ######
#### Written On: 25-Jan-2019         ######
####                                 ######
#### Objective: Based on the OS, this######
#### script calls the actual script. ######
###########################################

from clsParam import clsParam as cp

plat_det = cp.config['OS_DET']

# Based on the platform
# Application is loading subprocess
# in order to avoid library missing
# case against cross platform

if plat_det == "Windows":
    import clsWinHide as win
else:
    import clsOSXHide as osx

# End of conditional class load

class clsProcess(object):
    def __init__(self):
        self.os_det = plat_det

    def doit(self):
        try:

            os_det = self.os_det
            print("OS Info: ", os_det)

            if os_det == "Windows":
                win_doit = win.clsWinHide()
                ret_val = win_doit.doit()
            else:
                osx_doit = osx.clsOSXHide()
                ret_val = osx_doit.doit()

            return ret_val
        except Exception as e:
            x = str(e)
            print(x)

            return 1

    def undoit(self):
        try:

            os_det = self.os_det
            print("OS Info: ", os_det)

            if os_det == "Windows":
                win_doit = win.clsWinHide()
                ret_val = win_doit.undoit()
            else:
                osx_doit = osx.clsOSXHide()
                ret_val = osx_doit.undoit()

            return ret_val
        except Exception as e:
            x = str(e)
            print(x)

            return 1

Key lines to explores are as follows –

from clsParam import clsParam as cp

plat_det = cp.config['OS_DET']

# Based on the platform
# Application is loading subprocess
# in order to avoid library missing
# case against cross platform

if plat_det == "Windows":
    import clsWinHide as win
else:
    import clsOSXHide as osx

This step is very essential to run the same python scripts in both the environments, e.g. in this case like MAC & Windows.

So, based on the platform details, which the application is getting from the clsParam class, it is loading the specific class to the application. And why it is so important.

Under Windows OS, this will work if you load both the class. But, under MAC, this will fail as the first program will try to load all the libraries & it may happen that the pywin32/pypiwin32 package might not available under MAC. Anyway, you are not even using that package. So, this conditional class loading is significant.

os_det = self.os_det
print("OS Info: ", os_det)

if os_det == "Windows":
    win_doit = win.clsWinHide()
    ret_val = win_doit.doit()
else:
    osx_doit = osx.clsOSXHide()
    ret_val = osx_doit.doit()

As you can see that, based on the OS, it is invoking the correct function of that corresponding class.

6. clsEnDec.py (This script will read the credentials from a csv file & then based on the salt captured from the hidden file, it will either encrypt or decrypt the content. Hence, the name comes into the picture.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 25-Jan-2019       ########
#### Package Cryptography needs to ########
#### install in order to run this  ########
#### script.                       ########
####                               ########
#### Objective: This script will   ########
#### encrypt/decrypt based on the  ########
#### hidden supplied salt value.   ########
###########################################

import pandas as p
from cryptography.fernet import Fernet

class clsEnDec(object):

    def __init__(self, token):
        # Calculating Key
        self.token = token

    def encrypt_str(self):
        try:
            # Capturing the Salt Information
            salt = self.token
            # Fetching the content of lookup file
            df_orig = p.read_csv('Config_orig.csv', index_col=False)

            # Checking Individual Types inside the Dataframe
            cipher = Fernet(salt)

            df_orig['User'] = df_orig['User'].apply(lambda x1: cipher.encrypt(bytes(x1,'utf8')))
            df_orig['Pwd'] = df_orig['Pwd'].apply(lambda x2: cipher.encrypt(bytes(x2,'utf8')))

            # Writing to the File
            df_orig.to_csv('Encrypt_Config.csv', index=False)

            return 0
        except Exception as e:
            x = str(e)
            print(x)
            return 1

    def decrypt_str(self):
        try:
            # Capturing the Salt Information
            salt = self.token
            # Checking Individual Types inside the Dataframe
            cipher = Fernet(salt)

            # Fetching the Encrypted csv file
            df_orig = p.read_csv('Encrypt_Config.csv', index_col=False)

            df_orig['User'] = df_orig['User'].apply(lambda x1: str(cipher.decrypt(bytes(x1[2:-1],'utf8'))).replace("b'","").replace("'",""))
            df_orig['Pwd'] = df_orig['Pwd'].apply(lambda x2: str(cipher.decrypt(bytes(x2[2:-1],'utf8'))).replace("b'","").replace("'",""))

            # Writing to the file
            df_orig.to_csv('Decrypt_Config.csv', index=False)

            return 0
        except Exception as e:
            x = str(e)
            print(x)
            return 1

Key lines from this script are as follows –

def encrypt_str()

# Checking Individual Types inside the Dataframe
cipher = Fernet(salt)

df_orig['User'] = df_orig['User'].apply(lambda x1: cipher.encrypt(bytes(x1,'utf8')))
df_orig['Pwd'] = df_orig['Pwd'].apply(lambda x2: cipher.encrypt(bytes(x2,'utf8')))

So, once you captured the salt from that hidden file, the application is capturing that value over here. And, based on that both the field will be encrypted. But, note that cryptography package is required for this. And, you need to pass bytes value to work this thing. Hence, we’ve used bytes() function over here.

def decrypt_str()

cipher = Fernet(salt)

# Fetching the Encrypted csv file
df_orig = p.read_csv('Encrypt_Config.csv', index_col=False)

df_orig['User'] = df_orig['User'].apply(lambda x1: str(cipher.decrypt(bytes(x1[2:-1],'utf8'))).replace("b'","").replace("'",""))
df_orig['Pwd'] = df_orig['Pwd'].apply(lambda x2: str(cipher.decrypt(bytes(x2[2:-1],'utf8'))).replace("b'","").replace("'",""))

Again, in this step, our application is extracting the salt & then it retrieves the encrypted values of corresponding fields & applies the decryption logic on top of it. Note that, since we need to pass bytes value to get it to work. Hence, your output will be appended with (b’xxxxx’). To strip that, we’ve used the replace() functions. You can use regular expression using pattern matching as well.

7. callEnDec.py (This script will create the split csv files or final merge file after the corresponding process. However, this can be used as normal verbose debug logging as well. Hence, the name comes into the picture.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
###########################################
#### Written By: SATYAKI DE           #####
#### Written On: 25-Jan-2019          #####
####                                  #####
#### Objective: Main calling function #####
###########################################

import clsEnDec as ed
import clsProcess as h
from clsParam import clsParam as cp
import time as t
import pandas as p

def main():
    print("")
    print("#" * 60)
    print("Calling (Encryption/Decryption) Package!!")
    print("#" * 60)
    print("")

    # Unhiding the file
    x = h.clsProcess()
    ret_val_unhide = x.undoit()

    if ret_val_unhide == 0:
        print("Successfully Unhide the file!")
    else:
        print("Unsuccessful to Unhide the file!")

    # To See the Unhide file
    t.sleep(10)

    print("*" * 60)
    print("Proceeding with Encryption...")
    print("*" * 60)

    # Getting Salt Value from the hidden files
    # by temporarily making it available
    FileName = cp.config['FILENAME']
    df = p.read_csv(FileName, index_col=False)
    salt = str(df.iloc[0]['Token_Salt'])
    print("-" * 60)
    print("Salt: ", salt)
    print("-" * 60)

    # Calling the Encryption Method
    x = ed.clsEnDec(salt)
    ret_val = x.encrypt_str()

    if ret_val == 0:
        print("Encryption Successful!")
    else:
        print("Encryption Failure!")

    print("")
    print("*" * 60)
    print("Checking Decryption Now...")
    print("*" * 60)

    # Calling the Decryption Method
    ret_val1 = x.decrypt_str()

    if ret_val1 == 0:
        print("Decryption Successful!")
    else:
        print("Decryption Failure!")

    # Hiding the salt file
    x = h.clsProcess()
    ret_val_hide = x.doit()

    if ret_val_hide == 0:
        print("Successfully Hide the file!")
    else:
        print("Unsuccessful to Hide the file!")

    print("*" * 60)
    print("Operation Done!")
    print("*" * 60)

if __name__ == '__main__':
    main()

And, here comes the final calling methods.

The key lines that we would like to discuss –

# Getting Salt Value from the hidden files
# by temporarily making it available
FileName = cp.config['FILENAME']
df = p.read_csv(FileName, index_col=False)
salt = str(df.iloc[0]['Token_Salt'])

As I’ve shown that, we have our hidden files that contain only 1 row & 1 column. To extract the specific value we’ve used iloc with the row number as 0 along with the column name, i.e. Token_Salt.

Now, let’s see how it runs –

Windows (64 bit):

Win_Run

Mac (32 bit):

MAC_Run

So, from the screenshot, we can see our desired output & you can calculate the aggregated value based on our sample provided in the previous screenshot.

Let’s check the Encrypted & Decrypted values –

Encrypted Values (Encrypt_Config.csv):

Encrypted_File

Decrypted Values (Decrypt_Config.csv):

Decrypted_File

So, finally, we’ve achieved our target.

I hope this will give you some more idea about more insights into the Python verse. Let me know – how do you think about this post.

Till then – Happy Avenging!

Pandas, Numpy, JSON & SSL (Crossover of Space Stone & Reality Stone in Python Verse)

In our last installment, we’ve shown pandas & numpy based on a specific situation. If that is our Space Stone installment of Python Verse, then this would be one approach of creating much interesting crossover of Space Stone & Reality Stone of Python verse. Yes. You are right. We’ll be discussing one requirement, where we need many of these in a single task.

Let’s dive into it!

Let’s assume that we have a source csv file which has the following data –

src_csv

Now, the requirement is – we need to use one third party web service to send JSON payload preparing with this data & send them to the 3rd party API to get the City, State & based on that we need to find the total number of item sold against each State & City.

requirement_data

Let’s look into our third-party API site  –

Please find the third-party API Link

website_api

As per the agreement with this website, any developer can test 10 calls per day free. After that, it will send your response with encrypted values, e.g. Classified. But, we don’t need more than 10 calls to test it.

Here, we’ll be dealing with the 4 python scripts. Among them, one scenario I’ve already described in my previous post. So, I’ll be just mentioning the file & post the script.

Please find the directory structure in both the OS –

directory_win_mac

1. clsLpy (This script will create the split csv files or final merge file after the corresponding process. However, this can be used as usual verbose debug logging as well. Hence, the name comes into the picture.)

 

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 20-Jan-2019       ########
###########################################
import pandas as p
import os
import platform as pl
from clsParam import clsParam as cf

class clsL(object):
    def __init__(self):
        self.path = cf.config['PATH']

    def logr(self, Filename, Ind, df, subdir=None):
        try:
            x = p.DataFrame()
            x = df
            sd = subdir

            os_det = pl.system()

            if sd == None:
                if os_det == "Windows":
                    fullFileName = self.path + '\\' + Filename
                else:
                    fullFileName = self.path + '/' + Filename
            else:
                if os_det == "Windows":
                    fullFileName = self.path + '\\' + sd + "\\" + Filename
                else:
                    fullFileName = self.path + '/' + sd + "/" + Filename

            if Ind == 'Y':
                x.to_csv(fullFileName, index=False)

            return 0

        except Exception as e:
            y = str(e)
            print(y)
            return 3

 

2. clsParam.py (This script contains the parameter entries in the form of dictionary & later this can be used in all the relevant python scripts as configuration parameters.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 20-Jan-2019       ########
###########################################

import os

class clsParam(object):

    config = {
        'MAX_RETRY' : 5,
        'API_KEY' : 'HtWTVS86n8xoGXahyg1tPYH0HwngPqH2YFICzRCtlLbCtfNdya8L1UwRvH90PeMF',
        'PATH' : os.path.dirname(os.path.realpath(__file__)),
        'SUBDIR' : 'data'
    }

As you can see from this script that we’ve declared all the necessary parameters here as dictionary object & later we’ll be referring these parameters in the corresponding python scripts.

'API_KEY' : 'HtWTVS86n8xoGXahyg1tPYH0HwngPqH2YFICzRCtlLbCtfNdya8L1UwRvH90PeMF'

One crucial line, we’ll look into. API_KEY will be used while sending the JSON payload to the third-party web service. We’ll get this API_KEY from the highlighted (In Yellow) picture posted above.

3. clsWeb.py (This is the main script, which will first convert the pandas’ data frames into JSON & and send the API request as per the third party site.  It will capture the response & convert that by normalizing the data & poured it back to the data frame for further process.)

 

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 20-Jan-2019       ########
###########################################

import json
import requests
import datetime
import time
import ssl
from urllib.request import urlopen
import pandas as p
import numpy as np
import os
import gc
from clsParam import clsParam as cp

class clsWeb(object):
    def __init__(self, payload, format, unit):
        self.payload = payload
        self.path = cp.config['PATH']
        # To disable logging info
        self.max_retries = cp.config['MAX_RETRY']
        self.api_key = cp.config['API_KEY']
        self.unit = unit
        self.format =format

    def get_response(self):
        # Assigning Logging Info
        max_retries = self.max_retries
        api_key = self.api_key
        unit = self.unit
        format = self.format
        df_conv = p.DataFrame()
        cnt = 0

        try:
            # Bypassing SSL Authentication
            try:
                _create_unverified_https_context = ssl._create_unverified_context
            except AttributeError:
                # Legacy python that doesn't verify HTTPS certificates by default
                pass
            else:
                # Handle target environment that doesn't support HTTPS verification
                ssl._create_default_https_context = _create_unverified_https_context

            # Capturing the payload
            data_df = self.payload
            temp_df = data_df[['zipcode']]

            list_of_rec = temp_df['zipcode'].values.tolist()

            print(list_of_rec)

            for i in list_of_rec:
                zip = i

                # Providing the url
                url_part = 'http://www.zipcodeapi.com/rest/'
                url = url_part + api_key + '/' + 'info.' + format + '/' + str(zip) + '/' + unit

                headers = {"Content-type": "application/json"}
                param = headers

                var1 = datetime.datetime.now().strftime("%H:%M:%S")
                print('Json Fetch Start Time:', var1)

                retries = 1
                success = False

                while not success:
                    # Getting response from web service
                    response = requests.get(url, params=param, verify=False)
                    # print("Complete Error:: ", str(response.status_code))
                    # print("Error First::", str(response.status_code)[:1])

                    if str(response.status_code)[:1] == '2':
                        # response = s.post(url, params=param, json=json_data, verify=False)
                        success=True
                    else:
                        wait = retries * 2
                        print("Retry fails! Waiting " + str(wait) + " seconds and retrying.")
                        time.sleep(wait)
                        retries += 1

                    # Checking Maximum Retries
                    if retries == max_retries:
                        success=True
                        raise ValueError

                # print(response.text)

                var2 = datetime.datetime.now().strftime("%H:%M:%S")
                print('Json Fetch End Time:', var2)

                print("-" * 90)

                # Capturing the response json from Web Service
                df_response_json = response.text
                string_to_json = json.loads(df_response_json)

                # Converting the response json to Dataframe
                # df_Int_Rec = p.read_json(string_to_json, orient='records')
                df_Int_Rec = p.io.json.json_normalize(string_to_json)
                df_Int_Rec.columns = df_Int_Rec.columns.map(lambda x: x.split(".")[-1])

                if cnt == 0:
                    df_conv = df_Int_Rec
                else:
                    d_frames = [df_conv, df_Int_Rec]
                    df_conv = p.concat(d_frames)

                cnt += 1

            # Deleting temporary dataframes & Releasing memories
            del [[df_Int_Rec]]
            gc.collect()

            # Resetting the Index Value
            df_conv.reset_index(drop=True, inplace=True)

            # Merging two data side ways maintaining the orders
            df_add = p.concat([data_df, df_conv], axis=1)

            del [[df_conv]]
            gc.collect()

            # Dropping unwanted column
            df_add.drop(['acceptable_city_names'], axis=1, inplace=True)

            return df_add

        except ValueError as v:
            print(response.text)
            x = str(v)
            print(x)

            # Return Empty Dataframe
            df = p.DataFrame()
            return df

        except Exception as e:
            print(response.text)
            x = str(e)
            print(x)

            # Return Empty Dataframe
            df = p.DataFrame()
            return df

Let’s look at the key lines to discuss –

def __init__(self, payload, format, unit):

    self.payload = payload
    self.path = cp.config['PATH']

    # To disable logging info
    self.max_retries = cp.config['MAX_RETRY']
    self.api_key = cp.config['API_KEY']
    self.unit = unit
    self.format = format

The first block will be instantiated as soon as you are invoking the class. Note that, we’ve used our parameter class python script here as cp & then we’re referring the corresponding elements as & when requires. Other parameters will be captured from the invoking script, which we’ll be discussed later in this post.

# Bypassing SSL Authentication
try:
    _create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
    # Legacy python that doesn't verify HTTPS certificates by default
    pass
else:
    # Handle target environment that doesn't support HTTPS verification
    ssl._create_default_https_context = _create_unverified_https_context

Sometimes, Your Firewall or Proxy might block your web service request due to a specific certificate error. This snippet will bypass that authentication. However, it is always advised to use proper SSL certification in the Production environment.

# Capturing the payload
data_df = self.payload
temp_df = data_df[['zipcode']]

list_of_rec = temp_df['zipcode'].values.tolist()

In this snippet, we’re capturing the zip code from our source data frame & converting them into a list & this would be our candidate to pass the data as part of our JSON payload.

for i in list_of_rec:
    zip = i

    # Providing the url
    url_part = 'http://www.zipcodeapi.com/rest/'
    url = url_part + api_key + '/' + 'info.' + format + '/' + str(zip) + '/' + unit

    headers = {"Content-type": "application/json"}
    param = headers

Once, we’ve extracted our zip codes, we’re passing it one-by-one & forming our JSON with header & data.

retries = 1
success = False

while not success:
    # Getting response from web service
    response = requests.get(url, params=param, verify=False)
    # print("Complete Error:: ", str(response.status_code))
    # print("Error First::", str(response.status_code)[:1])

    if str(response.status_code)[:1] == '2':
        # response = s.post(url, params=param, json=json_data, verify=False)
        success=True
    else:
        wait = retries * 2
        print("Retry fails! Waiting " + str(wait) + " seconds and retrying.")
        time.sleep(wait)
        retries += 1

    # Checking Maximum Retries
    if retries == max_retries:
        success=True
        raise ValueError

In this section, we’re posting our JSON application & waiting for the response from the third-party API. If we receive the success response (200), we will proceed with the next zip code. However, if we didn’t receive the success response, we’ll retry the post option again until or unless it reaches the maximum limits. In case, if the application still waiting for a valid answer even after the maximum limit, it will exit from the loop & raise an error to the main application.

# Capturing the response json from Web Service
df_response_json = response.text
string_to_json = json.loads(df_response_json)

# Converting the response json to Dataframe
# df_Int_Rec = p.read_json(string_to_json, orient='records')
df_Int_Rec = p.io.json.json_normalize(string_to_json)
df_Int_Rec.columns = df_Int_Rec.columns.map(lambda x: x.split(".")[-1])

This snippet will extract the desired response from the API & convert that back to the Pandas data frame. Last two lines, it is normalizing the data that it has received from the API for further process. This is critical steps as these steps will lead to extract City & State from our API response.

# Merging two data side ways maintaining the orders
df_add = p.concat([data_df, df_conv], axis=1)

Once, we’ll have structured data – we can merge it back to our source data frame for our next step.

4. callWebservice.py (This script will call the API script & also process the data to create an aggregate report for our task.)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#####################################################
### Objective: Purpose of this Library is to call ###
### the Web Service method to capture the city,   ###
### & the state as a json response & update them  ###
### in the dataframe & finally produce the summary###
### of Total Sales & Item Counts based on the City###
### & the State.                                  ###
###                                               ###
### Arguments are as follows:                     ###
### Mentioned the Exception Part. First time dry  ###
### run the program without providing any args.   ###
### It will show all the mandatory params.        ###
###                                               ###
#####################################################
#####################################################
#### Written By: SATYAKI DE                       ###
#### Written On: 20-Jan-2019                      ###
#####################################################

import clsWeb as cw
import sys
import pandas as p
import os
import platform as pl
import clsLog as log
import datetime
import numpy as np
from clsParam import clsParam as cp

# Disbling Warnings
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn

def main():
    print("Calling the custom Package..")

    try:
        if len(sys.argv) == 4:
            inputFile = str(sys.argv[1])
            format = str(sys.argv[2])
            unit = str(sys.argv[3])
        else:
            raise Exception

        # Checking whether the format contains
        # allowable choices or not
        if (format == 'JSON'):
            format = 'json'
        elif (format == 'CSV'):
            format = 'csv'
        elif (format == 'XML'):
            format = 'xml'
        else:
            raise Exception

        # Checking whether the format contains
        # allowable choices or not
        if (unit == 'DEGREE'):
            unit = 'degree'
        elif (unit == 'RADIANS'):
            unit = 'radians'
        else:
            raise Exception

        print("*" * 170)
        print("Reading from " + str(inputFile))
        print("*" * 170)

        # Initiating Logging Instances
        clog = log.clsLog()

        path = cp.config['PATH']
        subdir = cp.config['SUBDIR']

        os_det = pl.system()

        if os_det == "Windows":
            src_path = path + '\\' + 'data\\'
        else:
            src_path = path + '/' + 'data/'

        # Reading source data csv file
        df_Payload = p.read_csv(src_path+inputFile, index_col=False, skipinitialspace=True)

        x = cw.clsWeb(df_Payload, format, unit)
        retDf = x.get_response()

        # Total Number of rows fetched
        count_row = retDf.shape[0]

        if count_row == 0:
            print("Data Processing Issue!")
        else:
            print("Writing to file -> (" + str(inputFile) + "_modified.csv) Status: Success")

        FileName, FileExtn = inputFile.split(".")

        # Writing to the file
        clog.logr(FileName + '_modified.' + FileExtn, 'Y', retDf, subdir)
        print("*" * 170)

        # Performing group by operation to get the desired result
        # State & City-wise total Sales & Item Sales
        df_src = p.DataFrame()
        df_src = retDf[['city', 'state', 'total', 'item_count']]

        # Converting values to Integer
        df_src['city_1'] = retDf['city'].astype(str)
        df_src['state_1'] = retDf['state'].astype(str)
        df_src['total_1'] = retDf['total'].astype(int)
        df_src['item_count_1'] = retDf['item_count'].astype(int)

        # Dropping the old Dtype Columns
        df_src.drop(['city'], axis=1, inplace=True)
        df_src.drop(['state'], axis=1, inplace=True)
        df_src.drop(['total'], axis=1, inplace=True)
        df_src.drop(['item_count'], axis=1, inplace=True)

        # Renaming the new columns to as per Old Column Name
        df_src.rename(columns={'city_1': 'city'}, inplace=True)
        df_src.rename(columns={'state_1': 'state'}, inplace=True)
        df_src.rename(columns={'total_1': 'total'}, inplace=True)
        df_src.rename(columns={'item_count_1': 'item_count'}, inplace=True)

        # Performing Group By Operation
        grouped = df_src.groupby(['state', 'city'])
        res_1 = grouped.aggregate(np.sum)

        print("DF:")
        print(res_1)

        FileName1 = 'StateCityWiseReport'
        # Handling Multiple source files
        var = datetime.datetime.now().strftime(".%H.%M.%S")
        print('Target File Extension will contain the following:: ', var)

        # Writing to the file
        clog.logr(FileName1 + var + '.' + FileExtn, 'Y', df_src, subdir)

        print("*" * 170)
        print("Operation done for " + str(inputFile) + "!")
        print("*" * 170)
    except Exception as e:
        x = str(e)
        print(x)
        print("*" * 170)
        print('Current order would be - <' + str(sys.argv[0]) + '> <Csv File Name> <JSON/CSV/XML> <DEGREE/RADIANS>')
        print('Make sure last two params should be in CAPS only!')
        print("*" * 170)

if __name__ == "__main__":
    main()

Let’s look at some vital code snippet in this main script –

# Reading source data csv file
df_Payload = p.read_csv(src_path+inputFile, index_col=False, skipinitialspace=True)

x = cw.clsWeb(df_Payload, format, unit)
retDf = x.get_response()

In this snippet, we’re getting our data from our source csv & then calling our leading Web API service to get the State & City information.

# Converting values to Integer
df_src['city_1'] = retDf['city'].astype(str)
df_src['state_1'] = retDf['state'].astype(str)
df_src['total_1'] = retDf['total'].astype(int)
df_src['item_count_1'] = retDf['item_count'].astype(int)

Converting individual data type to appropriate data types. In Pandas, it is always advisable to change the data type of frames to avoid unforeseen scenarios.

# Dropping the old Dtype Columns
df_src.drop(['city'], axis=1, inplace=True)
df_src.drop(['state'], axis=1, inplace=True)
df_src.drop(['total'], axis=1, inplace=True)
df_src.drop(['item_count'], axis=1, inplace=True)

# Renaming the new columns to as per Old Column Name
df_src.rename(columns={'city_1': 'city'}, inplace=True)
df_src.rename(columns={'state_1': 'state'}, inplace=True)
df_src.rename(columns={'total_1': 'total'}, inplace=True)
df_src.rename(columns={'item_count_1': 'item_count'}, inplace=True)

Now, dropping the old columns & renaming the new columns to get the same column with correct data types. I personally like this way as it is an immaculate way to do this task. You can also debug it easily.

# Performing Group By Operation
grouped = df_src.groupby(['state', 'city'])
res_1 = grouped.aggregate(np.sum)

And, finally, using Pandas group-by method we’re aggregating the groups & then using numpy to generate the same against each group.

Please check the first consolidated output –

consolidate_data_from_web_api

From this screenshot, you can see how we have the desired intermediate data of City & State to proceed for the next level.

Let’s see how it runs –

Windows (64 bit):

run_windows

Mac (32 bit):

run_mac

So, from the screenshot, we can see our desired output & you can calculate the aggregated value based on our sample provided in the previous screenshot.

Let’s check how the data directory looks like after run –

Windows:

final_data_windows

MAC:

final_data_mac

So, finally, we’ve achieved our target.

I hope this will give you some more idea about more insights into the Python verse. Let me know – how do you think about this post.

Till then – Happy Avenging!

Pandas & Numpy (Space Stone of Programming World)

Today, we’ll demonstrate the different application of Pandas. In this case, we’ll be exploring the possibilities of reading large CSV files & splitting it sets of smaller more manageable csv to read.

And, after creating it, another process will merge them together. This is especially very useful when you need transformation on a large volume of data without going for any kind of memory error. And, moreover, the developer has more control over failed cases & can resume the load without restarting it from the beginning of the files.

In this case, I’ll be using one more custom methods to create the csv file instead of directly using the to_csv method of pandas.

But, before that let’s prepare the virtual environment & proceed from there –

Windows 10 (64 bit): 

Commands:

python -m venv –copies .env

.env\Scripts\activate.bat

Screenshot:

windows_screen1

Mac OS (64 bit): 

Commands:

python -m venv env

source env/bin/activate

Screenshot:

mac_screen

So, both the Windows & Mac version is 3.7 & we’re going to explore our task in the given section.

After creating this virtual environment, you need to install only pandas package for this task as shown below for both the Windows or Mac OS –

Windows:

package_install_windows

Mac:

package_install_mac

Rests are the packages comes as default with the Python 3.7.

Please find the GUI screenshots from WinSCP software comparing both the directory structures (Mac & Windows) as given below –

winscp_screen

From the above screenshot, you can see that our directory structure are not exactly identical before the blog directory. However, our program will take care of this difference.

Let’s check the scripts one-by-one,

1. clsL.py (This script will create the split csv files or final merge file after the corresponding process. However, this can be used as normal verbose debug logging as well. Hence, the name comes into the picture.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#############################################
#### Written By: Satyaki De              ####
#############################################
import pandas as p
import os
import platform as pl

class clsL(object):
    def __init__(self):
        self.path = os.path.dirname(os.path.realpath(__file__))

    def logr(self, Filename, Ind, df, subdir=None):
        try:
            x = p.DataFrame()
            x = df

            sd = subdir
            os_det = pl.system()

            if os_det == "Windows":
                if sd == None:
                    fullFileName = self.path + "\\" + Filename
                else:
                    fullFileName = self.path + "\\" + sd + "\\" + Filename
            else:
                if sd == None:
                    fullFileName = self.path + "/" + Filename
                else:
                    fullFileName = self.path + "/" + sd + "/" + Filename


            if Ind == 'Y':
                x.to_csv(fullFileName, index=False)

            return 0

        except Exception as e:
            y = str(e)
            print(y)
            return 3

From the above script, you can see that based on the Indicator, whose value can be either ‘Y’ or ‘N’. It will generate the csv file from the pandas data frame using to_csv method available in pandas.

Key snippet to notice –

self.path = os.path.dirname(os.path.realpath(__file__))

Here, the class is creating an instance & during that time it is initializing the value of the current path from where the application is triggering.

x = p.DataFrame()
x = df

The first line, declaring a pandas data frame variable. The second line assigns the value from the supplied method to that variable.

os_det = pl.system()

This will identify the operating system on which your application is running. Based on that, your path will be dynamically configured & passed. Hence, your application will be ready to handle multiple operating systems since beginning.

x.to_csv(fullFileName, index=False)

Finally, to_csv will generate the final csv file based on the supplied Indicator value. Also, notice that we’ve added one more parameter (index=False). By default, pandas create one extra column known as an index & maintain it’s operation based on that.

index_val

As you can see that the first column is not coming from our source files. Rather, it is generated by the pandas package in python. Hence, we don’t want to capture that in our final file by mentioning (index=False) options.

2. clsSplitFl.py (This script will create the split csv files. This will bring chunk by chunk data into your memory & process the large files.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#############################################
#### Written By: Satyaki De              ####
#############################################
import os
import pandas as p
import clsLog as log
import gc
import csv

class clsSplitFl(object):
    def __init__(self, srcFileName, path, subdir):
        self.srcFileName = srcFileName
        self.path = path
        self.subdir = subdir

        # Maximum Number of rows in CSV
        # in order to avoid Memory Error
        self.max_num_rows = 30000
        self.networked_directory = 'src_file'
        self.Ind = 'Y'

    def split_files(self):
        try:
            src_dir = self.path
            subdir = self.subdir
            networked_directory = self.networked_directory

            # Initiate Logging Instances
            clog = log.clsLog()

            # Setting up values
            srcFileName = self.srcFileName

            First_part, Last_part = str(srcFileName).split(".")

            num_rows = self.max_num_rows
            dest_path = self.path
            remote_src_path = src_dir + networked_directory
            Ind = self.Ind
            interval = num_rows

            # Changing work directory location to source file
            # directory at remote server
            os.chdir(remote_src_path)

            src_fil_itr_no = 1

            # Split logic here
            for df2 in p.read_csv(srcFileName, index_col=False, error_bad_lines=False, chunksize=interval):
                # Changing the target directory path
                os.chdir(dest_path)

                # Calling custom file generation method
                # to generate splitted files
                clog.logr(str(src_fil_itr_no) + '__' + First_part + '_' + '_splitted_.' + Last_part, Ind, df2, subdir)

                del [[df2]]
                gc.collect()

                src_fil_itr_no += 1

            return 0
        except Exception as e:
            x = str(e)
            print(x)

            return 1

In this script, we’re splitting the file if that file has more than 30,000 records. And, based on that it will split a number of equal or fewer volume files.

Important lines to be noticed –

self.max_num_rows = 30000

As already explained, based on this the split files contain the maximum number of rows in each file.

First_part, Last_part = str(srcFileName).split(“.”)

This will split the source file name into the first part & second part i.e. one part contains only the file name & the other part contains only the extension dynamically.

for df2 in p.read_csv(srcFileName, index_col=False, error_bad_lines=False, chunksize=interval):

As you can see, the chunk-by-chunk (mentioned as chunksize=interval) application will read lines from the large source csv. And, if it has any bad rows in the source files – it will skip them due to the following condition -> (error_bad_lines=False).

clog.logr(str(src_fil_itr_no) + ‘__’ + First_part + ‘_’ + ‘_splitted_.’ + Last_part, Ind, df2, subdir)

Dynamically generating split files in the specific subdirectory along with the modified name. So, these files won’t get overwritten – if you rerun it. Remember that the src_fil_itr_no will play an important role while merging them back to one as this is a number representing the current file’s split number.

del [[df2]]
gc.collect()

Once, you process that part – delete the data frame & deallocate the memory. So, that you won’t encounter any memory error or a similar issue.

And, the split file will look like this –

split_file_in_windows

3. clsMergeFl.py (This script will add together all the split csv files into one big csv file. This will bring chunk by chunk data into your memory & generates the large file.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#############################################
#### Written By: Satyaki De              ####
#############################################
import os
import platform as pl
import pandas as p
import gc
import clsLog as log
import re

class clsMergeFl(object):

    def __init__(self, srcFilename):
        self.srcFilename = srcFilename
        self.subdir = 'finished'
        self.Ind = 'Y'

    def merge_file(self):
        try:
            # Initiating Logging Instances
            clog = log.clsLog()
            df_W = p.DataFrame()
            df_M = p.DataFrame()
            f = {}

            subdir = self.subdir
            srcFilename = self.srcFilename
            Ind = self.Ind
            cnt = 0

            os_det = pl.system()

            if os_det == "Windows":
                proc_dir = "\\temp\\"
                gen_dir = "\\process\\"
            else:
                proc_dir = "/temp/"
                gen_dir = "/process/"

            # Current Directory where application presents
            path = os.path.dirname(os.path.realpath(__file__)) + proc_dir

            print("Path: ", path)
            print("Source File Initial Name: ", srcFilename)

            for fname in os.listdir(path):
                if fname.__contains__(srcFilename) and fname.endswith('_splitted_.csv'):
                    key = int(re.split('__', str(fname))[0])
                    f[key] = str(fname)

            for k in sorted(f):
                print(k)
                print(f[k])
                print("-"*30)

                df_W = p.read_csv(path+f[k], index_col=False)

                if cnt == 0:
                    df_M = df_W
                else:
                    d_frames = [df_M, df_W]
                    df_M = p.concat(d_frames)

                cnt += 1

                print("-"*30)
                print("Total Records in this Iteration: ", df_M.shape[0])

            FtgtFileName = fname.replace('_splitted_', '')
            first, FinalFileName = re.split("__", FtgtFileName)

            clog.logr(FinalFileName, Ind, df_M, gen_dir)

            del [[df_W], [df_M]]
            gc.collect()

            return 0
        except Exception as e:
            x = str(e)
            print(x)

            return 1

In this script, we’re merging smaller files into a large file. Following are the key snippet that we’ll explore –

for fname in os.listdir(path):
    if fname.__contains__(srcFilename) and fname.endswith('_splitted_.csv'):
        key = int(re.split('__', str(fname))[0])
        f[key] = str(fname)

In this section, the application will check if in that specified path we’ve files whose extension ends with “_splitted_.csv” & their first name starts with the file name initial i.e. if you have a source file named – acct_addr_20180112.csv, then it will check the first name should start with the -> “acct_addr” & last part should contain “_splitted_.csv”. If it is available, then it will start the merge process by considering one by one file & merging them using pandas data frame (marked in purple color) as shown below –

for k in sorted(f):
    print(k)
    print(f[k])
    print("-"*30)

    df_W = p.read_csv(f[k], index_col=False)

    if cnt == 0:
        df_M = df_W
    else:
        d_frames = [df_M, df_W]
        df_M = p.concat(d_frames)

    cnt += 1

Note that, here f is a dictionary that contains filename in key, value pair. The first part of the split file contains the number.  That way, it would be easier for the merge to club them back to one large file without thinking of orders.

Here, also notice the special function concat provided by the pandas. In this step, applications are merging two data frames.

Finally, the main python script, from where we’ll call it –

4. callSplitMergeFl.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#############################################
#### Written By: Satyaki De              ####
#############################################
import clsSplitFl as t
import clsMergeFl as cm
import re
import platform as pl
import os

def main():
    print("Calling the custom Package for large file splitting..")
    os_det = pl.system()

    print("Running on :", os_det)

    ###############################################################
    ###### User Input based on Windows OS                  ########
    ###############################################################

    srcF = str(input("Please enter the file name with extension:"))
    base_name = re.sub(r'[0-9]','', srcF)
    srcFileInit = base_name[:-5]

    if os_det == "Windows":
        subdir = "\\temp\\"
        path = os.path.dirname(os.path.realpath(__file__)) + "\\"
    else:
        subdir = "/temp/"
        path = os.path.dirname(os.path.realpath(__file__)) + '/'

    ###############################################################
    ###### End Of User Input                                 ######
    ###############################################################

    x = t.clsSplitFl(srcF, path, subdir)

    ret_val = x.split_files()

    if ret_val == 0:
        print("Splitting Successful!")
    else:
        print("Splitting Failure!")

    print("-"*30)

    print("Finally, Merging small splitted files to make the same big file!")

    y = cm.clsMergeFl(srcFileInit)

    ret_val1 = y.merge_file()

    if ret_val1 == 0:
        print("Merge Successful!")
    else:
        print("Merge Failure!")

    print("-"*30)



if __name__ == "__main__":
    main()

Following are the key section that we can check –

import clsSplitFl as t
import clsMergeFl as cm

Like any other standard python package, we’re importing our own class into our main callable script.

x = t.clsSplitFl(srcF, path, subdir)
ret_val = x.split_files()

Or,
y = cm.clsMergeFl(srcFileInit)
ret_val1 = y.merge_file()

In this section, we’ve instantiated the class & then we’re calling its function. And, based on the return value – we’re printing the status of our application last run.

The final run of this application looks like ->

Windows:

final_run_windows

Mac:

final_run_mac

And, the final file should look like this –

Windows:

win_img1

MAC:

mac_img1

Left-hand side representing windows final processed/output file, whereas right-hand side representing MAC final processed/output file.

Hope, this will give you some idea about how we can use pandas in various cases apart from conventional data computing.

In this post, I skipped the exception part intentionally. I’ll post one bonus post once my series complete.

Let me know, what do you think.

Till then, Happy Avenging!

Satyaki De

Python Verse – Universe of Avengers in Computer Language World!

The last couple of years, I’ve been working on various technologies. And, one of the interesting languages that I came across is Python. It is extremely flexible for developers to learn & rapidly develop with very few lines of code compared to the other languages. There are major versions of python that I worked with. Among them, python 2.7 & current python 3.7.1 are very popular to developers & my personal favorite.

There are many useful packages that are available to reduce the burden of the developers. Among them, packages like “pandas”, “numpy”, “json”, “AES”, “threading” etc. are extremely useful & one can do lot’s of work with it.

I personally prefer Ubuntu or Mac version of python. However, I’ve worked on Windows version as well or developed python based framework & application, which works in all the major operating systems. If you take care few things from the beginning, then you don’t have to make much more changes of your python application in order to work in all the major operating systems. 🙂

To me, Python Universe is nothing shorter than Marvel’s Universe of Avengers. In order to beat Supreme Villain Thanos (That Challenging & Complex Product with extremely tight timeline), you got to have 6 infinity stones to defeat him.

  1. Space Stone ( Pandas & Numpy )
  2. Reality Stone ( Json, SSL & Encryption/Decryption )
  3. Power Stone ( Multi-Threading/Multi-Processing )
  4. Mind Stone ( OS, Database, Directories & Files )
  5. Soul Stone ( Logging & Exception )
  6. Time Stone ( Cloud Interaction & Framework )

I’ll release a series of python based post in coming days, which might be useful for many peers or information seeker. Hopefully, this installment is a beginning & please follow my post. I hope, very soon you will get many such useful posts.

You get the latest version of Python from the official site given below –

Python Link (3.7.1)

Make sure you must install pip package along with python. I’m not going in details of how one should install python in either of Windows/Mac or Linux.

Just showing you how to install individual python packages.

Windows:

pip install pandas

Linux/Mac:

sudo python3.7 -m pip install pandas

From the second example, you can see that you can install packages to specific python version in case if you have multiple versions of python.

Note that: There might be slight variation based on different versions of Linux. Make sure you are using the correct syntax as per your flavor.

You can get plenty of good sites, where the detailed step-by-step process shared for each operating system.

Till then – Happy Avenging!