Real-time Matplotlib view of streaming data inside a Python & Kivy-based iOS App

Today, I’ll be sharing one of the most exciting posts I’ve written so far. It is a rare one, as a complete working solution for this use case is hard to find on the net.

So, what are we talking about here? We’re going to build a Python-based iOS App using the Kivy framework. There are plenty of videos & documents on that already. However, I have not come across one that covers the capability I’m about to show. We’ll consume live IoT streaming data from a dummy application & then plot it in a Matplotlib dashboard inside the mobile App. And that’s where this post is seriously different from the rest of the available material.


But, before we dig into more details, let us see a quick demo of our iOS App.

Demo:

Demo

Isn’t it exciting? Great! Now, let’s dig into the details.


Let’s understand the architecture as to how we want to proceed with the solution here.

Architecture:

Broad-level design

The above diagram shows that the Kivy-based iOS application will consume streaming data from the Ably queue. The dummy IoT application will push the real-time events to the same Ably queue.
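To make the flow concrete, here is a minimal sketch of the same publish & consume pattern. It is only an illustration: an in-process queue.Queue stands in for the Ably channel, and the names channel, publish & consume are hypothetical and are not part of the Ably SDK or of the scripts below.

```python
import json
import queue

# Stand-in for the Ably channel (assumption: the real app talks to Ably instead).
channel = queue.Queue()


def publish(events):
    """Producer side: serialize one batch of events and push it to the channel."""
    channel.put(json.dumps(events))


def consume():
    """Consumer side: drain whatever is waiting and decode each batch."""
    messages = []
    while not channel.empty():
        messages.append(json.loads(channel.get()))
    return messages


publish({"0": {"x_value": 0, "total_1": 100, "total_2": 100}})
received = consume()
print(received)
```

The producer & the consumer never call each other directly. They only share the channel, which is what lets the mock IoT application & the iOS App run as separate programs.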

So, now we understand the architecture. Fantastic!

Let’s deep dive into the code that we specifically built for this use case.


Code:

  1. IoTDataGen.py (Publishes the IoT events captured from the simulator to Ably channels, so that the dashboard can present them through measured KPIs.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 12-Nov-2021 ####
#### ####
#### Objective: Publishing Streaming data ####
#### to Ably channels & captured IoT ####
#### events from the simulator & publish ####
#### them in Dashboard through measured ####
#### KPIs. ####
#### ####
##############################################

import random
import time
import json
import clsPublishStream as cps
import datetime
from clsConfig import clsConfig as cf
import logging

# Invoking the IoT Device Generator.
def main():
    ###############################################
    ### Global Section ###
    ###############################################

    # Initiating Ably class to push events
    x1 = cps.clsPublishStream()

    ###############################################
    ### End of Global Section ###
    ###############################################

    # Initiating Log Class
    general_log_path = str(cf.conf['LOG_PATH'])
    msgSize = int(cf.conf['limRec'])

    # Enabling Logging Info
    logging.basicConfig(filename=general_log_path + 'IoTDevice.log', level=logging.INFO)

    # Other useful variables
    cnt = 1
    idx = 0
    debugInd = 'Y'
    x_value = 0
    total_1 = 100
    total_2 = 100
    var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    # End of useful variables

    while True:
        srcJson = {
            "x_value": x_value,
            "total_1": total_1,
            "total_2": total_2
        }

        x_value += 1
        total_1 = total_1 + random.randint(6, 8)
        total_2 = total_2 + random.randint(5, 6)

        tmpJson = str(srcJson)

        if cnt == 1:
            srcJsonMast = '{' + '"' + str(idx) + '":'+ tmpJson
        elif cnt == msgSize:
            srcJsonMast = srcJsonMast + '}'
            print('JSON: ')
            print(str(srcJsonMast))

            # Pushing both the Historical Confirmed Cases
            retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)

            if retVal_1 == 0:
                print('Successfully IoT event pushed!')
            else:
                print('Failed to push IoT events!')

            srcJsonMast = ''
            tmpJson = ''
            cnt = 0
            idx = -1
            srcJson = {}
            retVal_1 = 0
        else:
            srcJsonMast = srcJsonMast + ',' + '"' + str(idx) + '":'+ tmpJson

        cnt += 1
        idx += 1

        time.sleep(1)

if __name__ == "__main__":
    main()


Let’s explore the key snippets from the above script.

# Initiating Ably class to push events
x1 = cps.clsPublishStream()

The mock IoT application instantiates the publisher class, which it will use to push the JSON events to the Ably queue.

if cnt == 1:
    srcJsonMast = '{' + '"' + str(idx) + '":'+ tmpJson
elif cnt == msgSize:
    srcJsonMast = srcJsonMast + '}'
    print('JSON: ')
    print(str(srcJsonMast))

    # Pushing both the Historical Confirmed Cases
    retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)

    if retVal_1 == 0:
        print('Successfully IoT event pushed!')
    else:
        print('Failed to push IoT events!')

    srcJsonMast = ''
    tmpJson = ''
    cnt = 0
    idx = -1
    srcJson = {}
    retVal_1 = 0
else:
    srcJsonMast = srcJsonMast + ',' + '"' + str(idx) + '":'+ tmpJson

In the above snippet, we’re forming the payload dynamically & then calling “pushEvents” to push all the randomly generated mock IoT events to the Ably queue.
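One thing to keep in mind with the string concatenation above: str(srcJson) emits a Python dictionary representation with single quotes, which is not strict JSON, & the record generated on the closing iteration is not appended to the batch. Here is a minimal sketch of an alternative that collects the records in a dictionary & serializes them once with json.dumps. The helper names make_event & build_batch are hypothetical, and the sketch assumes the consumer is happy with standard JSON.

```python
import json
import random


def make_event(x_value, total_1, total_2):
    """Return one mock IoT reading shaped like srcJson in the script above."""
    return {"x_value": x_value, "total_1": total_1, "total_2": total_2}


def build_batch(start_x, total_1, total_2, batch_size, rng=random):
    """Collect batch_size readings keyed "0", "1", ... and serialize them once.

    Returns the JSON text plus the running totals, so that the caller can
    continue the series in the next batch.
    """
    batch = {}
    for idx in range(batch_size):
        batch[str(idx)] = make_event(start_x + idx, total_1, total_2)
        # Same random increments as the original generator.
        total_1 += rng.randint(6, 8)
        total_2 += rng.randint(5, 6)
    return json.dumps(batch), total_1, total_2


payload, next_total_1, next_total_2 = build_batch(0, 100, 100, batch_size=3)
print(payload)
```

The returned payload could then be handed to pushEvents in place of srcJsonMast.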

2. custom.kv (This Kivy design file contains the graphical interface of our iOS App, including the functionality of the buttons.)


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 12-Nov-2021 ####
#### ####
#### Objective: This Kivy design file contains all the ####
#### graphical interface of our iOS App. This includes ####
#### the functionalities of buttons. ####
#### ####
#### Note: If you think this file is not properly read by ####
#### the program, then remove this entire comment block & ####
#### then run the application. It should work. ####
###############################################################
MainInterface:
<MainInterface>:
ScreenManager:
id: sm
size: root.width, root.height
Screen:
name: "background_1"
Image:
source: "Background/Background_1.png"
allow_stretch: True
keep_ratio: True
size_hint_y: None
size_hint_x: None
width: self.parent.width
height: self.parent.width/self.image_ratio
FloatLayout:
orientation: 'vertical'
Label:
text: "This is an application, which will consume the live streaming data inside a Kivy-based IOS-App by using Matplotlib to capture the KPIs."
text_size: self.width + 350, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':2.9,'center_y':6.5}
Image:
id: homesc
pos_hint: {'right':6, 'top':5.4}
size_hint: None, None
size: 560, 485
source: "Background/FP.jpeg"
Screen:
name: "background_2"
Image:
source: "Background/Background_2.png"
allow_stretch: True
keep_ratio: True
size_hint_y: None
size_hint_x: None
width: self.parent.width
height: self.parent.width/self.image_ratio
FloatLayout:
Label:
text: "Please find the realtime IoT-device Live Statistics:"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':3.0,'center_y':7.0}
Label:
text: "DC to Servo Min Ratio:"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':3.0,'center_y':6.2}
Label:
id: dynMin
text: "100"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.2,'center_y':6.2}
Label:
text: "DC Motor:"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':5.4}
Label:
text: "(MAX)"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':5.0}
Label:
id: dynDC
text: "100"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':4.6}
Label:
text: " ——- Vs ——- "
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':4.0}
Label:
text: "Servo Motor:"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':3.4}
Label:
text: "(MAX)"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':3.0}
Label:
id: dynServo
text: "100"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':2.6}
FloatLayout:
id: box
size: 400, 550
pos: 200, 300
Screen:
name: "background_3"
Image:
source: "Background/Background_3.png"
allow_stretch: True
keep_ratio: True
size_hint_y: None
size_hint_x: None
width: self.parent.width
height: self.parent.width/self.image_ratio
FloatLayout:
orientation: 'vertical'
Label:
text: "Please find the live like status."
text_size: self.width + 350, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':2.6,'center_y':7.2}
Label:
id: dynVal
text: "100"
text_size: self.width + 350, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':4.1,'center_y':6.4}
Image:
id: lk_img_1
pos_hint: {'center_x':3.2, 'center_y':6.4}
size_hint: None, None
size: 460, 285
source: "Background/Likes_Btn_R.png"
Label:
text: "Want to know more about the Developer? Here is the detail ->"
text_size: self.width + 450, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':3.1,'center_y':5.5}
Label:
text: "I love to find out new technologies that is emerging as a driving force & shape our future!"
text_size: self.width + 290, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':2.3,'center_y':3.8}
Label:
text: "For more information view the website to know more on Python-Kivy along with Matplotlib Live Streaming."
text_size: self.width + 450, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':3.1,'center_y':1.9}
Image:
id: avatar
pos_hint: {'right':6.8, 'top':5.4}
size_hint: None, None
size: 460, 285
source: "Background/Me.jpeg"
Label:
text: "https://www.satyakide.com"
text_size: self.width + 350, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':3.4,'center_y':0.9}
Image:
source: "Background/Top_Bar.png"
size: 620, 175
pos: 0, root.height - 535
Button:
#: set val 'Start'
size: 112.5, 75
pos: root.width/2-190, root.height-120
background_color: 1,1,1,0
on_press: root.pressed(self, val, sm)
on_release: root.released(self, val)
Image:
id: s_img
text: val
source: "Background/Start_Btn.png"
center_x: self.parent.center_x - 260
center_y: self.parent.center_y - 415
Button:
#: set val2 'Stats'
size: 112.5, 75
pos: root.width/2-55, root.height-120
background_color: 1,1,1,0
on_press: root.pressed(self, val2, sm)
on_release: root.released(self, val2)
Image:
id: st_img
text: val2
source: "Background/Stats_Btn.png"
center_x: self.parent.center_x - 250
center_y: self.parent.center_y - 415
Button:
#: set val3 'Likes'
size: 112.5, 75
pos: root.width/2+75, root.height-120
background_color: 1,1,1,0
on_press: root.pressed(self, val3, sm)
on_release: root.released(self, val3)
Image:
id: lk_img
text: val3
source: "Background/Likes_Btn.png"
center_x: self.parent.center_x - 240
center_y: self.parent.center_y - 415


To understand this, one needs to learn how to prepare a Kivy design layout using the KV language. You can develop the same using native Python code as well. However, I wanted to explore this language, not to mention that this is the preferred way of doing a front-end GUI design in Kivy.

Like any graphical interface, one needs to understand the layouts & the widgets that one is planning to use or build. For that, please go through the official documentation on Kivy Layouts, especially if you are doing this for the first time.

To pinpoint the conversation, I would like to present the documentation segment from the official site in the given picture –

Official Kivy reference

Since we’ve used our custom buttons & top bars, the most convenient GUI layout for our use case is FloatLayout. By using that layout, we can position our widgets at any arbitrary place as per our needs. At the same time, one can nest layouts by placing different types of arrangements inside one another.

Some of the key lines from the above design file are –

Screen:
  name: "background_1"
  Image:
      source: "Background/Background_1.png"
      allow_stretch: True
      keep_ratio: True
      size_hint_y: None
      size_hint_x: None
      width: self.parent.width
      height: self.parent.width/self.image_ratio
      FloatLayout:
          orientation: 'vertical'
          Label:
              text: "This is an application, which will consume the live streaming data inside a Kivy-based IOS-App by using Matplotlib to capture the KPIs."
              text_size: self.width + 350, None
              height: self.texture_size[1]
              halign: "left"
              valign: "bottom"
              pos_hint: {'center_x':2.9,'center_y':6.5}
          Image:
              id: homesc
              pos_hint: {'right':6, 'top':5.4}
              size_hint: None, None
              size: 560, 485
              source: "Background/FP.jpeg"

Let us understand what we discussed here & try to map that with the image.

Part of GUI defined in KV file

From the above image, you can now understand how we placed the label & the image at our custom positions to create a lean & clean interface.
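To get a feel for how pos_hint keys such as center_x, right & top translate into coordinates, here is a small pure-Python sketch of the arithmetic a FloatLayout-style parent applies. It is a simplified model & not Kivy’s own code. The function name resolve_pos_hint is hypothetical, and the 310 x 460 parent size is borrowed from Window.size in main.py.

```python
def resolve_pos_hint(parent_pos, parent_size, widget_size, pos_hint):
    """Return the bottom-left (x, y) that a FloatLayout-style parent would
    give a child for the supplied pos_hint (simplified model)."""
    px, py = parent_pos
    pw, ph = parent_size
    ww, wh = widget_size
    # Simplification: start from the parent's origin when no hint applies.
    x, y = float(px), float(py)
    for key, value in pos_hint.items():
        if key == "x":
            x = px + value * pw
        elif key == "center_x":
            x = px + value * pw - ww / 2
        elif key == "right":
            x = px + value * pw - ww
        elif key == "y":
            y = py + value * ph
        elif key == "center_y":
            y = py + value * ph - wh / 2
        elif key == "top":
            y = py + value * ph - wh
    return x, y


centered = resolve_pos_hint((0, 0), (310, 460), (100, 50),
                            {"center_x": 0.5, "center_y": 0.5})
corner = resolve_pos_hint((0, 0), (310, 460), (100, 50),
                          {"right": 1, "top": 1})
print(centered, corner)
```

Each hint is a multiple of the parent’s width or height, so a value above 1, like the ones used in custom.kv, simply scales past the parent’s own extent.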

Image:
      source: "Background/Top_Bar.png"
      size: 620, 175
      pos: 0, root.height - 535

  Button:
      #: set val 'Start'
      size: 112.5, 75
      pos: root.width/2-190, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val, sm)
      on_release: root.released(self, val)
      Image:
          id: s_img
          text: val
          source: "Background/Start_Btn.png"
          center_x: self.parent.center_x - 260
          center_y: self.parent.center_y - 415

  Button:
      #: set val2 'Stats'
      size: 112.5, 75
      pos: root.width/2-55, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val2, sm)
      on_release: root.released(self, val2)
      Image:
          id: st_img
          text: val2
          source: "Background/Stats_Btn.png"
          center_x: self.parent.center_x - 250
          center_y: self.parent.center_y - 415

  Button:
      #: set val3 'Likes'
      size: 112.5, 75
      pos: root.width/2+75, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val3, sm)
      on_release: root.released(self, val3)
      Image:
          id: lk_img
          text: val3
          source: "Background/Likes_Btn.png"
          center_x: self.parent.center_x - 240
          center_y: self.parent.center_y - 415

Let us understand the custom buttons mapped in our App.

We placed them at specific positions & sizes by mentioning the appropriate size & position coordinates & then assigned the button methods (on_press & on_release).

However, these button methods are defined inside the main Python script, which we’ll discuss after this segment.

3. main.py (Consumes the streaming IoT events that the simulator pushed to the Ably channels & presents them in the Kivy-based iOS App through measured KPIs.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 12-Nov-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels & captured IoT ####
#### events from the simulator & publish ####
#### them in Kivy-I/OS App through ####
#### measured KPIs. ####
#### ####
##############################################

from kivy.app import App
from kivy.uix.widget import Widget
from kivy.lang import Builder
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.floatlayout import FloatLayout
from kivy.clock import Clock
from kivy.core.window import Window
from kivymd.app import MDApp
import datetime as dt
import datetime
from kivy.properties import StringProperty
from kivy.vector import Vector
import regex as re
import os
os.environ["KIVY_IMAGE"]="pil"
import platform as pl
import matplotlib.pyplot as plt
import pandas as p
from matplotlib.patches import Rectangle
from matplotlib import use as mpl_use

mpl_use('module://kivy.garden.matplotlib.backend_kivy')
plt.style.use('fivethirtyeight')

# Consuming data from Ably Queue
from ably import AblyRest

# Main Class to consume streaming
import clsStreamConsume as ca

# Create the instance of the Covid API Class
x1 = ca.clsStreamConsume()

var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' *60)
DInd = 'Y'

Window.size = (310, 460)

Curr_Path = os.path.dirname(os.path.realpath(__file__))

os_det = pl.system()
if os_det == "Windows":
    sep = '\\'
else:
    sep = '/'

def getRealTimeIoT():
    try:
        # Let's pass this to our map section
        df = x1.conStream(var1, DInd)

        print('Data:')
        print(str(df))

        return df
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

class MainInterface(FloatLayout):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.data = getRealTimeIoT()
        self.likes = 0
        self.dcMotor = 0
        self.servoMotor = 0
        self.minRatio = 0
        plt.subplots_adjust(bottom=0.19)

        #self.fig, self.ax = plt.subplots(1,1, figsize=(6.5,10))
        self.fig, self.ax = plt.subplots()
        self.mpl_canvas = self.fig.canvas

    def on_data(self, *args):
        self.ax.clear()
        self.data = getRealTimeIoT()

        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
        self.likes = self.getMaxLike(self.data)
        self.ids.dynVal.text = str(self.likes)
        self.ids.lk_img_1.source = ''
        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn_R.png"

        self.dcMotor = self.getMaxDCMotor(self.data)
        self.ids.dynDC.text = str(self.dcMotor)

        self.servoMotor = self.getMaxServoMotor(self.data)
        self.ids.dynServo.text = str(self.servoMotor)

        self.minRatio = self.getDc2ServoMinRatio(self.data)
        self.ids.dynMin.text = str(self.minRatio)

        x = self.data['x_value']
        y1 = self.data['total_1']
        y2 = self.data['total_2']

        self.ax.plot(x, y1, label='Channel 1', linewidth=5.0)
        self.ax.plot(x, y2, label='Channel 2', linewidth=5.0)

        self.mpl_canvas.draw_idle()

        box = self.ids.box
        box.clear_widgets()
        box.add_widget(self.mpl_canvas)

        return self.data

    def getMaxLike(self, df):
        payload = df['x_value']

        a1 = str(payload.agg(['max']))
        max_val = int(re.search(r'\d+', a1)[0])

        return max_val

    def getMaxDCMotor(self, df):
        payload = df['total_1']

        a1 = str(payload.agg(['max']))
        max_val = int(re.search(r'\d+', a1)[0])

        return max_val

    def getMaxServoMotor(self, df):
        payload = df['total_2']

        a1 = str(payload.agg(['max']))
        max_val = int(re.search(r'\d+', a1)[0])

        return max_val

    def getMinDCMotor(self, df):
        payload = df['total_1']

        a1 = str(payload.agg(['min']))
        min_val = int(re.search(r'\d+', a1)[0])

        return min_val

    def getMinServoMotor(self, df):
        payload = df['total_2']

        a1 = str(payload.agg(['min']))
        min_val = int(re.search(r'\d+', a1)[0])

        return min_val

    def getDc2ServoMinRatio(self, df):
        minDC = self.getMinDCMotor(df)
        minServo = self.getMinServoMotor(df)

        min_ratio = round(float(minDC/minServo), 5)

        return min_ratio

    def update(self, *args):
        self.data = self.on_data(self.data)

    def pressed(self, instance, inText, SM):

        if str(inText).upper() == 'START':
            instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Start_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.s_img.text).upper())
            if ((SM.current == "background_2") or (SM.current == "background_3")):
                SM.transition.direction = "right"
            SM.current= "background_1"
            Clock.unschedule(self.update)
            self.remove_widget(self.mpl_canvas)

        elif str(inText).upper() == 'STATS':

            instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Stats_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.st_img.text).upper())
            if (SM.current == "background_1"):
                SM.transition.direction = "left"
            elif (SM.current == "background_3"):
                SM.transition.direction = "right"
            SM.current= "background_2"
            Clock.schedule_interval(self.update, 0.1)

        else:
            instance.parent.ids.lk_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Likes_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.lk_img.text).upper())
            if ((SM.current == "background_1") or (SM.current == "background_2")):
                SM.transition.direction = "left"
            SM.current= "background_3"
            Clock.schedule_interval(self.update, 0.1)

            instance.parent.ids.dynVal.text = str(self.likes)
            instance.parent.ids.dynDC.text = str(self.dcMotor)
            instance.parent.ids.dynServo.text = str(self.servoMotor)
            instance.parent.ids.dynMin.text = str(self.minRatio)
            self.remove_widget(self.mpl_canvas)

    def released(self, instance, inrText):
        if str(inrText).upper() == 'START':
            instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Start_Btn.png"
            print('Released: ', str(instance.parent.ids.s_img.text).upper())
        elif str(inrText).upper() == 'STATS':
            instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Stats_Btn.png"
            print('Released: ', str(instance.parent.ids.st_img.text).upper())
        else:
            instance.parent.ids.lk_img.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
            print('Released: ', str(instance.parent.ids.lk_img.text).upper())

class CustomApp(MDApp):
    def build(self):
        return MainInterface()

if __name__ == "__main__":
    custApp = CustomApp()
    custApp.run()


Let us explore the main script now.

def getRealTimeIoT():
    try:
        # Let's pass this to our map section
        df = x1.conStream(var1, DInd)

        print('Data:')
        print(str(df))

        return df
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

The above function will invoke the streaming class to consume the mock IoT live events from the Ably queue as a pandas dataframe. If anything fails, it prints the error & returns an empty dataframe instead.
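The consumer class itself is not shown in this post, so the following is only a sketch of the kind of conversion it has to perform. It assumes the payload arrives as JSON text keyed by "0", "1", ... like the batches built by the generator. The function name payload_to_rows is hypothetical.

```python
import json


def payload_to_rows(payload):
    """Turn '{"0": {...}, "1": {...}}' into a list of row dicts ordered by key."""
    batch = json.loads(payload)
    # Keys are strings, so sort them numerically to keep the event order.
    return [batch[key] for key in sorted(batch, key=int)]


sample = (
    '{"1": {"x_value": 1, "total_1": 107, "total_2": 105},'
    ' "0": {"x_value": 0, "total_1": 100, "total_2": 100}}'
)
rows = payload_to_rows(sample)
print(rows)
```

A list of dictionaries like this can be passed straight to the pandas DataFrame constructor, which gives the x_value, total_1 & total_2 columns that the dashboard reads.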

class MainInterface(FloatLayout):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.data = getRealTimeIoT()
        self.likes = 0
        self.dcMotor = 0
        self.servoMotor = 0
        self.minRatio = 0
        plt.subplots_adjust(bottom=0.19)

        #self.fig, self.ax = plt.subplots(1,1, figsize=(6.5,10))
        self.fig, self.ax = plt.subplots()
        self.mpl_canvas = self.fig.canvas

Here, the application instantiates the main class & initializes all the critical variables, including the matplotlib figure, axes & canvas.

    def pressed(self, instance, inText, SM):

        if str(inText).upper() == 'START':
            instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Start_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.s_img.text).upper())
            if ((SM.current == "background_2") or (SM.current == "background_3")):
                SM.transition.direction = "right"
            SM.current= "background_1"
            Clock.unschedule(self.update)
            self.remove_widget(self.mpl_canvas)

We’ve taken one of the button events to show how the application behaves once someone clicks the Start button: it brings back all the corresponding elements of the static page, stops the scheduled updates & removes the matplotlib canvas. It also sets the transition direction between screens.
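The direction checks spread across the three branches of pressed follow one rule: slide left when moving to a later screen & slide right when moving back to an earlier one. A minimal sketch of that rule as a single helper, where SCREEN_ORDER & slide_direction are hypothetical names that do not exist in main.py:

```python
# Screen names as declared in custom.kv, in their left-to-right order.
SCREEN_ORDER = ["background_1", "background_2", "background_3"]


def slide_direction(current, target):
    """Return the transition direction for moving from current to target,
    or None when the screen does not change."""
    cur = SCREEN_ORDER.index(current)
    tgt = SCREEN_ORDER.index(target)
    if tgt > cur:
        return "left"
    if tgt < cur:
        return "right"
    return None


print(slide_direction("background_1", "background_2"))
print(slide_direction("background_3", "background_1"))
```

With such a helper, each branch would only need to name its target screen & assign the result to SM.transition.direction when it is not None.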

        elif str(inText).upper() == 'STATS':

            instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Stats_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.st_img.text).upper())
            if (SM.current == "background_1"):
                SM.transition.direction = "left"
            elif (SM.current == "background_3"):
                SM.transition.direction = "right"
            SM.current= "background_2"
            Clock.schedule_interval(self.update, 0.1)

The next screen invokes the dynamic & real-time content. So, please pay extra attention to the following line –

Clock.schedule_interval(self.update, 0.1)

This line schedules the update function to run every 0.1 seconds. The function looks like –

    def update(self, *args):
        self.data = self.on_data(self.data)

The update function, in turn, invokes another function named “on_data”.

    def on_data(self, *args):
        self.ax.clear()
        self.data = getRealTimeIoT()

        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
        self.likes = self.getMaxLike(self.data)
        self.ids.dynVal.text = str(self.likes)
        self.ids.lk_img_1.source = ''
        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn_R.png"

        self.dcMotor = self.getMaxDCMotor(self.data)
        self.ids.dynDC.text = str(self.dcMotor)

        self.servoMotor = self.getMaxServoMotor(self.data)
        self.ids.dynServo.text = str(self.servoMotor)

        self.minRatio = self.getDc2ServoMinRatio(self.data)
        self.ids.dynMin.text = str(self.minRatio)

        x = self.data['x_value']
        y1 = self.data['total_1']
        y2 = self.data['total_2']

        self.ax.plot(x, y1, label='Channel 1', linewidth=5.0)
        self.ax.plot(x, y2, label='Channel 2', linewidth=5.0)

        self.mpl_canvas.draw_idle()

        box = self.ids.box
        box.clear_widgets()
        box.add_widget(self.mpl_canvas)

        return self.data

The above method shows how we capture the live calculations, plot them with matplotlib & finally attach the matplotlib figure canvas to a box widget of our chosen size, so that the changed content is displayed every time this method is invoked.

The rest of the functions are pretty self-explanatory. So, I’m not going to discuss them.
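The KPI helpers in main.py convert the aggregated value to text & then pull the number back out with a regular expression. Here is a minimal sketch of the same four KPIs computed directly on a list of row dictionaries. The name compute_kpis is hypothetical. It also returns None for an empty input, which is worth guarding against because getRealTimeIoT returns an empty dataframe on failure.

```python
def compute_kpis(rows):
    """Return the dashboard KPIs for a list of event dicts, or None if empty."""
    if not rows:
        return None
    x_values = [row["x_value"] for row in rows]
    dc_values = [row["total_1"] for row in rows]
    servo_values = [row["total_2"] for row in rows]
    return {
        "likes": max(x_values),
        "dc_max": max(dc_values),
        "servo_max": max(servo_values),
        # Ratio of the two minimums, rounded like getDc2ServoMinRatio.
        "min_ratio": round(min(dc_values) / min(servo_values), 5),
    }


events = [
    {"x_value": 0, "total_1": 100, "total_2": 100},
    {"x_value": 1, "total_1": 107, "total_2": 105},
    {"x_value": 2, "total_1": 114, "total_2": 111},
]
kpis = compute_kpis(events)
print(kpis)
```

On a pandas dataframe, the same values are available through the max & min methods of each column, without going through strings.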


Run:

Let’s run the app & see the output –

STEP – 1

Triggering the mock IoT App

STEP – 2

Triggering the iOS App

STEP – 3


So, we’ve done it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging!


Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement, especially for the size & position of the GUI components, which could be made dynamic by defining them through self.width along with some constant values.

Predicting Flipkart business growth factor using Linear-Regression Machine Learning Model

Hi Guys,

Today, we’ll be exploring the potential business growth factor using the “Linear-Regression Machine Learning” model. We’ve prepared a set of dummy data & we’ll base our prediction on that.

Let’s explore a few sample data –

1. Sample Data

So, based on this data, we would like to predict YearlyAmountSpent from the following features, i.e. [ Time On App / Time On Website / Flipkart Membership Duration (In Year) ], & find out which one of them influences it the most.

You need to install the following packages –

pip install pandas

pip install matplotlib

pip install scikit-learn

We’ll be discussing only the main calling script & the class script. We’ll post the parameter script as well, but without discussing it. And, we won’t discuss clsL.py, as we’ve already covered it in our previous post.

1. clsConfig.py (This script contains all the parameter details.)

################################################
#### Written By: SATYAKI DE                 ####
#### Written On: 15-May-2020                ####
####                                        ####
#### Objective: This script is a config     ####
#### file, contains all the keys for        ####
#### Machine-Learning. Application will     ####
#### process these information & perform    ####
#### various analysis on Linear-Regression. ####
################################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'FILE_NAME': Curr_Path + sep + 'Data' + sep + 'FlipkartCustomers.csv',
        'SRC_PATH': Curr_Path + sep + 'Data' + sep,
        'APP_DESC_1': 'IBM Watson Language Understand!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path
    }

2. clsLinearRegression.py (This is the main script, which will invoke the Machine-Learning API & return 0 if successful.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 15-May-2020              ####
#### Modified On 15-May-2020              ####
####                                      ####
#### Objective: Main scripts for Linear   ####
#### Regression.                          ####
##############################################

import pandas as p
import numpy as np
import regex as re

import matplotlib.pyplot as plt
from clsConfig import clsConfig as cf

# %matplotlib inline -- for Jupyter Notebook
class clsLinearRegression:
    def __init__(self):
        self.fileName =  cf.config['FILE_NAME']

    def predictResult(self):
        try:

            inputFileName = self.fileName

            # Reading from Input File
            df = p.read_csv(inputFileName)

            print()
            print('Projecting sample rows: ')
            print(df.head())

            print()
            x_row = df.shape[0]
            x_col = df.shape[1]

            print('Total Number of Rows: ', x_row)
            print('Total Number of columns: ', x_col)

            # Adding Features
            x = df[['TimeOnApp', 'TimeOnWebsite', 'FlipkartMembershipInYear']]

            # Target Variable - Trying to predict
            y = df['YearlyAmountSpent']

            # Now Train-Test Split of your source data
            from sklearn.model_selection import train_test_split

            # test_size => % of allocated data for your test cases
            # random_state => A specific set of random split on your data
            X_train, X_test, Y_train, Y_test = train_test_split(x, y, test_size=0.4, random_state=101)

            # Importing Model
            from sklearn.linear_model import LinearRegression

            # Creating an Instance
            lm = LinearRegression()

            # Train or Fit my model on Training Data
            lm.fit(X_train, Y_train)

            # Creating a prediction value
            flipKartSalePrediction = lm.predict(X_test)

            # Creating a scatter plot based on Actual Value & Predicted Value
            plt.scatter(Y_test, flipKartSalePrediction)

            # Adding meaningful Label
            plt.xlabel('Actual Values')
            plt.ylabel('Predicted Values')

            # Checking Individual Metrics
            from sklearn import metrics

            print()
            mea_val = metrics.mean_absolute_error(Y_test, flipKartSalePrediction)
            print('Mean Absolute Error (MAE): ', mea_val)

            mse_val = metrics.mean_squared_error(Y_test, flipKartSalePrediction)
            print('Mean Square Error (MSE): ', mse_val)

            rmse_val = np.sqrt(metrics.mean_squared_error(Y_test, flipKartSalePrediction))
            print('Root Mean Square Error (RMSE): ', rmse_val)

            print()

            # Check Variance Score - R^2 Value
            print('Variance Score:')
            var_score = str(round(metrics.explained_variance_score(Y_test, flipKartSalePrediction) * 100, 2)).strip()
            print('Our Model is', var_score, '% accurate. ')
            print()

            # Finding Coefficient on X_train.columns
            print()
            print('Finding Coefficient: ')

            cedf = p.DataFrame(lm.coef_, x.columns, columns=['Coefficient'])
            print('Printing the All the Factors: ')
            print(cedf)

            print()

            # Getting the Max Value from it
            cedf['MaxFactorForBusiness'] = cedf['Coefficient'].max()

            # Filtering the max Value to identify the biggest Business factor
            dfMax = cedf[(cedf['MaxFactorForBusiness'] == cedf['Coefficient'])]

            # Dropping the derived column
            # Reassigning instead of inplace=True, as dfMax is a filtered copy of cedf
            dfMax = dfMax.drop(columns=['MaxFactorForBusiness'])
            dfMax = dfMax.reset_index()

            print(dfMax)

            # Extracting Actual Business Factor from Pandas dataframe
            str_factor_temp = str(dfMax.iloc[0]['index'])
            str_factor = re.sub(r"([a-z])([A-Z])", r"\g<1> \g<2>", str_factor_temp)
            str_value = str(round(float(dfMax.iloc[0]['Coefficient']),2))

            print()
            print('*' * 80)
            print('Major Business Activity - (', str_factor, ') - ', str_value, '%')
            print('*' * 80)
            print()

            # This is required when you are running from a conventional
            # Python front end & not using Jupyter notebook.
            plt.show()

            return 0

        except Exception as e:
            x = str(e)
            print('Error : ', x)

            return 1

Key lines from the above snippet –

# Adding Features
x = df[['TimeOnApp', 'TimeOnWebsite', 'FlipkartMembershipInYear']]

Our application creates a subset of the main dataframe, which contains all the features.

# Target Variable - Trying to predict
y = df['YearlyAmountSpent']

Now, the application assigns the target variable to ‘y’.

# Now Train-Test Split of your source data
from sklearn.model_selection import train_test_split

# test_size => % of allocated data for your test cases
# random_state => A specific set of random split on your data
X_train, X_test, Y_train, Y_test = train_test_split(x, y, test_size=0.4, random_state=101)

As per “Supervised Learning,” our application splits the dataset into two subsets. One is to train the model & the other is to test the final model. For a large dataset, you can also divide the data into three sets, keeping a separate set for checking performance while you tune the model. In our case, we don’t need that, as this dataset is small.
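To make the three-set idea concrete, here is a minimal sketch in plain Python. It is not what scikit-learn does internally & the helper name three_way_split, the 60/20/20 proportions & the sample rows are my own assumptions for illustration only.

```python
import random


def three_way_split(rows, val_size=0.2, test_size=0.2, seed=101):
    """Shuffle rows reproducibly and cut them into train/validation/test lists.

    Hypothetical helper for illustration; the proportions are assumptions.
    """
    shuffled = list(rows)
    # A fixed seed plays the same role as random_state: same split every run
    random.Random(seed).shuffle(shuffled)

    n_test = int(len(shuffled) * test_size)
    n_val = int(len(shuffled) * val_size)

    test = shuffled[:n_test]
    val = shuffled[n_test:n_test + n_val]
    train = shuffled[n_test + n_val:]
    return train, val, test


rows = list(range(100))  # stand-in for 100 records
train, val, test = three_way_split(rows)
print(len(train), len(val), len(test))
```

With scikit-learn, a similar result is usually obtained by calling train_test_split twice, first to hold out the test set & then to carve the validation set out of the remainder.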

# Train or Fit my model on Training Data
lm.fit(X_train, Y_train)

Our application now trains (fits) the model on the training data.

# Creating a scatter plot based on Actual Value & Predicted Value
plt.scatter(Y_test, flipKartSalePrediction)

Our application plots the actual values against the predicted values in a scatter plot.

Our program also captures the following metrics. For more details, I’ve provided the external links for your reference –

  1. Mean Absolute Error (MAE)
  2. Mean Square Error (MSE)
  3. Square Root Mean Square Error (RMSE)

And the implementation is shown below –

mea_val = metrics.mean_absolute_error(Y_test, flipKartSalePrediction)
print('Mean Absolute Error (MAE): ', mea_val)

mse_val = metrics.mean_squared_error(Y_test, flipKartSalePrediction)
print('Mean Square Error (MSE): ', mse_val)

rmse_val = np.sqrt(metrics.mean_squared_error(Y_test, flipKartSalePrediction))
print('Square Root Mean Square Error (RMSE): ', rmse_val)
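If you want to see what these three metrics compute, the following sketch works them out by hand in plain Python. The function names & the four sample values are made up for illustration; they are not the numbers from our run.

```python
import math


def mae(actual, predicted):
    # Average of the absolute differences
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)


def mse(actual, predicted):
    # Average of the squared differences
    return sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual)


def rmse(actual, predicted):
    # Square root of MSE, back in the unit of the target variable
    return math.sqrt(mse(actual, predicted))


# Illustrative values only: the errors are 10, 10, 20 & 0
actual = [100.0, 200.0, 300.0, 400.0]
predicted = [110.0, 190.0, 320.0, 400.0]

print('MAE :', mae(actual, predicted))
print('MSE :', mse(actual, predicted))
print('RMSE:', rmse(actual, predicted))
```

Notice that the single error of 20 pulls the RMSE above the MAE, since squaring penalizes larger misses more heavily.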

At this point, we would like to check the credibility of our model by using the explained variance score, as follows –

var_score = str(round(metrics.explained_variance_score(Y_test, flipKartSalePrediction) * 100, 2)).strip()
print('Our Model is', var_score, '% accurate. ')
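A word of caution on reading this number as “accuracy”. The sketch below re-implements the explained variance score & the R^2 score in plain Python, following their textbook definitions (the helper names & the sample data are my own assumptions). A model whose predictions are all off by the same constant still gets a perfect explained variance score, while R^2 penalizes it.

```python
def mean(values):
    return sum(values) / len(values)


def variance(values):
    m = mean(values)
    return sum((v - m) ** 2 for v in values) / len(values)


def explained_variance(actual, predicted):
    # 1 - Var(actual - predicted) / Var(actual)
    residuals = [a - p for a, p in zip(actual, predicted)]
    return 1 - variance(residuals) / variance(actual)


def r_squared(actual, predicted):
    # 1 - (sum of squared residuals) / (total sum of squares)
    m = mean(actual)
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    ss_tot = sum((a - m) ** 2 for a in actual)
    return 1 - ss_res / ss_tot


# Illustrative data: every prediction is off by exactly 1
actual = [1.0, 2.0, 3.0, 4.0]
biased = [a + 1.0 for a in actual]

print('Explained variance:', explained_variance(actual, biased))
print('R^2               :', r_squared(actual, biased))
```

So, the score tells you how much of the variation in the target the model explains, which is a useful but narrower statement than overall accuracy.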

Finally, we extract the coefficients to find out which particular feature will lead Flipkart to better sales & growth, by taking the maximum coefficient value among all the features, as shown below –

cedf = p.DataFrame(lm.coef_, x.columns, columns=['Coefficient'])

# Getting the Max Value from it
cedf['MaxFactorForBusiness'] = cedf['Coefficient'].max()

# Filtering the max Value to identify the biggest Business factor
dfMax = cedf[(cedf['MaxFactorForBusiness'] == cedf['Coefficient'])]

# Dropping the derived column (not in place, so pandas does
# not warn about modifying a slice of cedf)
dfMax = dfMax.drop(columns=['MaxFactorForBusiness'])
dfMax = dfMax.reset_index()

Note that we’ve used a regular expression to split the camel-case column name from our feature & represent that with a much more meaningful name without changing the column name.

# Extracting Actual Business Factor from Pandas dataframe
str_factor_temp = str(dfMax.iloc[0]['index'])
str_factor = re.sub(r"([a-z])([A-Z])", r"\g<1> \g<2>", str_factor_temp)
str_value = str(round(float(dfMax.iloc[0]['Coefficient']),2))

print('Major Business Activity - (', str_factor, ') - ', str_value, '%')
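The same two steps, picking the largest coefficient & making the camel-case name readable, can be tried without pandas. This is only a sketch; the coefficient values in the dictionary are made-up numbers, not the output of our model.

```python
import re

# Made-up coefficients for illustration only
coefficients = {
    'TimeOnApp': 38.59,
    'TimeOnWebsite': 0.19,
    'FlipkartMembershipInYear': 61.28,
}

# Feature with the largest coefficient
top_feature = max(coefficients, key=coefficients.get)

# Insert a space wherever a lowercase letter is followed by an uppercase one
readable = re.sub(r"([a-z])([A-Z])", r"\g<1> \g<2>", top_feature)

print(top_feature, '->', readable)
```

Keep in mind that comparing raw coefficients is only meaningful when the features are on comparable scales.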

3. callLinear.py (This is the first calling script.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 15-May-2020              ####
#### Modified On 15-May-2020              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from clsConfig import clsConfig as cf
import clsL as cl
import logging
import datetime
import clsLinearRegression as cw

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        ret_1 = 0
        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'MachineLearning_LinearRegression.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        log_dir = cf.config['LOG_PATH']
        curr_ver =datetime.datetime.now().strftime("%Y-%m-%d")

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        print('Machine Learning - Linear Regression Prediction : ')
        print('-' * 200)

        # Create the instance of the Linear-Regression Class
        x2 = cw.clsLinearRegression()

        ret = x2.predictResult()

        if ret == 0:
            print('Successful Linear-Regression Prediction Generated!')
        else:
            print('Failed to generate Linear-Regression Prediction!')

        print("-" * 200)
        print()

        print('Finding Analysis points..')
        print("*" * 200)
        logging.info('Finding Analysis points..')
        logging.info(tmpR0)


        tmpR10 = 'End Time: ' + str(var)
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError as e:
        print(str(e))
        logging.info(str(e))

    except Exception as e:
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

Key snippet from the above script –

# Create the instance of the Linear-Regression
x2 = cw.clsLinearRegression()

ret = x2.predictResult()

In the above snippet, our application first creates an instance of the main class & then invokes the “predictResult” method.

Let’s run our application –

Step 1:

First, the application will fetch the following sample rows from our source file – if it is successful.

2. Run_1

Step 2:

Then, it will create the following scatterplot by executing the following snippet –

# Creating a scatter plot based on Actual Value & Predicted Value
plt.scatter(Y_test, flipKartSalePrediction)
3. Run_2

Note that our model is pretty accurate & it has a balanced success rate compared to our predicted numbers.

Step 3:

Finally, it successfully projects the critical feature, as shown below –

4. Run_3

From the above picture, you can see that our model is pretty accurate (89% approx).

Also, the highlighted red square identifies the key features & their coefficients & finally, the winning feature is marked in green.

So, as per that, we’ve come to the conclusion that Flipkart’s business growth depends on the tenure of its subscribers, i.e., long-standing members tend to buy more than newer members.

Let’s look into our directory structure –

5. Win_Dir

So, we’ve done it.

I’ll be posting another new post in the coming days. Till then, Happy Avenging! 😀

Note: All the data posted here are representational data & available over the internet & for educational purpose only.

Predicting health issues for Senior Citizens based on “Realtime Weather Data” in Python

Hi Guys,

Today, I’ll be presenting a different kind of post here. I’ll be trying to predict health issues for senior citizens based on “realtime weather data” by blending open-source population data using some mock risk factor calculation. At the end of the post, I’ll be plotting these numbers into some graphs for better understanding.

Let’s drive!

First, we need realtime weather data. To get that, we need to subscribe to the OpenWeather API. For that, you have to register as a developer & you’ll receive an email similar to this one once they have approved your request –

1. Subscription To Open Weather

So, from the above picture, you can see that you’ll be provided with one API key & also offered some useful API documentation. I would recommend exploring all the links before you try to use it.

You can also view your API key once you have logged into their console. You can also create multiple API keys & the screen should look something like this –

2. Viewing Keys

For security reasons, I’ll be hiding my own keys & you should do the same.

I would say many of these free APIs might have some issues. So, I would recommend that you start by testing the open API through Postman before you jump into the Python development. Here is a glimpse of my test through Postman –

3. Testing API

Once I can see that the API is returning the result, I can work on it.

Apart from that, one needs to understand that these APIs might have usage limits. You also need to know the consequences in terms of price & tier in case you exceed the limit. Here are the details for this API –

5. Package Details - API

For our demo, I’ll be using the Free tier only.

Let’s look into our other source data. We got the top 10 cities by population from the internet. Also, we have collected sample Senior Citizen percentages by sex across those cities. We have masked these values, as this is just for educational purposes.

1. CityDetails.csv

Here is a glimpse of this file –

4. Source File

So, this file only contains the total population across the top 10 cities in the USA.

2. SeniorCitizen.csv

6. SeniorCitizen Data

This file contains the Sex ratio of Senior citizens across those top 10 cities by population.

Again, we are not going to discuss any script that we’ve already discussed here.

Hence, we’re skipping clsL.py here.

1. clsConfig.py (This script contains all the parameters of the server.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 19-Jan-2019              ####
####                                      ####
#### Objective: This script is a config   ####
#### file that contains all the keys &    ####
#### parameters for the OpenWeather API   ####
#### & the local directory paths used by  ####
#### the application.                     ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'URL': "http://api.openweathermap.org/data/2.5/weather",
        'API_HOST': "api.openweathermap.org",
        'API_KEY': "XXXXXXXXXXXXXXXXXXXXXX",
        'API_TYPE': "application/json",
        'CACHE': "no-cache",
        'CON': "keep-alive",
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'SRC_PATH': Curr_Path + sep + 'Src_File' + sep,
        'APP_DESC_1': 'Open Weather Forecast',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SRC_FILE': Curr_Path + sep + 'Src_File' + sep + 'CityDetails.csv',
        'SRC_FILE_1': Curr_Path + sep + 'Src_File' + sep + 'SeniorCitizen.csv',
        'SRC_FILE_INIT': 'CityDetails.csv',
        'COL_LIST': ['base', 'all', 'cod', 'lat', 'lon', 'dt', 'feels_like', 'humidity', 'pressure', 'temp', 'temp_max', 'temp_min', 'name', 'country', 'sunrise', 'sunset', 'type', 'timezone', 'visibility', 'weather', 'deg', 'gust', 'speed'],
        'COL_LIST_1': ['base', 'all', 'cod', 'lat', 'lon', 'dt', 'feels_like', 'humidity', 'pressure', 'temp', 'temp_max', 'temp_min', 'CityName', 'country', 'sunrise', 'sunset', 'type', 'timezone', 'visibility', 'deg', 'gust', 'speed', 'WeatherMain', 'WeatherDescription'],
        'COL_LIST_2': ['CityName', 'Population', 'State']
    }

2. clsWeather.py (This script contains the main logic to extract the realtime data from our subscribed weather API.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 19-Jan-2020              ####
#### Modified On 19-Jan-2020              ####
####                                      ####
#### Objective: Main scripts to invoke    ####
#### OpenWeather API.                     ####
##############################################

import requests
import logging
import json
from clsConfig import clsConfig as cf

class clsWeather:
    def __init__(self):
        self.url = cf.config['URL']
        self.openmapapi_host = cf.config['API_HOST']
        self.openmapapi_key = cf.config['API_KEY']
        self.openmapapi_cache = cf.config['CACHE']
        self.openmapapi_con = cf.config['CON']
        self.type = cf.config['API_TYPE']

    def searchQry(self, rawQry):
        try:
            url = self.url
            openmapapi_host = self.openmapapi_host
            openmapapi_key = self.openmapapi_key
            openmapapi_cache = self.openmapapi_cache
            openmapapi_con = self.openmapapi_con
            type = self.type

            querystring = {"appid": openmapapi_key, "q": rawQry}

            print('Input JSON: ', str(querystring))

            headers = {
                'host': openmapapi_host,
                'content-type': type,
                'Cache-Control': openmapapi_cache,
                'Connection': openmapapi_con
            }

            response = requests.request("GET", url, headers=headers, params=querystring)

            ResJson  = response.text

            jdata = json.dumps(ResJson)
            ResJson = json.loads(jdata)

            return ResJson

        except Exception as e:
            ResJson = ''
            x = str(e)
            print(x)

            logging.info(x)
            ResJson = {'errorDetails': x}

            return ResJson

The key lines from this script –

querystring = {"appid": openmapapi_key, "q": rawQry}

print('Input JSON: ', str(querystring))

headers = {
    'host': openmapapi_host,
    'content-type': type,
    'Cache-Control': openmapapi_cache,
    'Connection': openmapapi_con
}

response = requests.request("GET", url, headers=headers, params=querystring)

ResJson  = response.text

In the above snippet, our application first prepares the query parameters & the headers from the values received from our config script. It then invokes the GET method to extract the real-time data in the form of JSON & finally sends the JSON payload to the primary calling function.
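To see what the request looks like without actually calling the API, here is a small offline sketch using only the standard library. The helper build_url & the sample_response string are my own assumptions; the response is a hand-written, heavily trimmed stand-in & not real API output.

```python
import json
from urllib.parse import urlencode

BASE_URL = "http://api.openweathermap.org/data/2.5/weather"


def build_url(city, api_key):
    """Hypothetical helper: compose the GET URL from the query parameters."""
    return BASE_URL + "?" + urlencode({"appid": api_key, "q": city})


# Hand-written, trimmed stand-in for a response body (not real API output)
sample_response = (
    '{"name": "Chicago", '
    '"main": {"humidity": 65, "feels_like": 271.4}, '
    '"weather": [{"main": "Clouds", "description": "overcast clouds"}]}'
)

url = build_url("New York", "XXXX")
payload = json.loads(sample_response)

print(url)
print(payload["name"], payload["main"]["humidity"], payload["weather"][0]["main"])
```

The requests package performs the same parameter encoding for you when you pass params, so this is only meant to show what goes over the wire.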

3. clsMap.py (This script contains the main logic to prepare the MAP using seaborn package & try to plot our custom made risk factor by blending the realtime data with our statistical data received over the internet.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 19-Jan-2020              ####
#### Modified On 19-Jan-2020              ####
####                                      ####
#### Objective: Main scripts to invoke    ####
#### plot into the Map.                   ####
##############################################

import seaborn as sns
import logging
from clsConfig import clsConfig as cf
import pandas as p
import clsL as cl

# This library is required later
# to print the chart
import matplotlib.pyplot as plt

class clsMap:
    def __init__(self):
        self.src_file =  cf.config['SRC_FILE_1']

    def calculateRisk(self, row):
        try:
            # Let's assume some logic
            # 1. By default, 30% of Senior Citizens are
            # prone to health issues in each city
            # 2. Male Senior Citizens are more prone to
            # illness than female (weights 0.19 vs 0.11)
            # 3. Humidity above 70% or below 40%
            # adds a further 22% weight
            # 4. A feels-like temperature above 280 or
            # below 260 (Kelvin, the API default) adds
            # a further 17% weight
            # Finally, this will be calculated per 1K
            # people around 10 blocks

            str_sex = str(row['Sex'])

            int_humidity = int(row['humidity'])
            int_feelsLike = int(row['feels_like'])
            int_population = int(str(row['Population']).replace(',',''))
            float_srcitizen = float(row['SeniorCitizen'])

            confidance_score = 0.0

            SeniorCitizenPopulation = (int_population * float_srcitizen)

            if str_sex == 'Male':
                confidance_score = (SeniorCitizenPopulation * 0.30 * 0.19) + confidance_score
            else:
                confidance_score = (SeniorCitizenPopulation * 0.30 * 0.11) + confidance_score

            if ((int_humidity > 70) | (int_humidity < 40)):
                confidance_score = confidance_score + (int_population * 0.30 * float_srcitizen) * 0.22

            if ((int_feelsLike > 280) | (int_feelsLike < 260)):
                confidance_score = confidance_score + (int_population * 0.30 * float_srcitizen) * 0.17

            final_score = round(round(confidance_score, 2) / (1000 * 10), 2)

            return final_score

        except Exception as e:
            x = str(e)

            return x

    def setMap(self, dfInput):
        try:
            resVal = 0
            df = p.DataFrame()
            debug_ind = 'Y'
            src_file =  self.src_file

            # Initiating Log Class
            l = cl.clsL()

            df = dfInput

            # Creating a subset of desired columns
            dfMod = df[['CityName', 'temp', 'Population', 'humidity', 'feels_like']]

            l.logr('5.dfSuppliment.csv', debug_ind, dfMod, 'log')

            # Fetching Senior Citizen Data
            df = p.read_csv(src_file, index_col=False)

            # Merging two frames
            dfMerge = p.merge(df, dfMod, on=['CityName'])

            l.logr('6.dfMerge.csv', debug_ind, dfMerge, 'log')

            # Getting RiskFactor quotient from our custom made logic
            dfMerge['RiskFactor'] = dfMerge.apply(lambda row: self.calculateRisk(row), axis=1)

            l.logr('7.dfRiskFactor.csv', debug_ind, dfMerge, 'log')

            # Generating Map plots
            # sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, hue='Sex')
            # sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, hue='Sex', markers=['o','v'], scatter_kws={'s':25})
            sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, col='Sex')

            # This is required when you are running
            # through normal Python & not through
            # Jupyter Notebook
            plt.show()

            return resVal

        except Exception as e:
            x = str(e)
            print(x)

            logging.info(x)
            resVal = x

            return resVal

Key lines from the above codebase –

# Creating a subset of desired columns
dfMod = df[['CityName', 'temp', 'Population', 'humidity', 'feels_like']]

l.logr('5.dfSuppliment.csv', debug_ind, dfMod, 'log')

# Fetching Senior Citizen Data
df = p.read_csv(src_file, index_col=False)

# Merging two frames
dfMerge = p.merge(df, dfMod, on=['CityName'])

l.logr('6.dfMerge.csv', debug_ind, dfMerge, 'log')

# Getting RiskFactor quotient from our custom made logic
dfMerge['RiskFactor'] = dfMerge.apply(lambda row: self.calculateRisk(row), axis=1)

l.logr('7.dfRiskFactor.csv', debug_ind, dfMerge, 'log')

Here, the application combines our Senior Citizen data with the already processed data coming from our primary calling script. It also applies our custom logic to calculate the risk factor figures. If you want to go through that, I’ve provided the logic to derive it. However, this is just a demo to find out similar figures. You should not rely on the logic that I’ve used (It is kind of my observation of life till now. :D).
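For anyone who wants to trace the arithmetic, here is a standalone sketch that mirrors the mock risk logic on plain numbers instead of a dataframe row. The function name calculate_risk & the two sample inputs are assumptions for illustration; the weights are the same mock weights used in the class.

```python
def calculate_risk(sex, humidity, feels_like, population, senior_ratio):
    """Mirror of the mock risk logic, on plain numeric inputs."""
    senior_population = population * senior_ratio
    base = senior_population * 0.30          # 30% assumed prone by default

    # Sex-specific weight
    score = base * (0.19 if sex == 'Male' else 0.11)

    # Humidity outside the 40-70% band adds a 22% weight
    if humidity > 70 or humidity < 40:
        score += base * 0.22

    # Feels-like temperature (Kelvin) outside 260-280 adds a 17% weight
    if feels_like > 280 or feels_like < 260:
        score += base * 0.17

    # Scale per 1K people around 10 blocks
    return round(round(score, 2) / (1000 * 10), 2)


# Male, humid day, mild temperature: 30000 * 0.19 + 30000 * 0.22 = 12300
risk_male = calculate_risk('Male', 75, 270, 1_000_000, 0.10)

# Female, normal humidity, warm day: 30000 * 0.11 + 30000 * 0.17 = 8400
risk_female = calculate_risk('Female', 50, 290, 1_000_000, 0.10)

print(risk_male, risk_female)
```

Working it through by hand for one or two rows like this is a quick way to validate the RiskFactor column in the 7.dfRiskFactor.csv log file.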

The line below is only required when you are running seaborn outside a Jupyter notebook.

plt.show()

4. callOpenMapWeatherAPI.py (This is the first calling script. This script also calls the realtime API, then blends the first file with it & passes only the relevant columns of data to our Map script to produce the graph.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 19-Jan-2020              ####
#### Modified On 19-Jan-2020              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from clsConfig import clsConfig as cf
import pandas as p
import clsL as cl
import logging
import datetime
import json
import clsWeather as ct
import re
import numpy as np
import clsMap as cm

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def getMainWeather(row):
    try:
        # Parsing the nested weather payload to fetch the 'main' value

        lkp_Columns = str(row['weather'])
        jpayload = str(lkp_Columns).replace("'", '"')

        #jpayload = json.dumps(lkp_Columns)
        payload = json.loads(jpayload)

        df_lkp = p.io.json.json_normalize(payload)
        df_lkp.columns = df_lkp.columns.map(lambda x: x.split(".")[-1])

        str_main_weather = str(df_lkp.iloc[0]['main'])

        return str_main_weather

    except Exception as e:
        x = str(e)
        str_main_weather = x

        return str_main_weather

def getMainDescription(row):
    try:
        # Parsing the nested weather payload to fetch the 'description' value

        lkp_Columns = str(row['weather'])
        jpayload = str(lkp_Columns).replace("'", '"')

        #jpayload = json.dumps(lkp_Columns)
        payload = json.loads(jpayload)

        df_lkp = p.io.json.json_normalize(payload)
        df_lkp.columns = df_lkp.columns.map(lambda x: x.split(".")[-1])

        str_description = str(df_lkp.iloc[0]['description'])

        return str_description

    except Exception as e:
        x = str(e)
        str_description = x

        return str_description

def main():
    try:
        dfSrc = p.DataFrame()
        df_ret = p.DataFrame()
        ret_2 = ''
        debug_ind = 'Y'

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidatedIR.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']
        col_list = cf.config['COL_LIST']
        col_list_1 = cf.config['COL_LIST_1']
        col_list_2 = cf.config['COL_LIST_2']

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        df2 = p.DataFrame()

        src_file =  cf.config['SRC_FILE']

        # Fetching data from source file
        df = p.read_csv(src_file, index_col=False)

        # Creating a list of City Name from the source file
        city_list = df['CityName'].tolist()

        # Declaring an empty dictionary
        merge_dict = {}
        merge_dict['city'] = df2

        start_pos = 1
        src_file_name = '1.' + cf.config['SRC_FILE_INIT']

        for i in city_list:
            x1 = ct.clsWeather()
            ret_2 = x1.searchQry(i)

            # Capturing the JSON Payload
            res = json.loads(ret_2)

            # Converting dictionary to Pandas Dataframe
            # df_ret = p.read_json(ret_2, orient='records')

            df_ret = p.io.json.json_normalize(res)
            df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

            # Removing any duplicate columns
            df_ret = df_ret.loc[:, ~df_ret.columns.duplicated()]

            # l.logr(str(start_pos) + '.1.' + src_file_name, debug_ind, df_ret, 'log')
            start_pos = start_pos + 1

            # Even if all the conversions are successful,
            # the OpenMap response may not contain a gust
            # column. Hence, we need to add a dummy gust
            # column to maintain a consistent structure

            if 'gust' not in df_ret.columns:
                df_ret = df_ret.assign(gust=999999)[['gust'] + df_ret.columns.tolist()]

            # Resetting the column orders as per JSON
            column_order = col_list
            df_mod_ret = df_ret.reindex(column_order, axis=1)

            if start_pos == 1:
                merge_dict['city'] = df_mod_ret
            else:
                d_frames = [merge_dict['city'], df_mod_ret]
                merge_dict['city'] = p.concat(d_frames)

            start_pos += 1

        for k, v in merge_dict.items():
            l.logr(src_file_name, debug_ind, merge_dict[k], 'log')

        # Now opening the temporary file
        temp_log_file = log_dir + src_file_name

        dfNew = p.read_csv(temp_log_file, index_col=False)

        # Extracting Complex columns
        dfNew['WeatherMain'] = dfNew.apply(lambda row: getMainWeather(row), axis=1)
        dfNew['WeatherDescription'] = dfNew.apply(lambda row: getMainDescription(row), axis=1)

        l.logr('2.dfNew.csv', debug_ind, dfNew, 'log')

        # Removing unwanted columns & Renaming key columns
        dfNew.drop(['weather'], axis=1, inplace=True)
        dfNew.rename(columns={'name': 'CityName'}, inplace=True)

        l.logr('3.dfNewMod.csv', debug_ind, dfNew, 'log')

        # Now joining with the main csv
        # to get the complete picture
        dfMain = p.merge(df, dfNew, on=['CityName'])

        l.logr('4.dfMain.csv', debug_ind, dfMain, 'log')

        # Let's extract only relevant columns
        dfSuppliment = dfMain[['CityName', 'Population', 'State', 'country', 'feels_like', 'humidity', 'pressure', 'temp', 'temp_max', 'temp_min', 'visibility', 'deg', 'gust', 'speed', 'WeatherMain', 'WeatherDescription']]

        l.logr('5.dfSuppliment.csv', debug_ind, dfSuppliment, 'log')

        # Let's pass this to our map section
        x2 = cm.clsMap()
        ret_3 = x2.setMap(dfSuppliment)

        if ret_3 == 0:
            print('Successful Map Generated!')
        else:
            print('Please check the log for further issue!')

        print("-" * 60)
        print()

        print('Finding Story points..')
        print("*" * 157)
        logging.info('Finding Story points..')
        logging.info(tmpR0)


        tmpR10 = 'End Time: ' + str(var)
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError as e:
        print(str(e))
        print("No relevant data to proceed!")
        logging.info("No relevant data to proceed!")

    except Exception as e:
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

Key snippet from the above script –

# Capturing the JSON Payload
res = json.loads(ret_2)

# Converting dictionary to Pandas Dataframe
df_ret = p.io.json.json_normalize(res)
df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

Once the application receives the JSON response from the realtime API, it converts it to a pandas dataframe.

# Removing any duplicate columns
df_ret = df_ret.loc[:, ~df_ret.columns.duplicated()]

Since this is a complex JSON response, the application might encounter duplicate columns, which might cause a problem later. Hence, our app is removing all these duplicate columns, as they are not required for our case.
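To show why duplicates can appear at all, here is a plain-Python sketch of the two steps above: flattening the nested response & keeping only the last part of each dotted column name. The helpers flatten & last_segment_columns, as well as the sample record, are assumptions for illustration & this is not how pandas implements json_normalize.

```python
def flatten(record, prefix=""):
    """Turn nested dicts into one level with dotted names, e.g. 'main.temp'."""
    flat = {}
    for key, value in record.items():
        name = prefix + "." + key if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value  # lists are kept as-is
    return flat


def last_segment_columns(flat):
    """Rename 'main.temp' -> 'temp'; on a name clash, keep the first value."""
    out = {}
    for name, value in flat.items():
        out.setdefault(name.split(".")[-1], value)
    return out


# Hand-written sample; 'id' & 'sys.id' collide once the prefix is dropped
sample = {
    "id": 4887398,
    "name": "Chicago",
    "main": {"temp": 272.1, "humidity": 65},
    "sys": {"id": 2005153, "country": "US"},
}

flat = flatten(sample)
row = last_segment_columns(flat)

print(flat)
print(row)
```

Keeping the first occurrence matches the behaviour of the duplicated() filter used in our script, which marks only the later repeats.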

if 'gust' not in df_ret.columns:
    df_ret = df_ret.assign(gust=999999)[['gust'] + df_ret.columns.tolist()]

There is a possibility that the application might not receive all the desired attributes from the realtime API. Hence, the above lines check for the gust column & add a dummy one for those records where it is not present in the JSON response.
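The idea of padding a missing attribute & then forcing a fixed column order can be sketched on a plain dictionary as well. The helper align & the shortened column list are assumptions for illustration; note that pandas reindex would fill other missing columns with NaN, whereas this sketch uses None.

```python
# Shortened, illustrative column list (the real one lives in clsConfig)
COLUMNS = ['name', 'temp', 'humidity', 'speed', 'gust']
GUST_PLACEHOLDER = 999999


def align(record, columns=COLUMNS):
    """Add a dummy gust if absent, then keep only the expected columns in order."""
    filled = dict(record)
    filled.setdefault('gust', GUST_PLACEHOLDER)
    return {col: filled.get(col) for col in columns}


# 'gust' is missing & 'clouds' is not part of the expected structure
row = align({'name': 'Chicago', 'temp': 272.1, 'humidity': 65, 'speed': 4.1, 'clouds': 90})

print(row)
```

Whichever placeholder you choose, remember to exclude it from any later statistics, as 999999 is not a real wind gust.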

if start_pos == 1:
    merge_dict['city'] = df_mod_ret
else:
    d_frames = [merge_dict['city'], df_mod_ret]
    merge_dict['city'] = p.concat(d_frames)

These few lines are required, as our API has a limitation of responding with only one city at a time. Hence, in this case, we’re retrieving one city at a time & finally merging them into a single dataframe before creating a temporary source file for the next step.

At this moment our data should look like this –

16. Intermediate_Data_1

Let’s check the weather column. We need to extract the main & description for our dashboard, which will be coming in the next installment.

# Extracting Complex columns
dfNew['WeatherMain'] = dfNew.apply(lambda row: getMainWeather(row), axis=1)
dfNew['WeatherDescription'] = dfNew.apply(lambda row: getMainDescription(row), axis=1)

Hence, we’ve used the following two functions to extract these values & the critical snippet from one of those functions is as follows –

lkp_Columns = str(row['weather'])
jpayload = str(lkp_Columns).replace("'", '"')
payload = json.loads(jpayload)

df_lkp = p.io.json.json_normalize(payload)
df_lkp.columns = df_lkp.columns.map(lambda x: x.split(".")[-1])

str_main_weather = str(df_lkp.iloc[0]['main'])

The above lines extract the weather column & replace the single quotes with double quotes before the application tries to parse it as JSON. Once it is parsed, json_normalize flattens it & creates individual columns out of it. Once you have them captured inside the pandas dataframe, you can extract the required values & return them to your primary calling function.
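One caveat with the quote replacement: it breaks as soon as a value itself contains an apostrophe. As an alternative sketch (not what the script above does), the standard library’s ast.literal_eval can read the Python-style text directly. The helper parse_weather_cell & both sample strings are assumptions for illustration.

```python
import ast
import json


def parse_weather_cell(cell):
    """Parse the text form of a list of dicts, as written by str() into the CSV."""
    return ast.literal_eval(cell)


# Hand-written sample cells
simple = "[{'id': 804, 'main': 'Clouds', 'description': 'overcast clouds'}]"
tricky = "[{'id': 1, 'main': 'Clouds', 'description': \"it's cloudy\"}]"

main_weather = parse_weather_cell(simple)[0]['main']

# The quote-replacement approach fails on the apostrophe
try:
    json.loads(tricky.replace("'", '"'))
    replace_worked = True
except json.JSONDecodeError:
    replace_worked = False

tricky_description = parse_weather_cell(tricky)[0]['description']

print(main_weather, replace_worked, tricky_description)
```

literal_eval only accepts Python literals, so it does not execute arbitrary code the way eval would.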

# Let's pass this to our map section
x2 = cm.clsMap()
ret_3 = x2.setMap(dfSuppliment)

if ret_3 == 0:
    print('Successful Map Generated!')
else:
    print('Please check the log for further issue!')

In the above lines, the application invokes the Map class to calculate the remaining logic & then plot the data into the seaborn graph.

Let’s just briefly see the central directory structure –

10. RunWindow

Here is the log directory –

11. Log Directory

And, finally, the source directory should look something like this –

12. SourceDir

Now, let’s run the application –

Following lines are essential –

sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, hue='Sex')

This will project the plot like this –

13. AdditionalOption

Or,

sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, hue='Sex', markers=['o','v'], scatter_kws={'s':25})

This will lead to the following figures –

14. Adding Markers

As you can see here, using the markers (‘o’/’v’) leads to two different symbols, one for each gender.

Or,

sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, col='Sex')

This will lead to –

15. Separate By Sex

So, in this case, the application has created two completely separate plots, one for each value of Sex.

So, finally, we’ve done it. 😀

In the next post, I’ll be doing some more improvisation on top of these data sets. Till then – Happy Avenging! 🙂

Note: All the data posted here are representational data & available over the internet & for educational purpose only.

Publishing new Python Library for JSON & NoSQL

Hi Guys!

As discussed,

Please find the link to the PyPI package of the new enhanced JSON library for Python. This is particularly useful as I’ve accommodated the following features into it.

  1. distinct
  2. nvl
  3. partition_by
  4. regex_like
  5. regex_replace
  6. regex_substr

All these functions can be used over a JSON payload through Python. I’ll discuss this in detail in my next blog post.

However, I would like to suggest this library that will be handy for NoSQL databases like Cosmos DB. Now, you can quickly implement many of these features such as distinct, partitioning & regular expressions with less effort.

Please find the library URL.

Let me know your feedback on the same.

N.B.: I’ve tested this library on both Windows 10 & Ubuntu 18. And, the Python versions that I’ve used are Python 3.6 & Python 3.7.

Till then!

Happy Avenging!

Pandas with Encryption/Decryption along with the JSON – (Client API Access) along with Data Queue (A crossover between Space stone, Reality Stone & Power Stone)

Today, we’ll be discussing a new cross-over between API, JSON, Encryption along with data distribution through Queue.

The primary objective here is to distribute one csv file through an API service & access our previously deployed Encryption/Decryption methods through parallel calls via a Queue. In this case, our primary objective is to allow asynchronous calls to the Queue for data distribution & at this point we’re not really looking for performance improvement. Instead, our goal is to achieve the target.

My upcoming posts will discuss the improvement of performance using Parallel calls.

Let’s discuss it now.

Please find the structure of our Windows & MAC directories as follows –

Win_Vs_MAC

We’re not going to discuss any scripts that we’ve already discussed in my previous posts. Please refer to the relevant earlier posts from my blog.

1. clsL.py (This script will create the split csv files or final merge file after the corresponding process. However, this can be used as usual verbose debug logging as well. Hence, the name comes into the picture.)

###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 25-Jan-2019       ########
####                               ########
#### Objective: Log File           ########
###########################################
import pandas as p
import platform as pl
from clsParam import clsParam as cf

class clsL(object):
    def __init__(self):
        self.path = cf.config['PATH']

    def logr(self, Filename, Ind, df, subdir=None):
        try:
            x = p.DataFrame()
            x = df
            sd = subdir

            os_det = pl.system()

            if sd == None:
                if os_det == "Windows":
                    fullFileName = self.path + '\\' + Filename
                else:
                    fullFileName = self.path + '/' + Filename
            else:
                if os_det == "Windows":
                    fullFileName = self.path + '\\' + sd + "\\" + Filename
                else:
                    fullFileName = self.path + '/' + sd + "/" + Filename

            if Ind == 'Y':
                x.to_csv(fullFileName, index=False)

            return 0

        except Exception as e:
            y = str(e)
            print(y)
            return 3
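The Windows & non-Windows branches above can also be collapsed with os.path.join, which picks the right separator for the current operating system. This is just a sketch; the function name build_file_name & the sample folder names are my assumptions.

```python
import os

def build_file_name(base_path, file_name, subdir=None):
    """Join path parts with the separator of the current operating system."""
    parts = [base_path] + ([subdir] if subdir else []) + [file_name]
    return os.path.join(*parts)

print(build_file_name("logs", "run.csv"))
print(build_file_name("logs", "run.csv", subdir="split"))
```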

2. callRunServer.py (This script will create an instance of a server. Once it is running, it will emulate the Server API functionalities. Hence, the name comes into the picture.)

############################################
#### Written By: SATYAKI DE             ####
#### Written On: 10-Feb-2019            ####
#### Package Flask package needs to     ####
#### install in order to run this       ####
#### script.                            ####
####                                    ####
#### Objective: This script will        ####
#### initiate the encrypt/decrypt class ####
#### based on client supplied data.     ####
#### Also, this will create an instance ####
#### of the server & create an endpoint ####
#### or API using flask framework.      ####
############################################

from flask import Flask
from flask import jsonify
from flask import request
from flask import abort
from clsConfigServer import clsConfigServer as csf
import clsFlask as clf

app = Flask(__name__)

@app.route('/process/getEncrypt', methods=['POST'])
def getEncrypt():
    try:
        # Reject the call when the request
        # does not carry a JSON body
        if not request.is_json:
            abort(400)

        # Capturing the individual element
        content = request.get_json()

        dGroup = content['dataGroup']
        input_data = content['data']
        dTemplate = content['dataTemplate']

        # For debug purpose only
        print("-" * 157)
        print("Group: ", dGroup)
        print("Data: ", input_data)
        print("Template: ", dTemplate)
        print("-" * 157)

        ret_val = ''

        if ((dGroup != '') & (dTemplate != '')):
            y = clf.clsFlask()
            ret_val = y.getEncryptProcess(dGroup, input_data, dTemplate)
        else:
            abort(500)

        return jsonify({'status': 'success', 'encrypt_val': ret_val})
    except Exception as e:
        x = str(e)
        return jsonify({'status': 'error', 'detail': x})


@app.route('/process/getDecrypt', methods=['POST'])
def getDecrypt():
    try:
        # Reject the call when the request
        # does not carry a JSON body
        if not request.is_json:
            abort(400)

        # Capturing the individual element
        content = request.get_json()

        dGroup = content['dataGroup']
        input_data = content['data']
        dTemplate = content['dataTemplate']

        # For debug purpose only
        print("-" * 157)
        print("Group: ", dGroup)
        print("Data: ", input_data)
        print("Template: ", dTemplate)
        print("-" * 157)

        ret_val = ''

        if ((dGroup != '') & (dTemplate != '')):
            y = clf.clsFlask()
            ret_val = y.getDecryptProcess(dGroup, input_data, dTemplate)
        else:
            abort(500)

        return jsonify({'status': 'success', 'decrypt_val': ret_val})
    except Exception as e:
        x = str(e)
        return jsonify({'status': 'error', 'detail': x})


def main():
    try:
        print('Starting Encrypt/Decrypt Application!')

        # Calling Server Start-Up Script
        app.run(debug=True, host=str(csf.config['HOST_IP_ADDR']))
        ret_val = 0

        if ret_val == 0:
            print("Finished Returning Message!")
        else:
            raise IOError
    except Exception as e:
        print("Server Failed To Start!")

if __name__ == '__main__':
    main()
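To make the expected request shape concrete, here is a small sketch that builds (but does not send) a POST request for the getEncrypt endpoint using only the standard library. The three keys are the ones the server reads above; the function name & the sample account number are my assumptions, & the host & port are the ones used in the client parameter file of this post.

```python
import json
import urllib.request

def build_encrypt_request(base_url, group, data, template):
    """Build (but do not send) a POST request for /process/getEncrypt."""
    body = json.dumps({"dataGroup": group, "data": data, "dataTemplate": template})
    return urllib.request.Request(
        base_url + "/process/getEncrypt",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# Sample value "1234567" is made up for illustration
req = build_encrypt_request("http://192.168.0.13:5000", "GrDet", "1234567", "subGrAcct_Nbr")
print(req.get_method(), req.full_url)
print(req.data.decode("utf-8"))
# Sending it would be urllib.request.urlopen(req), with the server running
```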

 

3. clsFlask.py (This script is part of the server process, which will categorize the encryption logic based on different groups. Hence, the name comes into the picture.)

###########################################
#### Written By: SATYAKI DE            ####
#### Written On: 25-Jan-2019           ####
#### Package Flask package needs to    ####
#### install in order to run this      ####
#### script.                           ####
####                                   ####
#### Objective: This script will       ####
#### encrypt/decrypt based on the      ####
#### supplied salt value. Also,        ####
#### this will capture the individual  ####
#### element & stored them into JSON   ####
#### variables using flask framework.  ####
###########################################

from clsConfigServer import clsConfigServer as csf
import clsEnDecAuth as cen

class clsFlask(object):
    def __init__(self):
        self.xtoken = str(csf.config['DEF_SALT'])

    def getEncryptProcess(self, dGroup, input_data, dTemplate):
        try:
            # It is sending default salt value
            xtoken = self.xtoken

            # Capturing the individual element
            dGroup = dGroup
            input_data = input_data
            dTemplate = dTemplate

            # This will check the mandatory json elements
            if ((dGroup != '') & (dTemplate != '')):

                # Based on the Group & Element it will fetch the salt
                # Based on the specific salt it will encrypt the data
                if ((dGroup == 'GrDet') & (dTemplate == 'subGrAcct_Nbr')):
                    xtoken = str(csf.config['ACCT_NBR_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.encrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrName')):
                    xtoken = str(csf.config['NAME_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.encrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrPhone')):
                    xtoken = str(csf.config['PHONE_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.encrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrEmail')):
                    xtoken = str(csf.config['EMAIL_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.encrypt_str(input_data)
                else:
                    ret_val = ''
            else:
                ret_val = ''

            # Return value
            return ret_val

        except Exception as e:
            ret_val = ''
            # Return the valid json Error Response
            return ret_val

    def getDecryptProcess(self, dGroup, input_data, dTemplate):
        try:
            xtoken = self.xtoken

            # Capturing the individual element
            dGroup = dGroup
            input_data = input_data
            dTemplate = dTemplate

            # This will check the mandatory json elements
            if ((dGroup != '') & (dTemplate != '')):

                # Based on the Group & Element it will fetch the salt
                # Based on the specific salt it will decrypt the data
                if ((dGroup == 'GrDet') & (dTemplate == 'subGrAcct_Nbr')):
                    xtoken = str(csf.config['ACCT_NBR_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.decrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrName')):
                    xtoken = str(csf.config['NAME_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.decrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrPhone')):
                    xtoken = str(csf.config['PHONE_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.decrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrEmail')):
                    xtoken = str(csf.config['EMAIL_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.decrypt_str(input_data)
                else:
                    ret_val = ''
            else:
                ret_val = ''

            # Return value
            return ret_val

        except Exception as e:
            ret_val = ''
            # Return the valid Error Response
            return ret_val
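Since every branch above differs only in the salt it picks, the group & template lookup could also be expressed as a dictionary. The sketch below is one possible way to do that, not the way the script does it; the config values are placeholders & the names SALT_KEYS & pick_salt are my own.

```python
# Hypothetical stand-in for csf.config; real salts would come from clsConfigServer
config = {
    "ACCT_NBR_SALT": "salt-a",
    "NAME_SALT": "salt-n",
    "PHONE_SALT": "salt-p",
    "EMAIL_SALT": "salt-e",
}

# (group, template) -> config key holding the salt
SALT_KEYS = {
    ("GrDet", "subGrAcct_Nbr"): "ACCT_NBR_SALT",
    ("GrDet", "subGrName"): "NAME_SALT",
    ("GrDet", "subGrPhone"): "PHONE_SALT",
    ("GrDet", "subGrEmail"): "EMAIL_SALT",
}

def pick_salt(d_group, d_template, cfg=config):
    """Return the salt for a group/template pair, or None when unsupported."""
    key = SALT_KEYS.get((d_group, d_template))
    return cfg[key] if key else None

print(pick_salt("GrDet", "subGrPhone"))
print(pick_salt("GrDet", "unknown"))
```

With this shape, supporting a new template means adding one entry to the dictionary, & the encrypt & decrypt methods can share the same lookup.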

 

4. clsEnDec.py (This script will encrypt or decrypt the supplied string using the salt selected for its group. Hence, the name comes into the picture.)

###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 25-Jan-2019       ########
#### Package Cryptography needs to ########
#### install in order to run this  ########
#### script.                       ########
####                               ########
#### Objective: This script will   ########
#### encrypt/decrypt based on the  ########
#### hidden supplied salt value.   ########
###########################################

from cryptography.fernet import Fernet

class clsEnDec(object):

    def __init__(self, token):
        # Calculating Key
        self.token = token

    def encrypt_str(self, data):
        try:
            # Capturing the Salt Information
            salt = self.token

            # Building the cipher from the key & encrypting the value
            cipher = Fernet(salt)
            encr_val = cipher.encrypt(bytes(data, 'utf8')).decode('utf8')

            return encr_val

        except Exception as e:
            x = str(e)
            print(x)
            encr_val = ''

            return encr_val

    def decrypt_str(self, data):
        try:
            # Capturing the Salt Information
            salt = self.token

            # Building the cipher from the key & decrypting the value
            cipher = Fernet(salt)
            decr_val = cipher.decrypt(bytes(data, 'utf8')).decode('utf8')

            return decr_val

        except Exception as e:
            x = str(e)
            print(x)
            decr_val = ''

            return decr_val
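One point worth noting when turning the bytes returned by the cipher into text: decoding the bytes is safer than stringifying the bytes object & stripping the b'...' wrapper, as the latter also removes apostrophes that belong to the data. The short sketch below shows the difference on a made-up name, without involving the cipher at all.

```python
raw = "O'Brien".encode("utf8")   # stands in for the bytes a decrypt call hands back

# Stringify-and-strip approach: the wrapper & the data's apostrophe get mangled
legacy = str(raw).replace("b'", "").replace("'", "")

# Decoding the bytes directly keeps the text intact
decoded = raw.decode("utf8")

print(legacy)
print(decoded)
```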

 

5. clsConfigServer.py (This script contains all the main parameter details of your emulated API server. Hence, the name comes into the picture.)

###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 10-Feb-2019       ########
####                               ########
#### Objective: Parameter File     ########
###########################################

import os
import platform as pl

# Checking with O/S system
os_det = pl.system()

class clsConfigServer(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    if os_det == "Windows":
        config = {
            'FILE': 'acct_addr_20180112.csv',
            'SRC_FILE_PATH': Curr_Path + '\\' + 'src_file\\',
            'PROFILE_FILE_PATH': Curr_Path + '\\' + 'profile\\',
            'HOST_IP_ADDR': '0.0.0.0',
            'DEF_SALT': 'iooquzKtqLwUwXG3rModqj_fIl409vemWg9PekcKh2o=',
            'ACCT_NBR_SALT': 'iooquzKtqLwUwXG3rModqj_fIlpp1vemWg9PekcKh2o=',
            'NAME_SALT': 'iooquzKtqLwUwXG3rModqj_fIlpp1026Wg9PekcKh2o=',
            'PHONE_SALT': 'iooquzKtqLwUwXG3rMM0F5_fIlpp1026Wg9PekcKh2o=',
            'EMAIL_SALT': 'iooquzKtqLwU0653rMM0F5_fIlpp1026Wg9PekcKh2o='
        }
    else:
        config = {
            'FILE': 'acct_addr_20180112.csv',
            'SRC_FILE_PATH': Curr_Path + '/' + 'src_file/',
            'PROFILE_FILE_PATH': Curr_Path + '/' + 'profile/',
            'HOST_IP_ADDR': '0.0.0.0',
            'DEF_SALT': 'iooquzKtqLwUwXG3rModqj_fIl409vemWg9PekcKh2o=',
            'ACCT_NBR_SALT': 'iooquzKtqLwUwXG3rModqj_fIlpp1vemWg9PekcKh2o=',
            'NAME_SALT': 'iooquzKtqLwUwXG3rModqj_fIlpp1026Wg9PekcKh2o=',
            'PHONE_SALT': 'iooquzKtqLwUwXG3rMM0F5_fIlpp1026Wg9PekcKh2o=',
            'EMAIL_SALT': 'iooquzKtqLwU0653rMM0F5_fIlpp1026Wg9PekcKh2o='
        }

 

6. clsWeb.py (This script will receive the input Pandas dataframe & then convert it to JSON & then send it back to our Flask API Server for encryption/decryption. Hence, the name comes into the picture.)

############################################
#### Written By: SATYAKI DE             ####
#### Written On: 09-Mar-2019            ####
#### Package Flask package needs to     ####
#### install in order to run this       ####
#### script.                            ####
####                                    ####
#### Objective: This script will        ####
#### initiate API based JSON requests   ####
#### at the server & receive the        ####
#### response from it & transform it    ####
#### back to the data-frame.            ####
############################################

import json
import requests
import datetime
import time
import ssl
import os
from clsParam import clsParam as cf

class clsWeb(object):
    def __init__(self, payload):
        self.payload = payload
        self.path = str(cf.config['PATH'])
        self.max_retries = int(cf.config['MAX_RETRY'])
        self.encrypt_ulr = str(cf.config['ENCRYPT_URL'])
        self.decrypt_ulr = str(cf.config['DECRYPT_URL'])

    def getResponse(self, mode):

        # Assigning Logging Info
        max_retries = self.max_retries
        encrypt_ulr = self.encrypt_ulr
        decrypt_ulr = self.decrypt_ulr
        En_Dec_Mode = mode

        try:

            # Bypassing SSL Authentication
            try:
                _create_unverified_https_context = ssl._create_unverified_context
            except AttributeError:
                # Legacy python that doesn't verify HTTPS certificates by default
                pass
            else:
                # Handle target environment that doesn't support HTTPS verification
                ssl._create_default_https_context = _create_unverified_https_context

            # Providing the url
            if En_Dec_Mode == 'En':
                url = encrypt_ulr
            else:
                url = decrypt_ulr

            print("URL::", url)

            # Capturing the payload
            data = self.payload

            # Converting String to Json
            # json_data = json.loads(data)
            json_data = json.loads(data)

            print("JSON:::::::", str(json_data))

            headers = {"Content-type": "application/json"}
            param = headers

            var1 = datetime.datetime.now().strftime("%H:%M:%S")
            print('Json Fetch Start Time:', var1)

            retries = 1
            success = False

            while not success:
                # Getting response from web service
                # response = requests.post(url, params=param, json=data, auth=(login, password), verify=False)
                response = requests.post(url, params=param, json=json_data, verify=False)
                print("Complete Return Code:: ", str(response.status_code))
                print("Return Code Initial::", str(response.status_code)[:1])

                if str(response.status_code)[:1] == '2':
                    # response = s.post(url, params=param, json=json_data, verify=False)
                    success = True
                else:
                    wait = retries * 2
                    print("Retry fails! Waiting " + str(wait) + " seconds and retrying.")
                    time.sleep(wait)
                    retries += 1
                    # print('Return Service::')

                # Checking Maximum Retries
                if retries == max_retries:
                    success = True
                    raise ValueError

            print("JSON RESPONSE:::", response.text)

            var2 = datetime.datetime.now().strftime("%H:%M:%S")
            print('Json Fetch End Time:', var2)

            # Capturing the response json from Web Service
            # (done once, after the retry loop has finished)
            response_json = response.text
            load_val = json.loads(response_json)

            # Based on the mode application will send the return value
            if En_Dec_Mode == 'En':
                encrypt_ele = str(load_val['encrypt_val'])
                return_ele = encrypt_ele
            else:
                decrypt_ele = str(load_val['decrypt_val'])
                return_ele = decrypt_ele

            return return_ele

        except ValueError as v:
            raise ValueError

        except Exception as e:
            x = str(e)
            print(x)

            return 'Error'

Let’s discuss the key lines –

try:
    _create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
    # Legacy python that doesn't verify HTTPS certificates by default
    pass
else:
    # Handle target environment that doesn't support HTTPS verification
    ssl._create_default_https_context = _create_unverified_https_context

If you are running in a secure environment, sometimes your proxy or firewall blocks you from accessing the API server when they are on different networks. Hence, we need to bypass that by disabling HTTPS certificate verification. However, it is advisable not to use this in a Prod environment, since it leaves the connection open to man-in-the-middle attacks.

# Capturing the payload
data = self.payload

# Converting String to Json
json_data = json.loads(data)

This snippet converts the JSON string payload into a Python dictionary, which requests can then send as the JSON body.
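To see the round trip in isolation, the sketch below builds a payload string with json.dumps & parses it back with json.loads. The helper name build_payload & the sample name are my assumptions; the three keys are the ones the server expects.

```python
import json

def build_payload(data_group, data, data_template):
    """Serialize one field into the JSON string the client sends."""
    return json.dumps({"dataGroup": data_group, "data": data, "dataTemplate": data_template})

payload = build_payload("GrDet", 'Ann "AJ" Lee', "subGrName")  # sample value is made up
json_data = json.loads(payload)  # same call as in getResponse
print(type(json_data).__name__)
print(json_data["data"])
```

Letting json.dumps build the string also takes care of escaping, so a value that contains double quotes still produces valid JSON.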

response = requests.post(url, params=param, json=json_data, verify=False)
print("Complete Return Code:: ", str(response.status_code))
print("Return Code Initial::", str(response.status_code)[:1])

if str(response.status_code)[:1] == '2':
    # response = s.post(url, params=param, json=json_data, verify=False)
    success = True
else:
    wait = retries * 2
    print("Retry fails! Waiting " + str(wait) + " seconds and retrying.")
    time.sleep(wait)
    retries += 1
    # print('Return Service::')

# Checking Maximum Retries
if retries == max_retries:
    success = True
    raise ValueError

In the first 3 lines, the application posts the JSON request to the API Server & prints the status code of the response it receives.

The next 8 lines check the status code. A 2xx code marks the call as successful; any other code makes the application wait (2 seconds times the retry count) & retry the request.

The last 3 lines say that once the retry count reaches the maximum allowed limit, the application terminates the process by raising an error.
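The retry idea can be tried out without a running server. The sketch below is a simplified variant, not the exact logic of clsWeb: it takes a hypothetical zero-argument callable named send that returns an object with a status_code attribute (for example, a lambda wrapping requests.post), & it makes up to max_retries attempts. The FakeResponse class only exists to simulate the server.

```python
import time

def post_with_retry(send, max_retries, sleep=time.sleep):
    """Call `send()` until it returns a 2xx status or attempts run out."""
    for attempt in range(1, max_retries + 1):
        response = send()
        if 200 <= response.status_code < 300:
            return response
        if attempt < max_retries:
            wait = attempt * 2  # linear back-off: 2, 4, 6... seconds
            print("Attempt", attempt, "failed; waiting", wait, "seconds")
            sleep(wait)
    raise ValueError("No successful response after %d attempts" % max_retries)

class FakeResponse:
    """Stand-in for a requests response, used only for this demo."""
    def __init__(self, status_code):
        self.status_code = status_code

# Simulated server: fails twice, then succeeds; sleeping is skipped in the demo
codes = iter([500, 503, 200])
result = post_with_retry(lambda: FakeResponse(next(codes)), max_retries=5, sleep=lambda s: None)
print(result.status_code)
```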

# Capturing the response json from Web Service
response_json = response.text
load_val = json.loads(response_json)

Once it receives a valid response, the application parses the JSON text & returns the encrypted or decrypted value to the calling method.
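As a small illustration of this last step, the sketch below parses a response text & picks the value by mode. The status, detail, encrypt_val & decrypt_val keys are the ones our Flask server returns; the helper name extract_value, the RESULT_KEYS mapping & the sample responses are my assumptions.

```python
import json

# Mode -> key that carries the result in the server response
RESULT_KEYS = {"En": "encrypt_val", "De": "decrypt_val"}

def extract_value(response_text, mode):
    """Parse the server response and return the value for the given mode."""
    load_val = json.loads(response_text)
    if load_val.get("status") != "success":
        # The server reports failures as {'status': 'error', 'detail': ...}
        raise ValueError(load_val.get("detail", "unknown server error"))
    return str(load_val[RESULT_KEYS[mode]])

# Made-up response body for illustration
ok_text = '{"status": "success", "encrypt_val": "gAAAAAB-sample"}'
print(extract_value(ok_text, "En"))
```

Checking the status first means an error response surfaces with the server's own detail message instead of a missing-key error.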

7. clsParam.py (This script contains the fundamental parameter values to run your client application. Hence, the name comes into the picture.)

###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 20-Jan-2019       ########
###########################################

import os

class clsParam(object):

    config = {
        'MAX_RETRY' : 5,
        'ENCRYPT_MODE' : 'En',
        'DECRYPT_MODE': 'De',
        'PATH' : os.path.dirname(os.path.realpath(__file__)),
        'SRC_DIR' : os.path.dirname(os.path.realpath(__file__)) + '/' + 'src_files/',
        'FIN_DIR': os.path.dirname(os.path.realpath(__file__)) + '/' + 'finished/',
        'ENCRYPT_URL': "http://192.168.0.13:5000/process/getEncrypt",
        'DECRYPT_URL': "http://192.168.0.13:5000/process/getDecrypt",
        'NUM_OF_THREAD': 20
    }

 

8. clsSerial.py (This script will show the usual or serial way to encrypt your data & then decrypt it & store the results into two separate csv files. Hence, the name comes into the picture.)

############################################
#### Written By: SATYAKI DE             ####
#### Written On: 10-Feb-2019            ####
#### Package Flask package needs to     ####
#### install in order to run this       ####
#### script.                            ####
####                                    ####
#### Objective: This script will        ####
#### initiate the encrypt/decrypt class ####
#### based on client supplied data      ####
#### using serial mode operation.       ####
############################################

import pandas as p
import clsWeb as cw
import datetime
from clsParam import clsParam as cf

# Disabling Warnings
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn

class clsSerial(object):
    def __init__(self):
        self.path = cf.config['PATH']
        self.EncryptMode = str(cf.config['ENCRYPT_MODE'])
        self.DecryptMode = str(cf.config['DECRYPT_MODE'])

    # Lookup Methods for Encryption
    def encrypt_acctNbr(self, row):
        # Declaring Local Variable
        en_AcctNbr = ''
        json_source_str = ''

        # Capturing essential values
        EncryptMode = self.EncryptMode
        lkp_acctNbr = row['Acct_Nbr']
        str_acct_nbr = str(lkp_acctNbr)
        fil_acct_nbr = str_acct_nbr.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_nbr + '","dataTemplate":"subGrAcct_Nbr"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_acct_nbr)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            en_AcctNbr = x.getResponse(EncryptMode)
        else:
            en_AcctNbr = ''

        fil_acct_nbr = ''

        return en_AcctNbr

    def encrypt_Name(self, row):
        # Declaring Local Variable
        en_AcctName = ''

        # Capturing essential values
        EncryptMode = self.EncryptMode
        lkp_acctName = row['Name']
        str_acct_name = str(lkp_acctName)
        fil_acct_name = str_acct_name.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_name + '","dataTemplate":"subGrName"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_acct_name)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            en_AcctName = x.getResponse(EncryptMode)
        else:
            en_AcctName = ''

        return en_AcctName

    def encrypt_Phone(self, row):
        # Declaring Local Variable
        en_Phone = ''

        # Capturing essential values
        EncryptMode = self.EncryptMode
        lkp_phone = row['Phone']
        str_phone = str(lkp_phone)
        fil_phone = str_phone.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_phone + '","dataTemplate":"subGrPhone"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_phone)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            en_Phone = x.getResponse(EncryptMode)
        else:
            en_Phone = ''

        return en_Phone

    def encrypt_Email(self, row):
        # Declaring Local Variable
        en_Email = ''

        # Capturing essential values
        EncryptMode = self.EncryptMode
        lkp_email = row['Email']
        str_email = str(lkp_email)
        fil_email = str_email.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_email + '","dataTemplate":"subGrEmail"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_email)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            en_Email = x.getResponse(EncryptMode)
        else:
            en_Email = ''

        return en_Email

    # Lookup Methods for Decryption
    def decrypt_acctNbr(self, row):
        # Declaring Local Variable
        de_AcctNbr = ''
        json_source_str = ''

        # Capturing essential values
        EncryptMode = self.DecryptMode
        lkp_acctNbr = row['Acct_Nbr']
        str_acct_nbr = str(lkp_acctNbr)
        fil_acct_nbr = str_acct_nbr.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_nbr + '","dataTemplate":"subGrAcct_Nbr"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_acct_nbr)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            de_AcctNbr = x.getResponse(EncryptMode)
        else:
            de_AcctNbr = ''

        return de_AcctNbr

    def decrypt_Name(self, row):
        # Declaring Local Variable
        de_AcctName = ''

        # Capturing essential values
        EncryptMode = self.DecryptMode
        lkp_acctName = row['Name']
        str_acct_name = str(lkp_acctName)
        fil_acct_name = str_acct_name.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_name + '","dataTemplate":"subGrName"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_acct_name)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            de_AcctName = x.getResponse(EncryptMode)
        else:
            de_AcctName = ''

        return de_AcctName

    def decrypt_Phone(self, row):
        # Declaring Local Variable
        de_Phone = ''

        # Capturing essential values
        EncryptMode = self.DecryptMode
        lkp_phone = row['Phone']
        str_phone = str(lkp_phone)
        fil_phone = str_phone.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_phone + '","dataTemplate":"subGrPhone"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_phone)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            de_Phone = x.getResponse(EncryptMode)
        else:
            de_Phone = ''

        return de_Phone

    def decrypt_Email(self, row):
        # Declaring Local Variable
        de_Email = ''

        # Capturing essential values
        EncryptMode = self.DecryptMode
        lkp_email = row['Email']
        str_email = str(lkp_email)
        fil_email = str_email.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_email + '","dataTemplate":"subGrEmail"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_email)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            de_Email = x.getResponse(EncryptMode)
        else:
            de_Email = ''

        return de_Email

    def getEncrypt(self, df_payload):
        try:
            df_input = p.DataFrame()
            df_fin = p.DataFrame()

            # Assigning Target File Basic Name
            df_input = df_payload

            # Checking total count of rows
            count_row = df_input.shape[0]
            print('Total number of records to process:: ', count_row)

            # Deriving rows
            df_input['Encrypt_Acct_Nbr'] = df_input.apply(lambda row: self.encrypt_acctNbr(row), axis=1)
            df_input['Encrypt_Name'] = df_input.apply(lambda row: self.encrypt_Name(row), axis=1)
            df_input['Encrypt_Phone'] = df_input.apply(lambda row: self.encrypt_Phone(row), axis=1)
            df_input['Encrypt_Email'] = df_input.apply(lambda row: self.encrypt_Email(row), axis=1)

            # Dropping original columns
            df_input.drop(['Acct_Nbr', 'Name', 'Phone', 'Email'], axis=1, inplace=True)

            # Renaming new columns with the old column names
            df_input.rename(columns={'Encrypt_Acct_Nbr':'Acct_Nbr'}, inplace=True)
            df_input.rename(columns={'Encrypt_Name': 'Name'}, inplace=True)
            df_input.rename(columns={'Encrypt_Phone': 'Phone'}, inplace=True)
            df_input.rename(columns={'Encrypt_Email': 'Email'}, inplace=True)

            # New Column List Orders
            column_order = ['Acct_Nbr', 'Name', 'Acct_Addr_1', 'Acct_Addr_2', 'Phone', 'Email', 'Serial_No']
            df_fin = df_input.reindex(column_order, axis=1)

            return df_fin
        except Exception as e:
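            # Each value is wrapped in a list; a dict of bare scalars cannot build a DataFrame without an index
            df_error = p.DataFrame({'Acct_Nbr':[str(e)], 'Name':[''], 'Acct_Addr_1':[''], 'Acct_Addr_2':[''], 'Phone':[''], 'Email':[''], 'Serial_No':['']})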

            return df_error


    def getDecrypt(self, df_encrypted_payload):
        try:
            df_input = p.DataFrame()
            df_fin = p.DataFrame()

            # Assigning Target File Basic Name
            df_input = df_encrypted_payload

            # Checking total count of rows
            count_row = df_input.shape[0]
            print('Total number of records to process:: ', count_row)


            # Deriving rows
            df_input['Decrypt_Acct_Nbr'] = df_input.apply(lambda row: self.decrypt_acctNbr(row), axis=1)
            df_input['Decrypt_Name'] = df_input.apply(lambda row: self.decrypt_Name(row), axis=1)
            df_input['Decrypt_Phone'] = df_input.apply(lambda row: self.decrypt_Phone(row), axis=1)
            df_input['Decrypt_Email'] = df_input.apply(lambda row: self.decrypt_Email(row), axis=1)

            # Dropping original columns
            df_input.drop(['Acct_Nbr', 'Name', 'Phone', 'Email'], axis=1, inplace=True)

            # Renaming new columns with the old column names
            df_input.rename(columns={'Decrypt_Acct_Nbr':'Acct_Nbr'}, inplace=True)
            df_input.rename(columns={'Decrypt_Name': 'Name'}, inplace=True)
            df_input.rename(columns={'Decrypt_Phone': 'Phone'}, inplace=True)
            df_input.rename(columns={'Decrypt_Email': 'Email'}, inplace=True)

            # New Column List Orders
            column_order = ['Acct_Nbr', 'Name', 'Acct_Addr_1', 'Acct_Addr_2', 'Phone', 'Email']
            df_fin = df_input.reindex(column_order, axis=1)

            return df_fin
        except Exception as e:
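            # Each value is wrapped in a list; a dict of bare scalars cannot build a DataFrame without an index
            df_error = p.DataFrame({'Acct_Nbr':[str(e)], 'Name':[''], 'Acct_Addr_1':[''], 'Acct_Addr_2':[''], 'Phone':[''], 'Email':['']})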

            return df_error

Key lines to discuss:

We’ll look at two main methods:

a. getEncrypt

b. getDecrypt

These two methods are identical in construction. One handles encryption & the other handles decryption.

# Deriving rows
df_input['Encrypt_Acct_Nbr'] = df_input.apply(lambda row: self.encrypt_acctNbr(row), axis=1)
df_input['Encrypt_Name'] = df_input.apply(lambda row: self.encrypt_Name(row), axis=1)
df_input['Encrypt_Phone'] = df_input.apply(lambda row: self.encrypt_Phone(row), axis=1)
df_input['Encrypt_Email'] = df_input.apply(lambda row: self.encrypt_Email(row), axis=1)

As you can see, the application transforms the data row by row, one column at a time, by passing each row to the matching look-up method.
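To make the row-wise pattern concrete, here is a minimal sketch of DataFrame.apply with axis=1. The mask_value helper is a hypothetical stand-in for the real look-up methods (it only reverses the string & calls no service), & the sample data is made up for illustration.

```python
import pandas as p

def mask_value(row, col):
    # Hypothetical stand-in for encrypt_acctNbr & friends: no web service is called
    val = str(row[col]).strip()
    return val[::-1] if len(val) > 0 else ''

df = p.DataFrame({'Acct_Nbr': [' 1001 ', '1002'], 'Name': ['Ann', '']})

# axis=1 hands each row (as a Series) to the function, one row at a time
df['Encrypt_Acct_Nbr'] = df.apply(lambda row: mask_value(row, 'Acct_Nbr'), axis=1)
df['Encrypt_Name'] = df.apply(lambda row: mask_value(row, 'Name'), axis=1)
```

Each call to apply walks the whole frame once, so four derived columns mean four passes over the rows.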

# Dropping original columns
df_input.drop(['Acct_Nbr', 'Name', 'Phone', 'Email'], axis=1, inplace=True)

As the comment suggests, the application drops all the unencrypted source columns.

# Renaming new columns with the old column names
df_input.rename(columns={'Encrypt_Acct_Nbr':'Acct_Nbr'}, inplace=True)
df_input.rename(columns={'Encrypt_Name': 'Name'}, inplace=True)
df_input.rename(columns={'Encrypt_Phone': 'Phone'}, inplace=True)
df_input.rename(columns={'Encrypt_Email': 'Email'}, inplace=True)

Once the application drops all the source columns, it renames the new columns back to the old column names, so that the encrypted data can be merged with the rest of the data from the source csv.

# New Column List Orders
column_order = ['Acct_Nbr', 'Name', 'Acct_Addr_1', 'Acct_Addr_2', 'Phone', 'Email', 'Serial_No']
df_fin = df_input.reindex(column_order, axis=1)

Once the application finishes all these transformations, it re-sequences the columns so that they follow the same order as its source csv files.
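The drop, rename & reorder steps can be tried on a tiny frame. This is a sketch with made-up values & a reduced column set, not the full layout of the source csv.

```python
import pandas as p

df = p.DataFrame({
    'Acct_Nbr': ['1001'], 'Name': ['Ann'], 'Acct_Addr_1': ['12 High St'],
    'Encrypt_Acct_Nbr': ['x1'], 'Encrypt_Name': ['x2'],
})

# Drop the clear-text columns, then give the derived columns the old names
df = df.drop(['Acct_Nbr', 'Name'], axis=1)
df = df.rename(columns={'Encrypt_Acct_Nbr': 'Acct_Nbr', 'Encrypt_Name': 'Name'})

# reindex restores the source column order; a column missing from the frame
# would come back filled with NaN rather than raising an error
df_fin = df.reindex(['Acct_Nbr', 'Name', 'Acct_Addr_1'], axis=1)
```

A single rename call with several keys behaves the same as the four separate calls in the script.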

Similar logic is applicable for the decryption as well.

As we know, several look-up methods take part in this process:

encrypt_acctNbr, encrypt_Name, encrypt_Phone, encrypt_Email
decrypt_acctNbr, decrypt_Name, decrypt_Phone, decrypt_Email

We’ll discuss only one method, as they all follow the same pattern.

# Capturing essential values
EncryptMode = self.EncryptMode
lkp_acctNbr = row['Acct_Nbr']
str_acct_nbr = str(lkp_acctNbr)
fil_acct_nbr = str_acct_nbr.strip()

From the row, our application extracts the relevant column, in this case Acct_Nbr. It then converts the value to a string & removes any leading or trailing white space.

# Forming JSON String for this field
json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_nbr + '","dataTemplate":"subGrAcct_Nbr"}'

Once extracted, the application builds the target JSON string from the column data.
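String concatenation works as long as the value contains no double quotes or backslashes. As an alternative sketch (not what the original script does), json.dumps from the standard library builds the same payload & escapes such characters. The helper name build_payload is an assumption for illustration.

```python
import json

def build_payload(value, template):
    # Same three keys as the hand-built string; json.dumps handles the escaping
    return json.dumps(
        {"dataGroup": "GrDet", "data": value, "dataTemplate": template},
        separators=(",", ":"),
    )

payload = build_payload('1001', 'subGrAcct_Nbr')
tricky = build_payload('O"Neil', 'subGrName')   # a value with an embedded quote
```

For plain values the output matches the concatenated string character for character, so the receiving service should not notice the difference.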

# Identifying Length of the field
len_acct_nbr = len(fil_acct_nbr)

# This will trigger the service if it has valid data
if len_acct_nbr > 0:
    x = cw.clsWeb(json_source_str)
    en_AcctNbr = x.getResponse(EncryptMode)
else:
    en_AcctNbr = ''

Based on the length of the extracted value, our application triggers an individual JSON request & captures the response from the service. If the value is empty, it skips the call & returns an empty string.
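Since every look-up method follows the same steps, the pattern can be sketched as one generic helper. This is an illustrative refactoring, not the code used in this post: FakeWeb is a hypothetical stand-in for cw.clsWeb, lookup is an assumed name, & the mode value 'En' is made up.

```python
class FakeWeb:
    # Hypothetical stand-in for cw.clsWeb; it only echoes what it was given
    def __init__(self, json_source_str):
        self.json_source_str = json_source_str

    def getResponse(self, mode):
        return mode + ':' + self.json_source_str

def lookup(row, column, template, mode, client=FakeWeb):
    # Extract the column value, convert it to a string & trim white space
    value = str(row[column]).strip()

    # Skip the service call when there is nothing to send
    if len(value) == 0:
        return ''

    json_source_str = ('{"dataGroup":"GrDet","data":"' + value +
                       '","dataTemplate":"' + template + '"}')
    return client(json_source_str).getResponse(mode)

row = {'Acct_Nbr': ' 1001 ', 'Phone': '   '}
en_acct = lookup(row, 'Acct_Nbr', 'subGrAcct_Nbr', 'En')
en_phone = lookup(row, 'Phone', 'subGrPhone', 'En')   # blank, so no call is made
```

With such a helper, the eight methods would differ only in the column name, the template name & the mode they pass.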

9. clsParallel.py (This script will use the queue to make asynchronous calls & perform the same encryption & decryption. Hence the name.)

############################################
#### Written By: SATYAKI DE             ####
#### Written On: 10-Feb-2019            ####
#### The Flask package needs to be      ####
#### installed in order to run this     ####
#### script.                            ####
####                                    ####
#### Objective: This script will        ####
#### initiate the encrypt/decrypt class ####
#### based on client supplied data.     ####
#### This script will use the advanced  ####
#### queue & asynchronous calls to the  ####
#### API Server to process Encryption & ####
#### Decryption on our csv files.       ####
############################################
import pandas as p
import clsWebService as cw
import datetime
from clsParam import clsParam as cf
from multiprocessing import Lock, Process, Queue, freeze_support, JoinableQueue
import gc
import signal
import time
import os
import queue
import asyncio

# Declaring Global Variable
q = Queue()
lock = Lock()

finished_task = JoinableQueue()
pending_task = JoinableQueue()

sp_fin_dict = {}
dp_fin_dict = {}

# Disabling Warnings
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn

class clsParallel(object):
    def __init__(self):
        self.path = cf.config['PATH']
        self.EncryptMode = str(cf.config['ENCRYPT_MODE'])
        self.DecryptMode = str(cf.config['DECRYPT_MODE'])
        self.num_worker_process = int(cf.config['NUM_OF_THREAD'])
        self.lock = Lock()

    # Lookup Methods for Encryption
    def encrypt_acctNbr(self, row):
        # Declaring Local Variable
        en_AcctNbr = ''
        json_source_str = ''

        # Capturing essential values
        EncryptMode = self.EncryptMode
        lkp_acctNbr = row['Acct_Nbr']
        str_acct_nbr = str(lkp_acctNbr)
        fil_acct_nbr = str_acct_nbr.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_nbr + '","dataTemplate":"subGrAcct_Nbr"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_acct_nbr)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            en_AcctNbr = x.getResponse(EncryptMode)
        else:
            en_AcctNbr = ''

        fil_acct_nbr = ''

        return en_AcctNbr

    def encrypt_Name(self, row):
        # Declaring Local Variable
        en_AcctName = ''

        # Capturing essential values
        EncryptMode = self.EncryptMode
        lkp_acctName = row['Name']
        str_acct_name = str(lkp_acctName)
        fil_acct_name = str_acct_name.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_name + '","dataTemplate":"subGrName"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_acct_name)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            en_AcctName = x.getResponse(EncryptMode)
        else:
            en_AcctName = ''

        return en_AcctName

    def encrypt_Phone(self, row):
        # Declaring Local Variable
        en_Phone = ''

        # Capturing essential values
        EncryptMode = self.EncryptMode
        lkp_phone = row['Phone']
        str_phone = str(lkp_phone)
        fil_phone = str_phone.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_phone + '","dataTemplate":"subGrPhone"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_phone)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            en_Phone = x.getResponse(EncryptMode)
        else:
            en_Phone = ''

        return en_Phone

    def encrypt_Email(self, row):
        # Declaring Local Variable
        en_Email = ''

        # Capturing essential values
        EncryptMode = self.EncryptMode
        lkp_email = row['Email']
        str_email = str(lkp_email)
        fil_email = str_email.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_email + '","dataTemplate":"subGrEmail"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_email)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            en_Email = x.getResponse(EncryptMode)
        else:
            en_Email = ''

        return en_Email

    # Lookup Methods for Decryption
    def decrypt_acctNbr(self, row):
        # Declaring Local Variable
        de_AcctNbr = ''
        json_source_str = ''

        # Capturing essential values
        EncryptMode = self.DecryptMode
        lkp_acctNbr = row['Acct_Nbr']
        str_acct_nbr = str(lkp_acctNbr)
        fil_acct_nbr = str_acct_nbr.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_nbr + '","dataTemplate":"subGrAcct_Nbr"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_acct_nbr)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            de_AcctNbr = x.getResponse(EncryptMode)
        else:
            de_AcctNbr = ''

        return de_AcctNbr

    def decrypt_Name(self, row):
        # Declaring Local Variable
        de_AcctName = ''

        # Capturing essential values
        EncryptMode = self.DecryptMode
        lkp_acctName = row['Name']
        str_acct_name = str(lkp_acctName)
        fil_acct_name = str_acct_name.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_name + '","dataTemplate":"subGrName"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_acct_name)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            de_AcctName = x.getResponse(EncryptMode)
        else:
            de_AcctName = ''

        return de_AcctName

    def decrypt_Phone(self, row):
        # Declaring Local Variable
        de_Phone = ''

        # Capturing essential values
        EncryptMode = self.DecryptMode
        lkp_phone = row['Phone']
        str_phone = str(lkp_phone)
        fil_phone = str_phone.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_phone + '","dataTemplate":"subGrPhone"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_phone)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            de_Phone = x.getResponse(EncryptMode)
        else:
            de_Phone = ''

        return de_Phone

    def decrypt_Email(self, row):
        # Declaring Local Variable
        de_Email = ''

        # Capturing essential values
        EncryptMode = self.DecryptMode
        lkp_email = row['Email']
        str_email = str(lkp_email)
        fil_email = str_email.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_email + '","dataTemplate":"subGrEmail"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_email)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            de_Email = x.getResponse(EncryptMode)
        else:
            de_Email = ''

        return de_Email

    def getEncrypt(self, df_dict):
        try:
            en_fin_dict = {}

            df_input = p.DataFrame()
            df_fin = p.DataFrame()

            # Assigning Target File Basic Name
            for k, v in df_dict.items():
                Process_Name = k
                df_input = v

            # Checking total count of rows
            count_row = int(df_input.shape[0])
            print('Part number of records to process:: ', count_row)

            if count_row > 0:

                # Deriving rows
                df_input['Encrypt_Acct_Nbr'] = df_input.apply(lambda row: self.encrypt_acctNbr(row), axis=1)
                df_input['Encrypt_Name'] = df_input.apply(lambda row: self.encrypt_Name(row), axis=1)
                df_input['Encrypt_Phone'] = df_input.apply(lambda row: self.encrypt_Phone(row), axis=1)
                df_input['Encrypt_Email'] = df_input.apply(lambda row: self.encrypt_Email(row), axis=1)

                # Dropping original columns
                df_input.drop(['Acct_Nbr', 'Name', 'Phone', 'Email'], axis=1, inplace=True)

                # Renaming new columns with the old column names
                df_input.rename(columns={'Encrypt_Acct_Nbr':'Acct_Nbr'}, inplace=True)
                df_input.rename(columns={'Encrypt_Name': 'Name'}, inplace=True)
                df_input.rename(columns={'Encrypt_Phone': 'Phone'}, inplace=True)
                df_input.rename(columns={'Encrypt_Email': 'Email'}, inplace=True)

                # New Column List Orders
                column_order = ['Acct_Nbr', 'Name', 'Acct_Addr_1', 'Acct_Addr_2', 'Phone', 'Email', 'Serial_No']
                df_fin = df_input.reindex(column_order, axis=1)

                sp_fin_dict[Process_Name] = df_fin

            return sp_fin_dict
        except Exception as e:
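            # Each value is wrapped in a list; a dict of bare scalars cannot build a DataFrame without an index
            df_error = p.DataFrame({'Acct_Nbr':[str(e)], 'Name':[''], 'Acct_Addr_1':[''], 'Acct_Addr_2':[''], 'Phone':[''], 'Email':[''], 'Serial_No':['']})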
            sp_fin_dict[Process_Name] = df_error

            return sp_fin_dict

    async def produceEncr(self, queue, l_dict):

        m_dict = {}

        m_dict = self.getEncrypt(l_dict)

        for k, v in m_dict.items():
            item = k
            print('producing {}...'.format(item))

        await queue.put(m_dict)


    async def consumeEncr(self, queue):
        result_dict = {}

        while True:
            # wait for an item from the producer
            sp_fin_dict.update(await queue.get())

            # process the item
            for k, v in sp_fin_dict.items():
                item = k
                print('consuming {}...'.format(item))

            # Notify the queue that the item has been processed
            queue.task_done()


    async def runEncrypt(self, n, df_input):
        l_dict = {}

        queue = asyncio.Queue()
        # schedule the consumer
        consumer = asyncio.ensure_future(self.consumeEncr(queue))

        start_pos = 0
        end_pos = 0

        num_worker_process = n

        count_row = df_input.shape[0]
        print('Total number of records to process:: ', count_row)

        interval = int(count_row / num_worker_process) + 1
        actual_worker_task = int(count_row / interval) + 1

        for i in range(actual_worker_task):
            name = 'Task-' + str(i)

            if ((start_pos + interval) < count_row):
                end_pos = start_pos + interval
            else:
                end_pos = start_pos + (count_row - start_pos)

            print("start_pos: ", start_pos)
            print("end_pos: ", end_pos)

            split_df = df_input.iloc[start_pos:end_pos]
            l_dict[name] = split_df

            if ((start_pos > count_row) | (start_pos == count_row)):
                break
            else:
                start_pos = start_pos + interval

            # run the producer and wait for completion
            await self.produceEncr(queue, l_dict)
            # wait until the consumer has processed all items
            await queue.join()

        # the consumer is still awaiting for an item, cancel it
        consumer.cancel()

        return sp_fin_dict


    def getEncryptParallel(self, df_payload):

        l_dict = {}
        data_dict = {}
        min_val_list = {}
        cnt = 1
        num_worker_process = self.num_worker_process
        actual_worker_task = 0
        number_of_processes = 4

        processes = []

        split_df = p.DataFrame()
        df_ret = p.DataFrame()
        dummy_df = p.DataFrame()

        # Assigning Target File Basic Name
        df_input = df_payload

        # Checking total count of rows
        count_row = df_input.shape[0]
        print('Total number of records to process:: ', count_row)

        interval = int(count_row / num_worker_process) + 1
        actual_worker_task = int(count_row/interval) + 1

        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.runEncrypt(actual_worker_task, df_input))
        loop.close()

        for k, v in sp_fin_dict.items():
            min_val_list[int(k.replace('Task-', ''))] = v

        min_val = min(min_val_list, key=int)
        print("Minimum Index Value: ", min_val)

        for k, v in sorted(sp_fin_dict.items(), key=lambda k: int(k[0].replace('Task-', ''))):
            if int(k.replace('Task-', '')) == min_val:
                df_ret = sp_fin_dict[k]
            else:
                d_frames = [df_ret, sp_fin_dict[k]]
                df_ret = p.concat(d_frames)

        return df_ret

    def getDecrypt(self, df_encrypted_dict):
        try:
            de_fin_dict = {}

            df_input = p.DataFrame()
            df_fin = p.DataFrame()

            # Assigning Target File Basic Name
            for k, v in df_encrypted_dict.items():
                Process_Name = k
                df_input = v

            # Checking total count of rows
            count_row = int(df_input.shape[0])
            print('Part number of records to process:: ', count_row)

            if count_row > 0:

                # Deriving rows
                df_input['Decrypt_Acct_Nbr'] = df_input.apply(lambda row: self.decrypt_acctNbr(row), axis=1)
                df_input['Decrypt_Name'] = df_input.apply(lambda row: self.decrypt_Name(row), axis=1)
                df_input['Decrypt_Phone'] = df_input.apply(lambda row: self.decrypt_Phone(row), axis=1)
                df_input['Decrypt_Email'] = df_input.apply(lambda row: self.decrypt_Email(row), axis=1)

                # Dropping original columns
                df_input.drop(['Acct_Nbr', 'Name', 'Phone', 'Email'], axis=1, inplace=True)

                # Renaming new columns with the old column names
                df_input.rename(columns={'Decrypt_Acct_Nbr':'Acct_Nbr'}, inplace=True)
                df_input.rename(columns={'Decrypt_Name': 'Name'}, inplace=True)
                df_input.rename(columns={'Decrypt_Phone': 'Phone'}, inplace=True)
                df_input.rename(columns={'Decrypt_Email': 'Email'}, inplace=True)

                # New Column List Orders
                column_order = ['Acct_Nbr', 'Name', 'Acct_Addr_1', 'Acct_Addr_2', 'Phone', 'Email', 'Serial_No']
                df_fin = df_input.reindex(column_order, axis=1)

                de_fin_dict[Process_Name] = df_fin

            return de_fin_dict

        except Exception as e:
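            # Each value is wrapped in a list; a dict of bare scalars cannot build a DataFrame without an index
            df_error = p.DataFrame({'Acct_Nbr': [str(e)], 'Name': [''], 'Acct_Addr_1': [''], 'Acct_Addr_2': [''], 'Phone': [''], 'Email': [''], 'Serial_No': ['']})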
            de_fin_dict[Process_Name] = df_error

            return de_fin_dict

    async def produceDecr(self, queue, l_dict):

        m_dict = {}

        m_dict = self.getDecrypt(l_dict)

        for k, v in m_dict.items():
            item = k
            print('producing {}...'.format(item))

        await queue.put(m_dict)


    async def consumeDecr(self, queue):
        result_dict = {}

        while True:
            # wait for an item from the producer
            dp_fin_dict.update(await queue.get())

            # process the item
            for k, v in dp_fin_dict.items():
                item = k
                print('consuming {}...'.format(item))

            # Notify the queue that the item has been processed
            queue.task_done()


    async def runDecrypt(self, n, df_input):
        l_dict = {}

        queue = asyncio.Queue()
        # schedule the consumer
        consumerDe = asyncio.ensure_future(self.consumeDecr(queue))

        start_pos = 0
        end_pos = 0

        num_worker_process = n

        count_row = df_input.shape[0]
        print('Total number of records to process:: ', count_row)

        interval = int(count_row / num_worker_process) + 1
        actual_worker_task = int(count_row / interval) + 1

        for i in range(actual_worker_task):
            name = 'Task-' + str(i)

            if ((start_pos + interval) < count_row):
                end_pos = start_pos + interval
            else:
                end_pos = start_pos + (count_row - start_pos)

            print("start_pos: ", start_pos)
            print("end_pos: ", end_pos)

            split_df = df_input.iloc[start_pos:end_pos]
            l_dict[name] = split_df

            if ((start_pos > count_row) | (start_pos == count_row)):
                break
            else:
                start_pos = start_pos + interval

            # run the producer and wait for completion
            await self.produceDecr(queue, l_dict)
            # wait until the consumer has processed all items
            await queue.join()

        # the consumer is still awaiting for an item, cancel it
        consumerDe.cancel()

        return dp_fin_dict


    def getDecryptParallel(self, df_payload):

        l_dict = {}
        data_dict = {}
        min_val_list = {}
        cnt = 1
        num_worker_process = self.num_worker_process
        actual_worker_task = 0
        number_of_processes = 4

        processes = []

        split_df = p.DataFrame()
        df_ret_1 = p.DataFrame()
        dummy_df = p.DataFrame()

        # Assigning Target File Basic Name
        df_input = df_payload

        # Checking total count of rows
        count_row = df_input.shape[0]
        print('Total number of records to process:: ', count_row)

        interval = int(count_row / num_worker_process) + 1
        actual_worker_task = int(count_row/interval) + 1

        # The earlier loop was closed by getEncryptParallel, so create a fresh one & register it
        loop_2 = asyncio.new_event_loop()
        asyncio.set_event_loop(loop_2)
        loop_2.run_until_complete(self.runDecrypt(actual_worker_task, df_input))
        loop_2.close()

        for k, v in dp_fin_dict.items():
            min_val_list[int(k.replace('Task-', ''))] = v

        min_val = min(min_val_list, key=int)
        print("Minimum Index Value: ", min_val)

        for k, v in sorted(dp_fin_dict.items(), key=lambda k: int(k[0].replace('Task-', ''))):
            if int(k.replace('Task-', '')) == min_val:
                df_ret_1 = dp_fin_dict[k]
            else:
                d_frames = [df_ret_1, dp_fin_dict[k]]
                df_ret_1 = p.concat(d_frames)

        return df_ret_1

I won’t discuss the look-up methods again, as the post is already pretty big. I’ll only address a few critical lines.

Under getEncryptParallel, the following lines are essential:

# Checking total count of rows
count_row = df_input.shape[0]
print('Total number of records to process:: ', count_row)

interval = int(count_row / num_worker_process) + 1
actual_worker_task = int(count_row/interval) + 1

Based on the total number of records in the dataframe, our application works out how many rows each part should hold (interval) & how many parts it needs (actual_worker_task). It then splits the main dataframe into sub-dataframes & passes them through an asynchronous queue.
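A quick worked example of that arithmetic may help. The function name plan_chunks & the sample numbers are assumptions for illustration; the two formulas are the ones shown above.

```python
def plan_chunks(count_row, num_worker_process):
    # Same arithmetic as in getEncryptParallel
    interval = int(count_row / num_worker_process) + 1
    actual_worker_task = int(count_row / interval) + 1
    return interval, actual_worker_task

# 10 rows with 4 workers requested: 3 rows per part, 4 parts (3 + 3 + 3 + 1)
interval, tasks = plan_chunks(10, 4)

# 9 rows with 4 workers requested: 3 rows per part, but still 4 parts planned,
# so the last part is empty
interval_even, tasks_even = plan_chunks(9, 4)
```

When the row count divides evenly by the interval, the formula plans one extra, empty part; the break condition in the runEncrypt loop stops before that part is sent to the queue.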

loop = asyncio.get_event_loop()
loop.run_until_complete(self.runEncrypt(actual_worker_task, df_input))
loop.close()

This initiates our queue methods & passes our dataframe to them.

for k, v in sorted(sp_fin_dict.items(), key=lambda k: int(k[0].replace('Task-', ''))):
    if int(k.replace('Task-', '')) == min_val:
        df_ret = sp_fin_dict[k]
    else:
        d_frames = [df_ret, sp_fin_dict[k]]
        df_ret = p.concat(d_frames)

Our application sends & receives data using a dictionary. The reason is that we don’t expect the data to come back from our server in sequence; it may arrive in any order. Hence, using the keys, we maintain the final sequence, which ensures that our application can join the results back to the correct sets of source data that aren’t candidates for encryption/decryption.
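The ordering idea can be sketched without pandas. The snippet below is an illustration only: it assumes each chunk result is a plain list & that the keys follow the 'Task-N' naming used above; the function name reassemble is hypothetical.

```python
def reassemble(results):
    """Join chunk results in task order, whatever order they arrived in.

    reassemble is a hypothetical helper; lists stand in for sub-dataframes.
    """
    # Sort on the numeric suffix, so 'Task-10' comes after 'Task-2'
    ordered_keys = sorted(results, key=lambda name: int(name.replace('Task-', '')))
    combined = []
    for key in ordered_keys:
        combined.extend(results[key])
    return combined


arrived = {
    'Task-2': ['e', 'f'],
    'Task-0': ['a', 'b'],
    'Task-10': ['u'],
    'Task-1': ['c', 'd'],
}
print(reassemble(arrived))
```

Sorting on the integer part matters: a plain string sort would place 'Task-10' before 'Task-2'.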

Let’s discuss the runEncrypt method.

for i in range(actual_worker_task):
    name = 'Task-' + str(i)

    if ((start_pos + interval) < count_row):
        end_pos = start_pos + interval
    else:
        end_pos = start_pos + (count_row - start_pos)

    print("start_pos: ", start_pos)
    print("end_pos: ", end_pos)

    split_df = df_input.iloc[start_pos:end_pos]
    l_dict[name] = split_df

    if ((start_pos > count_row) | (start_pos == count_row)):
        break
    else:
        start_pos = start_pos + interval

Here, our application splits the source dataframe into multiple sub-dataframes so that they can be processed in parallel using queues.

# run the producer and wait for completion
await self.produceEncr(queue, l_dict)
# wait until the consumer has processed all items
await queue.join()

This invokes the encryption/decryption process using queues. The last line is significant: queue.join() blocks until every item placed into the queue has been consumed & marked as done. Hence, your main program will wait until all the records of your dataframe are processed.

Two methods, produceEncr & consumeEncr, are mainly used for placing an item inside the queue & then, after encryption/decryption, retrieving it from the queue.

A few important lines from both methods are –

#produceEncr
await queue.put(m_dict)

#consumeEncr
# wait for an item from the producer
sp_fin_dict.update(await queue.get())
# Notify the queue that the item has been processed
queue.task_done()

From the first two lines, one can see that the application places its item into the queue. The rest are lines from the other method. Our application pours the data into the dictionary, which is returned to the calling method. The last line is essential. Without the task_done call, queue.join() keeps waiting for items to be marked as processed. Hence, that will trigger an infinite wait.
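The producer/consumer pattern described above can be tried in isolation. This is a minimal sketch & not the author’s class: the names produce, consume & run are hypothetical, lists stand in for sub-dataframes, & upper-casing stands in for the real encryption call.

```python
import asyncio


async def produce(queue, chunks):
    # Place one {name: chunk} item into the queue per task
    for name, chunk in chunks.items():
        await queue.put({name: chunk})


async def consume(queue, results):
    while True:
        # Wait for an item from the producer
        item = await queue.get()
        # Stand-in for encryption: upper-case every value
        results.update({k: [v.upper() for v in vals] for k, vals in item.items()})
        # Tell the queue this item is finished, so join() can return
        queue.task_done()


async def run(chunks):
    queue = asyncio.Queue()
    results = {}
    consumer = asyncio.create_task(consume(queue, results))
    await produce(queue, chunks)
    # Blocks until task_done() has been called once per queued item
    await queue.join()
    # The consumer loops forever, so cancel it once the work is done
    consumer.cancel()
    try:
        await consumer
    except asyncio.CancelledError:
        pass
    return results


results = asyncio.run(run({'Task-0': ['a', 'b'], 'Task-1': ['c']}))
print(results)
```

If the queue.task_done() line is removed, the await queue.join() call never returns, which is the infinite wait mentioned above.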

10. callClient.py (This script will trigger both the serial & parallel process of encryption one by one & finally capture some statistics. Hence, the name comes into the picture.)

############################################
#### Written By: SATYAKI DE             ####
#### Written On: 10-Feb-2019            ####
#### The Flask package needs to be      ####
#### installed in order to run this     ####
#### script.                            ####
####                                    ####
#### Objective: This script will        ####
#### initiate the encrypt/decrypt class ####
#### based on client supplied data.     ####
############################################
import pandas as p
import clsSerial as cs
import time
import datetime
from clsParam import clsParam as cf
import clsParallel as cp
import sys

def main():
    source_df = p.DataFrame()
    encrypted_df = p.DataFrame()
    source_encrypted_df = p.DataFrame()
    decrypted_df = p.DataFrame()
    encrypted_parallel_df = p.DataFrame()
    source_encrypted_parallel_df = p.DataFrame()
    decrypted_parallel_df = p.DataFrame()

    ###############################################################################
    #####                Start Of Serial Encryption Methods                  ######
    ###############################################################################

    print("-" * 157)

    startEnTime = time.time()
    srcFile = 'acct_addr_20180106'
    srcFileWithPath = str(cf.config['SRC_DIR']) + srcFile + '.csv'

    print("Calling Serial Process to Encrypt!")

    # Reading source file
    source_df = p.read_csv(srcFileWithPath, index_col=False)

    # Calling Encrypt Methods
    x = cs.clsSerial()
    encrypted_df = x.getEncrypt(source_df)

    # Handling Multiple source files
    var = datetime.datetime.now().strftime("%H.%M.%S")
    print('Target File Extension will contain the following:: ', var)

    targetFile = srcFile + '_Serial_'
    taregetFileWithPath = str(cf.config['FIN_DIR']) + targetFile + var + '.csv'

    # Finally Storing them into csv
    encrypted_df.to_csv(taregetFileWithPath, index=False)

    endEnTime = time.time()
    z1 = str(endEnTime - startEnTime)
    print("Over All Encrypt Process Time:", z1)

    time.sleep(20)

    ###############################################################################
    #####                Start Of Serial Decryption Methods                  ######
    ###############################################################################

    print("-" * 157)

    startDeTime = time.time()
    srcFileWithPath = taregetFileWithPath

    print("Calling Serial Process to Decrypt!")

    # Reading source file
    source_encrypted_df = p.read_csv(srcFileWithPath, index_col=False)

    # Calling Decrypt Methods
    x = cs.clsSerial()
    decrypted_df = x.getDecrypt(source_encrypted_df)

    targetFile = srcFile + '_restored_'
    taregetFileWithPath = str(cf.config['FIN_DIR']) + targetFile + var + '.csv'

    # Finally Storing them into csv
    decrypted_df.to_csv(taregetFileWithPath, index=False)

    endDeTime = time.time()
    z2 = str(endDeTime - startDeTime)
    print("Over All Decrypt Process Time:", z2)

    print("-" * 157)

    ###############################################################################
    #####        End Of Serial Encryption/Decryption Methods                 ######
    ###############################################################################

    time.sleep(20)

    ###############################################################################
    #####                Start Of Parallel Encryption Methods                ######
    ###############################################################################

    print("-" * 157)

    startEnTime = time.time()
    srcFileWithPath = str(cf.config['SRC_DIR']) + srcFile + '.csv'

    print("Calling Parallel Process to Encrypt!")

    # Reading source file
    source_df = p.read_csv(srcFileWithPath, index_col=False)

    # Calling Encrypt Methods
    x = cp.clsParallel()
    encrypted_parallel_df = x.getEncryptParallel(source_df)

    # Handling Multiple source files
    var = datetime.datetime.now().strftime("%H.%M.%S")
    print('Target File Extension will contain the following:: ', var)

    targetFile = srcFile + '_Parallel_'
    taregetFileWithPath = str(cf.config['FIN_DIR']) + targetFile + var + '.csv'

    # Finally Storing them into csv
    encrypted_parallel_df.to_csv(taregetFileWithPath, index=False)

    endEnTime = time.time()
    z3 = str(endEnTime - startEnTime)
    print("Over All Encrypt Process Time:", z3)

    time.sleep(20)

    ###############################################################################
    #####                Start Of Parallel Decryption Methods                ######
    ###############################################################################

    print("-" * 157)

    startDeTime = time.time()
    srcFileWithPath = taregetFileWithPath

    print("Calling Parallel Process to Decrypt!")

    # Reading source file
    source_encrypted_parallel_df = p.read_csv(srcFileWithPath, index_col=False)

    # Calling Decrypt Methods
    x = cp.clsParallel()
    decrypted_parallel_df = x.getDecryptParallel(source_encrypted_parallel_df)

    targetFile = srcFile + '_restored_'
    taregetFileWithPath = str(cf.config['FIN_DIR']) + targetFile + var + '.csv'

    # Finally Storing them into csv
    decrypted_parallel_df.to_csv(taregetFileWithPath, index=False)

    endDeTime = time.time()
    z4 = str(endDeTime - startDeTime)
    print("Over All Decrypt Process Time:", z4)

    print("-" * 157)

    ###############################################################################
    #####        End Of Parallel Encryption/Decryption Methods               ######
    ###############################################################################

    ###############################################################################
    #####    Final Statistics between Serial & Parallel loading.             ######
    ###############################################################################

    print("-" * 157)
    print("Serial Encryption:: ", z1)
    print("Serial Decryption:: ", z2)
    print("-" * 157)
    print("Parallel Encryption:: ", z3)
    print("Parallel Decryption:: ", z4)
    print("-" * 157)


if __name__ == '__main__':
    main()

As you can see, we’ve triggered both the serial & parallel processes from the main callable script.

Let’s explore the output –

Windows:

Win_Files

Mac:

MAC_Files

Note that you have to open two different Windows or Mac terminals. One will run the server & the other will run the client to simulate this.

Server:

Win_Server

Clients:

Win:

Win_Run

MAC:

MAC_Run

So, finally, we’ve achieved our goal. Today’s post was a bit long, but it covered beneficial & advanced concepts of crossover stones from our Python verse. 🙂

Many more innovative posts are coming.

Till then – Happy Avenging!

Password Validation Using Regular Expression In Teradata 14 & 15

Today, we’ll be checking one new area where we can use a regular expression to achieve password validation without involving any kind of Macro or Stored-Proc.

 

Let’s consider the following conditions to be implemented –

 

1. Password length should be between 6 & 10 characters.

2. At least one character should be a digit.

3. At least one character should be an upper-case letter.

4. There should be at least one special character.

 

Let’s check the Query & Output –

 

select seq_no,
       passwd,
       regexp_similar(passwd,'^(?=^([[:graph:]]{6,10})$)(?=.*([[:upper:]]{1,}))(?=.*([[:digit:]]{1,})).*$') as reg_test
from scott.login_det
order by 1;


SEQ_NO  PASSWD       REG_TEST
------  -----------  --------
1       hoti         0
2       hotimla      0
3       hotImla      0
4       hot@imla     0
5       hoT@imla     0
6       hoT@iml9a    1
7       hoT@iml9a66  0

 

Similarly, you can add a condition for a lower-case character if you want to make it more complex.
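For readers who want to experiment with the lookahead approach outside Teradata, here is a rough Python sketch of all four conditions. It is an assumption-laden translation & not the query above: Python’s re module has no POSIX bracket classes, so ASCII ranges stand in for [[:graph:]], [[:upper:]] & [[:digit:]], & a fourth lookahead is added for the special character, which the query as written does not appear to test. The names PASSWORD_RE & is_valid are hypothetical.

```python
import re

# ASCII ranges stand in for the POSIX classes used in the Teradata query
PASSWORD_RE = re.compile(
    r'^(?=[!-~]{6,10}\Z)'     # 6 to 10 printable, non-space ASCII characters
    r'(?=.*[A-Z])'            # at least one upper-case letter
    r'(?=.*[0-9])'            # at least one digit
    r'(?=.*[^A-Za-z0-9])'     # at least one special character (added here)
    r'.*\Z'
)


def is_valid(passwd):
    """Return True when passwd satisfies all four conditions."""
    return PASSWORD_RE.match(passwd) is not None


samples = ['hoti', 'hotimla', 'hotImla', 'hot@imla',
           'hoT@imla', 'hoT@iml9a', 'hoT@iml9a66']
for s in samples:
    print(s, int(is_valid(s)))
```

For the seven sample passwords this gives the same 0/1 pattern as the output above. A value such as 'hoTiml9a', which has no special character, is rejected by this sketch.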

 

Hope this gives you another way to implement the same logic. 🙂

String Manipulation Advanced Using Teradata 14.0 Regular Expression

Today, I’ll show a couple of very useful pieces of logic implemented in Teradata using its Regular Expression functions.

There are two very popular demands from developers across different databases regarding the following two cases –

1. How to split comma-separated values into separate rows

2. How to bind separate values into one row (just the opposite of Step 1)

The 2nd option is in great demand, as cross-platform database professionals, especially Oracle developers, look for this kind of implementation because other databases have built-in functions for it, such as LISTAGG & the undocumented WM_CONCAT in Oracle, or GROUP_CONCAT in MySQL.

Let’s check the solution –

Case 1,

Let’s create the table & prepare some data –

 

CREATE MULTISET TABLE ETL_DATA.PARSE_STR
  (
     SEQ_NO       INTEGER,
     SRC_STR     VARCHAR(70)
  );
 
CREATE TABLE completed. 0 rows processed. Elapsed Time =  00:00:01.864

 

Let’s insert some data –

 

INSERT INTO ETL_DATA.PARSE_STR VALUES(1,'RAM,TRIDIB,ANUPAM,BIRESWAR,SUJAY')
;INSERT INTO ETL_DATA.PARSE_STR VALUES(2,'TUNKAI,SAYAN,BABU,PAPU')
;INSERT INTO ETL_DATA.PARSE_STR VALUES(3,'IK,ATBIS,SAPMUNDA');

 

Let’s check the value –

 

SEQ_NO          SRC_STR
------  ----------------------------------
    1   RAM,TRIDIB,ANUPAM,BIRESWAR,SUJAY
    2   TUNKAI,SAYAN,BABU,PAPU
    3   IK,ATBIS,SAPMUNDA

 

Fine. Now our objective is to split these comma-separated values into separate rows.

 

SELECT b.SEQ_NO,
       regexp_substr(b.SRC_STR,'[^,]+',1,day_of_calendar) AS SRC_STR
FROM sys_calendar.calendar ,
     PARSE_STR b
WHERE day_of_calendar BETWEEN 1 AND  (LENGTH(b.SRC_STR) - LENGTH(regexp_replace(b.SRC_STR,'[^A-Z]+','',1,0,'i'))+1 )
ORDER BY 1,2;

 

And, let’s check the output –

 

SEQ_NO  SRC_STR
-----   ----------------------
1       ANUPAM
1       BIRESWAR
1       RAM
1       SUJAY
1       TRIDIB
2       BABU
2       PAPU
2       SAYAN
2       TUNKAI
3       ATBIS
3       IK
3       SAPMUNDA

 

Great! I guess the result is coming as per my expectation. 🙂
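The same split can be reproduced in a few lines of Python for a quick sanity check. This is only an analogue of the idea: re.findall with '[^,]+' returns every occurrence at once, whereas the query above asks regexp_substr for the n-th occurrence & uses the calendar table to generate n. The function name split_rows is hypothetical.

```python
import re


def split_rows(rows):
    """Expand (seq_no, 'A,B,C') pairs into one (seq_no, token) pair per value."""
    out = []
    for seq_no, src_str in rows:
        # '[^,]+' matches each run of characters between commas
        for token in re.findall(r'[^,]+', src_str):
            out.append((seq_no, token))
    # Mirrors ORDER BY 1,2
    return sorted(out)


rows = [
    (1, 'RAM,TRIDIB,ANUPAM,BIRESWAR,SUJAY'),
    (2, 'TUNKAI,SAYAN,BABU,PAPU'),
    (3, 'IK,ATBIS,SAPMUNDA'),
]
for seq_no, token in split_rows(rows):
    print(seq_no, token)
```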

 

Case 2 (Substitute for Listagg, wm_concat, group_concat),

We’ll do this in two small steps for better understanding & performance.

First, let us create another table –

 

CREATE MULTISET TABLE ETL_DATA.WM_CONCAT_TAB
   (
      SEQ_NO   INTEGER,
      SRC_STR VARCHAR(20)
   );
    
CREATE TABLE completed. 0 rows processed. Elapsed Time =  00:00:01.230

 

Good. Now we’ll populate some data into this table. We’ll populate it from Case 1, as this provides the exact data that we’re expecting as input test data for Case 2.

Let’s insert the data –

 

INSERT INTO ETL_DATA.WM_CONCAT_TAB
SELECT b.SEQ_NO,
       regexp_substr(b.SRC_STR,'[^,]+',1,day_of_calendar) AS SRC_STR
FROM sys_calendar.calendar ,
     PARSE_STR b
WHERE day_of_calendar BETWEEN 1 AND  (LENGTH(b.SRC_STR) - LENGTH(regexp_replace(b.SRC_STR,'[^A-Z]+','',1,0,'i'))+1 );

 

Let’s check the data –

SEQ_NO  SRC_STR
------  --------------------
1       ANUPAM
1       BIRESWAR
1       RAM
1       SUJAY
1       TRIDIB
2       BABU
2       PAPU
2       SAYAN
2       TUNKAI
3       ATBIS
3       IK
3       SAPMUNDA

 

As you know, in TD we have significant restrictions regarding Hierarchical Queries & Recursive Queries. So, in this step we’ll build a relationship like employee & manager in the popular employee table. If we have that kind of relation, then we can easily establish & fit it into the TD model.

Let’s create this intermediate table. In this case we’ll map each current row to the next row. This is also a very useful process. In Oracle, there are LEAD & LAG functions to achieve the same. But here we have to work a little bit more to achieve it.

CREATE MULTISET VOLATILE TABLE VT_SRC_ARRNG
AS
     (
            SELECT SEQ_NO,
                   SRC_STR,
                   MAX(SRC_STR) OVER(
                                        PARTITION BY SEQ_NO
                                        ORDER BY SEQ_NO, SRC_STR
                                        ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING 
                                    ) AS PREV_SRC_STR,
                   COUNT(*)  OVER(
                                    PARTITION BY SEQ_NO
                                 ) AS MAX_RECUR_CNT
            FROM WM_CONCAT_TAB
      )
WITH DATA
ON COMMIT
PRESERVE ROWS;
 
CREATE TABLE completed. 0 rows processed. Elapsed Time =  00:00:01.102

 

Let’s look at the output –

SELECT *
FROM VT_SRC_ARRNG
ORDER BY 1,2;
 
 
 
 
SEQ_NO  SRC_STR  PREV_SRC_STR    MAX_RECUR_CNT
-----   -------  --------------- ---------------------
1       ANUPAM      BIRESWAR     5
1       BIRESWAR    RAM          5
1       RAM         SUJAY        5
1       SUJAY       TRIDIB       5
1       TRIDIB      ?            5
2       BABU        PAPU         4
2       PAPU        SAYAN        4
2       SAYAN       TUNKAI       4
2       TUNKAI      ?            4
3       ATBIS       IK           3
3       IK          SAPMUNDA     3
3       SAPMUNDA    ?            3

 

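Fine. From the above VT we can see that every Source String is paired with one neighbouring Source String. Note that, despite its name, PREV_SRC_STR holds the next value in sort order within the SEQ_NO window (ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING), & it is NULL for the last row. We’ve also noted how many levels there are in each window of SEQ_NO via MAX_RECUR_CNT. We’ll use this column later.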

Let’s move to the 2nd & final part –

Let’s aggregate the values based on SEQ_NO & club them with comma –

WITH RECURSIVE WM_CONCAT(SEQ_NO, SRC_STR, PREV_SRC_STR, MAX_RECUR_CNT, LVL,  COMMA_SEP_STR)
AS
     (
        SELECT SEQ_NO,
               SRC_STR,
               PREV_SRC_STR,
               MAX_RECUR_CNT,
               1 AS LVL,
               CAST( '' AS VARCHAR(100)) AS COMMA_SEP_STR
       FROM VT_SRC_ARRNG
       WHERE  PREV_SRC_STR IS NULL
       UNION ALL
       SELECT  b.SEQ_NO,
               b.SRC_STR,
               b.PREV_SRC_STR,
               b.MAX_RECUR_CNT,
               c.LVL+1 AS LVL,
               c.COMMA_SEP_STR||b.SRC_STR||',' AS COMMA_SEP_STR
       FROM VT_SRC_ARRNG b,
               WM_CONCAT c
       WHERE c.SRC_STR =  b.PREV_SRC_STR
     )
SELECT k.SEQ_NO,
       k.AGGR_STR
FROM (               
    SELECT SEQ_NO,
           SRC_STR,
           LVL,
           MAX_RECUR_CNT,
           MIN(CASE
                 WHEN LVL = 1 THEN
                    SRC_STR
               ELSE
                  'ZZZZZ'
               END   ) OVER(
                                 PARTITION BY SEQ_NO
                                 ORDER BY LVL ASC
                           ) ROOT_SRC_STR,
           COMMA_SEP_STR||ROOT_SRC_STR AS AGGR_STR
    FROM WM_CONCAT
    )  k
WHERE k.LVL = k.MAX_RECUR_CNT
ORDER BY 1,2;

 

Let’s check the output –

SEQ_NO  AGGR_STR
------- ---------------------------
1       SUJAY,RAM,BIRESWAR,ANUPAM,TRIDIB
2       SAYAN,PAPU,BABU,TUNKAI
3       IK,ATBIS,SAPMUNDA

 

I guess we’ve done it. 😀

So, you can achieve the same without writing any UDF.
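To see why the aggregated strings come out in that particular order, it can help to trace the recursion by hand. The Python sketch below mimics the two steps (the next-row mapping, then the walk from the NULL row) under the assumption that values are unique within a SEQ_NO; it is an illustration of the logic & not a translation of the SQL engine’s behaviour. The function name aggregate is hypothetical.

```python
from collections import defaultdict


def aggregate(rows):
    """rows: iterable of (seq_no, src_str). Returns {seq_no: 'A,B,C'}."""
    groups = defaultdict(list)
    for seq_no, src_str in rows:
        groups[seq_no].append(src_str)

    result = {}
    for seq_no, values in groups.items():
        values.sort()
        # Step 1: map each value to the next one in sort order (None for the last),
        # which is what the PREV_SRC_STR column holds in VT_SRC_ARRNG
        next_of = {v: (values[i + 1] if i + 1 < len(values) else None)
                   for i, v in enumerate(values)}
        # Seed of the recursion: the row whose next value is NULL
        root = next(v for v, nxt in next_of.items() if nxt is None)
        # Reverse lookup: which row points at a given value
        pointing_at = {nxt: v for v, nxt in next_of.items() if nxt is not None}
        # Step 2: walk the chain, appending 'value,' at each level
        comma_sep = ''
        current = root
        while current in pointing_at:
            current = pointing_at[current]
            comma_sep += current + ','
        # The root value is appended last, as in COMMA_SEP_STR||ROOT_SRC_STR
        result[seq_no] = comma_sep + root
    return result


rows = [(1, 'ANUPAM'), (1, 'BIRESWAR'), (1, 'RAM'), (1, 'SUJAY'), (1, 'TRIDIB'),
        (2, 'BABU'), (2, 'PAPU'), (2, 'SAYAN'), (2, 'TUNKAI'),
        (3, 'ATBIS'), (3, 'IK'), (3, 'SAPMUNDA')]
for seq_no, aggr_str in sorted(aggregate(rows).items()):
    print(seq_no, aggr_str)
```

The walk starts at the alphabetically last value & moves backwards, which explains why that value ends up at the tail of each aggregated string.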

 

Performance of Regular Expression in Teradata 14.0

Today I’ll explain the performance impact of these Regular Expressions in Teradata.

These functions were newly introduced. Hence, it is possible that they may take some time to settle; in other words, we may expect to see some patches before they can be considered stable & ready to use in TD.

Before we go through this, we must understand these functions & where we should use them properly. It is quite obvious that we would like to use them in places where Teradata’s old stable functions cannot achieve the result in a single SQL, or where we would otherwise need some kind of Stored-Proc to implement the business logic. Hence, it would be unfair to simply compare them against a simple solution. Rather, we should consider complex parsing logic & compare the total performance of those Stored-Procs or other relevant processes with these functions. In those cases, Regular Expressions will be very handy, I believe.

Let’s consider one simple case –

Let’s consider the following string – “SANTA’S JOYFULL GIFT“.

I want to fetch the part of the string up to the first space character, i.e. it should provide the following output as per the business logic – “SANTA’S“.

I’ll test that with a significant volume of data & compare the explain plan between the normal process & the regular expression.
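As a quick illustration of the two approaches being compared, here is a small Python sketch. It mirrors the position-based expression & the '[^ ]+' pattern used in the queues of this post’s SQL only in spirit; the function names are hypothetical, & string handling in Python is not guaranteed to match Teradata’s in every detail.

```python
import re


def first_token_conventional(s):
    """Mimics SUBSTR(s, 1, POSITION(' ' IN s || ' '))."""
    # 1-based position of the first space in the padded string
    pos = (s + ' ').index(' ') + 1
    # Take that many characters from the start of the original string
    return s[:pos]


def first_token_regex(s):
    """Mimics regexp_substr(s, '[^ ]+'): the first run of non-space characters."""
    m = re.search(r'[^ ]+', s)
    return m.group(0) if m else None


name = "SANTA'S JOYFULL GIFT"
print(repr(first_token_conventional(name)))
print(repr(first_token_regex(name)))
```

Read literally, the position-based version keeps the delimiter space at the end while the regex version does not, so the two values differ only by a trailing blank.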

Let’s check the explain plan for the SQL that uses conventional functions –

EXPLAIN SELECT C_KEY,
        C_CD,
        S_ORG_NM,
        SUBSTR(S_ORG_NM,1,POSITION(' ' IN S_ORG_NM||' ')) AS DER_S_ORG_NM
FROM MASTER_CLAIM
WHERE C_CD = '555';

  1) First, we lock EDW_CORE_DB.MASTER_CLAIM in view ETL_VIEWS.MASTER_CLAIM for
     access.
  2) Next, we do an all-AMPs RETRIEVE step from EDW_CORE_DB.MASTER_CLAIM in view
     ETL_VIEWS.MASTER_CLAIM by way of an all-rows scan with a condition
     of ("EDW_CORE_DB.MASTER_CLAIM in view ETL_VIEWS.MASTER_CLAIM.C_CD = '555 '")
     into Spool 1 (group_amps), which is built locally on the AMPs.
     The input table will not be cached in memory, but it is eligible
     for synchronized scanning.  The size of Spool 1 is estimated with
     high confidence to be 38,212,793 rows (5,082,301,469 bytes).  The
     estimated time for this step is 40.02 seconds.
  3) Finally, we send out an END TRANSACTION step to all AMPs involved
     in processing the request.
  -> The contents of Spool 1 are sent back to the user as the result of
     statement 1.  The total estimated time is 40.02 seconds.

 

Now, let’s try the same with the Regular expression –

EXPLAIN SELECT C_KEY,
        C_CD,
        S_ORG_NM,
        regexp_substr(S_ORG_NM,'[^ ]+') AS DER_S_ORG_NM
FROM MASTER_CLAIM
WHERE C_CD = '555';

  1) First, we lock EDW_CORE_DB.MASTER_CLAIM in view ETL_VIEWS.MASTER_CLAIM for
     access.
  2) Next, we do an all-AMPs RETRIEVE step from EDW_CORE_DB.MASTER_CLAIM in view
     ETL_VIEWS.MASTER_CLAIM by way of an all-rows scan with a condition
     of ("EDW_CORE_DB.MASTER_CLAIM in view ETL_VIEWS.MASTER_CLAIM.C_CD = '555 '")
     into Spool 1 (group_amps), which is built locally on the AMPs.
     The input table will not be cached in memory, but it is eligible
     for synchronized scanning.  The size of Spool 1 is estimated with
     high confidence to be 38,212,793 rows (105,696,585,438 bytes).
     The estimated time for this step is 40.02 seconds.
  3) Finally, we send out an END TRANSACTION step to all AMPs involved
     in processing the request.
  -> The contents of Spool 1 are sent back to the user as the result of
     statement 1.  The total estimated time is 40.02 seconds.

 

So, from the above two, we really can’t find much difference in the plan except the estimated size of the spool in bytes. In both cases the estimated time shows 40.02 seconds.

So, now we can check what the actual time will be. Let’s see that as well.

First, let us create one Volatile Table & record the total create time –

 

CREATE MULTISET VOLATILE TABLE VT1
AS
    (
         SELECT C_KEY,
				        C_CD,
				        S_ORG_NM,
				        SUBSTR(S_ORG_NM,1,POSITION(' ' IN S_ORG_NM||' ')) AS DER_S_ORG_NM
		 FROM MASTER_CLAIM
		 WHERE C_CD = '555'
    )
WITH DATA
ON COMMIT
PRESERVE ROWS;

 

And, the response is as follows –

--CREATE TABLE completed. 0 rows processed. Elapsed Time =  00:00:05.076

 

Let’s create another VT with the new approach –

CREATE MULTISET VOLATILE TABLE VT2
AS
    (
         SELECT C_KEY,
				        C_CD,
				        S_ORG_NM,
				        regexp_substr(S_ORG_NM,'[^ ]+') AS DER_S_ORG_NM
		 FROM MASTER_CLAIM
		 WHERE C_CD = '555'
    )
WITH DATA
ON COMMIT
PRESERVE ROWS;

 

And, the response time –

--CREATE TABLE completed. 0 rows processed. Elapsed Time =  00:00:05.762

 

So, as you can see, there is not much difference between the old process & the new process.

And, the total number of records we have tested this on –

 

SELECT COUNT(*)
FROM VT1;

40,781,904

 

So, from the above you can see that we’ve tested this on a significant number of rows, which is very common in any TD system.

Let’s test whether both SQLs actually return the same value. To do that, we’ll create one more VT as follows –

CREATE MULTISET VOLATILE TABLE VT3
AS
    (
         SELECT a.C_KEY,
				        a.C_CD,
				        a.S_ORG_NM,
				        a.DER_S_ORG_NM AS OLD_PRCHSR_ORG_NM,
				        b.DER_S_ORG_NM AS NEW_PRCHSR_ORG_NM,
				        CHAR_LENGTH(a.DER_S_ORG_NM) AS OLD_PRCHSR_ORG_NM_LEN,
				        CHAR_LENGTH(b.DER_S_ORG_NM) AS NEW_PRCHSR_ORG_NM_LEN
		 FROM VT1 a,
		             VT2 b
		 WHERE a.C_KEY = b.C_KEY
    )
WITH DATA
ON COMMIT
PRESERVE ROWS;

--CREATE TABLE completed. 0 rows processed. Elapsed Time =  00:00:06.864

 

Now, let’s test the output –

 

SELECT *
FROM VT3
WHERE OLD_PRCHSR_ORG_NM <> NEW_PRCHSR_ORG_NM;

--SELECT completed. 0 rows returned. Elapsed Time =  00:00:01.763

 

So, as you can see from the above simulation, the performance difference between the conventional SQL & the SQL using Regular Expression is negligible.

But, again, I must clearly say that Regular Expressions are ideal where we would otherwise need multiple SQLs or PL/SQL to implement the logic, or where you need to implement complex parsing that is difficult to implement in a single SQL.

Hope this will give you some clarity. 😀

Regular Expression on Teradata 14.0

I’ve been working for more than 8 years in Oracle 10g & 11g & have written a significant number of queries with Regular Expressions in various scenarios using SQL. It is really handy if you know how to use it & can save a lot of pain with a single SQL. And, the performance will be better compared to the total effort to achieve the same functionality by using multiple SQL queries or PL/SQL Procedures.

For the last couple of years, I’ve been working on Teradata. And, on some occasions, I was expecting features like these, where I can easily manipulate data with regular expressions. I was pretty excited when I heard that Teradata also introduced Regular Expressions from Version 14.0.


As a result, I tried all those features that I think can be handy & useful for various scenarios, & the following are the successful queries that I got. There are two occasions where Teradata was only partially able to manipulate those strings. I’ve checked the latest Teradata Manual. However, I was unable to find a solution. So, I’m expecting other forum members to contribute here in order to make this thread useful for every one of us. And, I’ll post here as soon as I get some answers on these partial conversions.

For better understanding, I’ve provided the actual column value & the value after transformation in the output. That will help us grasp it easily, I guess. 🙂


Case 1,

SELECT regexp_replace('SatyakiDe','([[:lower:]]{1,})([[:upper:]]{1,})','\1 \2') AS COL_VAL;

COLA COL_VAL
---------------- ----------------------------------------
SatyakiDe Satyaki De


Case 2,

select regexp_replace('919047255555','^([[:digit:]]{2})([[:digit:]]{10})','+\1 \2') COL_VAL;

COLA COL_VAL
------------ ---------------
919047255555 +91 9047255555



Case 3,

select regexp_replace('+++C','^([[:punct:]]{2})([[:punct:]]{1})(.*)$','\1\3') COL_VAL;

COLA COL_VAL
---- -----
+++C ++C



Case 4,

select initcap(regexp_replace(regexp_substr(' satyaki.de@mail.com','[^@]+'),'(.*)(\.)(.*)','\1 \3')) COL_VAL;

COLA COL_VAL
-------------------------------- --------------------------------------------------
satyaki.de@mail.com Satyaki De



Case 5,

select regexp_replace('100011001','([[:digit:]]{3})([[:digit:]]{2})([[:digit:]]{4})','XXX-XX-\3') as COL_VAL;

COLA COL_VAL
---------------- --------------------
100011001 XXX-XX-1001



Case 6,

select regexp_replace('123456789','([[:digit:]]{3})([[:digit:]]{3})([[:digit:]]{3})','\3.\2.\1') as COL_VAL;

COLA COL_VAL
--------- ---------------
123456789 789.456.123



Case 7,

SELECT regexp_replace('satyaki1de0loves3to8work2on2sql0and2bi4tools1','[^0-9]+','',1,0,'i') AS DER_VAL;

COLA DER_VAL
--------------------------------------------- ----------
satyaki1de0loves3to8work2on2sql0and2bi4tools1 1038220241




As you can see, all the non-numeric characters have been filtered out from the string & only numbers are kept here. These sorts of queries are very useful in lots of different business scenarios as well.

Note that any extra space may not produce the desired result, so we need to pay attention to these small details.
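If you’d like to try these back-reference patterns without a Teradata session, the sketch below reruns several of the cases with Python’s re.sub. It is an approximation: Python has no POSIX bracket classes, so ASCII ranges such as [a-z], [A-Z] & [0-9] are assumed in place of [[:lower:]], [[:upper:]] & [[:digit:]]. The dictionary name examples & its keys are made up for this illustration.

```python
import re

examples = {
    # Case 1: insert a space between a lower-case run and an upper-case run
    'camel_split': re.sub(r'([a-z]+)([A-Z]+)', r'\1 \2', 'SatyakiDe'),
    # Case 2: split a 12-digit number into country code and number
    'phone': re.sub(r'^([0-9]{2})([0-9]{10})', r'+\1 \2', '919047255555'),
    # Case 5: mask the first five digits, keep the last four
    'mask': re.sub(r'([0-9]{3})([0-9]{2})([0-9]{4})', r'XXX-XX-\3', '100011001'),
    # Case 6: reverse the order of three 3-digit groups
    'reorder': re.sub(r'([0-9]{3})([0-9]{3})([0-9]{3})', r'\3.\2.\1', '123456789'),
    # Case 7: drop everything that is not a digit
    'digits_only': re.sub(r'[^0-9]+', '',
                          'satyaki1de0loves3to8work2on2sql0and2bi4tools1'),
}

for name, value in examples.items():
    print(name, '->', value)
```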

And, I’ve tested all these queries in the following two versions –

select * from dbcinfo;

InfoKey InfoData
-------- ------------------------
1 VERSION 14.10.00.02
2 RELEASE 14.10.00.02
3 LANGUAGE SUPPORT MODE Standard


select * from dbcinfo;

InfoKey InfoData
-------- ------------------------
1 VERSION 14.10.01.05
2 RELEASE 14.10.01.04
3 LANGUAGE SUPPORT MODE Standard



Hope, this will give you much more clarity. 🙂

One more thing I would like to clarify here – my intention is to describe more features of these regexp_(similar/substr/instr/replace) functions.

While posting the same article on the Teradata forum, I received a question about whether these regexp functions are available in TD 13 or not.

And, here is my answer to that question –  

Regarding version 13,

Let us check whether they have these regexp functions or not –

select * from dbcinfo;

InfoKey InfoData
-------- ------------------------
1 VERSION 13.00.00.15
2 RELEASE 13.00.00.15
3 LANGUAGE SUPPORT MODE Standard


select * from dbcinfo;

InfoKey InfoData
-------- ------------------------
1 VERSION 13.10.07.12
2 RELEASE 13.10.07.12
3 LANGUAGE SUPPORT MODE Standard


select regexp_replace('SatyakiDe','^(.*)([[:upper:]]{1,})(.*) $','\1 \2\3') AS COL_VAL;

select regexp_replace('SatyakiDe','^(.*)([[:upper:]]{1,})(.*) $','\1 \2\3') AS COL_VAL;

select regexp_replace('SatyakiDe','^(.*)([[:upper:]]{1,})(.*) $','\1 \2\3') AS COL_VAL;
$
*** Failure 3706 Syntax error: expected something between '(' and the string 'S' keyword.
Statement# 1, Info =35
*** Total elapsed time was 1 second.



Hope this gives adequate clarity on the answer to the above question.

Now, let’s see some other functionality.

REGEXP_SIMILAR has functionality similar to REGEXP_LIKE in Oracle.

Let’s see a couple of such cases –

Let’s prepare the table with some dummy data –


SELECT * FROM dbc.dbcinfo;

InfoKey InfoData
-------- -----------------------
1 VERSION 14.10.01.05
2 RELEASE 14.10.01.04
3 LANGUAGE SUPPORT MODE Standard


CREATE MULTISET VOLATILE TABLE TEST_T1
(
COL1 VARCHAR(10)
)
ON COMMIT
PRESERVE ROWS;

INSERT INTO TEST_T1 VALUES('456')
;INSERT INTO TEST_T1 VALUES('123x')
;INSERT INTO TEST_T1 VALUES('x123')
;INSERT INTO TEST_T1 VALUES('y')
;INSERT INTO TEST_T1 VALUES('+789')
;INSERT INTO TEST_T1 VALUES('-789')
;INSERT INTO TEST_T1 VALUES('159-')
;INSERT INTO TEST_T1 VALUES('-1-');


Let’s check the data now –

SELECT *
FROM TEST_T1;

COL1
1 123x
2 456
3 x123
4 +789
5 -789
6 y
7 159-
8 -1-



Let’s look into the various scenarios now –


Case 1 (Returns Mixed Numbers, Signed Numbers & Non Numbers),

SELECT *
FROM TEST_T1
WHERE REGEXP_SIMILAR(COL1,'^[0-9]+$','c')=0;

COL1
-----
1 123x
2 x123
3 +789
4 -789
5 y
6 159-
7 -1-




Case 2 (Returns Only Unsigned Positive Numbers),

SELECT *
FROM TEST_T1
WHERE REGEXP_SIMILAR(COL1,'^[0-9]+$','c')=1;

COL1
-----
456



Case 3 (Returns All Numbers including Positive, Negative & unsigned),

SELECT *
FROM TEST_T1
WHERE REGEXP_SIMILAR(COL1,'^[+-]?[0-9]+[+-]?$','c')=1;

COL1
-----
456
+789
-789
159-
-1-



Case 4 (Returns Only Non Numbers i.e. Characters),

SELECT *
FROM TEST_T1
WHERE REGEXP_SIMILAR(COL1,'[^0-9]+','c')=1;

COL1
----
y
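
For readers who want to try the four cases without a Teradata session, here is a minimal Python sketch. It is only an analogy: it assumes REGEXP_SIMILAR tests the whole string (which is consistent with the Case 4 output above), and the helper names regexp_similar and select_where are mine, not Teradata functions.

```python
import re

# Same dummy rows that were inserted into TEST_T1.
ROWS = ["456", "123x", "x123", "y", "+789", "-789", "159-", "-1-"]


def regexp_similar(value, pattern):
    """Return 1 when the WHOLE value matches the pattern, else 0 (assumed semantics)."""
    return 1 if re.fullmatch(pattern, value) else 0


def select_where(pattern, flag):
    """Mimic: SELECT * FROM TEST_T1 WHERE REGEXP_SIMILAR(COL1, pattern, 'c') = flag."""
    return [value for value in ROWS if regexp_similar(value, pattern) == flag]


print(select_where(r"^[0-9]+$", 0))            # Case 1: everything except '456'
print(select_where(r"^[0-9]+$", 1))            # Case 2: ['456']
print(select_where(r"^[+-]?[0-9]+[+-]?$", 1))  # Case 3: signed and unsigned numbers
print(select_where(r"[^0-9]+", 1))             # Case 4: ['y']
```

Python's re.fullmatch is used instead of re.search so that a partial hit such as the 'x' in '123x' does not count as a match.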



Hope this will give you some additional idea. 🙂

My objective is to provide basic information to my friends, so that those migrating to TD from other popular databases can write better SQL, and new TD developers can get a flavor of this powerful feature and apply it properly. 😀

I really appreciate you taking the time to read this post.

Regards.

Satyaki De.











Regular Expression In Oracle

Starting with 10g, Oracle gives programmers a significant new capability inherited from POSIX. Yes, you guessed right: regular expressions. This is one of the most powerful features, and it was missing for quite some time. Programmers no longer need to write big, complex functions or procedures to produce the required result.

In this post, I'm not going to discuss the syntax and all the minute details required to use it. Rather, I'll concentrate on some useful regular-expression snippets that might help in your case, perhaps with a little enhancement on your part.

I follow OTN and find that many programmers frequently need these. So, why not compile them in a single place? If you want to know the basic syntax of regular expressions, you can visit this place.

One more thing: I use the CAST function here only to produce formatted output. In most of the cases in this post, it is not otherwise required. I hope this doesn't confuse you.

Splitting comma-separated values:

scott@ORCL>select * from v$version;

BANNER
--------------------------------------------------------------------------------
Oracle Database 11g Enterprise Edition Release 11.1.0.6.0 - Production
PL/SQL Release 11.1.0.6.0 - Production
CORE 11.1.0.6.0 Production
TNS for 32-bit Windows: Version 11.1.0.6.0 - Production
NLSRTL Version 11.1.0.6.0 - Production

Elapsed: 00:00:00.00
scott@ORCL>
scott@ORCL>
scott@ORCL>with tt
2 as
3 (
4 select 1 id, 'saty,anup,john,sas' src from dual
5 union all
6 select 2, 'shank,rajib,papu,sanjit,amit' from dual
7 )
8 select k.*
9 from (
10 select distinct id,
11 regexp_substr(src,'[^,]+',1,level) cooked_src
12 from tt
13 connect by level <= (length(src) - length(replace(src,',',''))) + 1
14 ) k
15 order by k.id,k.cooked_src;

ID COOKED_SRC
---------- ----------------------------
1 anup
1 john
1 sas
1 saty
2 amit
2 papu
2 rajib
2 sanjit
2 shank

9 rows selected.

Elapsed: 00:00:00.38
scott@ORCL>
scott@ORCL>
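
The same split can be sketched in Python for comparison. This is only an illustration under my own assumptions: the function name split_rows is hypothetical, re.findall stands in for the REGEXP_SUBSTR plus CONNECT BY LEVEL combination, and a set plays the role of DISTINCT.

```python
import re


def split_rows(rows):
    """Split each comma-separated src into one (id, token) pair per token."""
    pairs = set()  # a set drops duplicates, like DISTINCT in the query
    for row_id, src in rows:
        # '[^,]+' grabs every run of characters that is not a comma.
        for token in re.findall(r"[^,]+", src):
            pairs.add((row_id, token))
    return sorted(pairs)  # like ORDER BY id, cooked_src


data = [(1, "saty,anup,john,sas"), (2, "shank,rajib,papu,sanjit,amit")]
for row_id, token in split_rows(data):
    print(row_id, token)
```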

Insert a space inside a joined word:

scott@ORCL>with res
2 as
3 (
4 select 'SatyakiDe' cola from dual
5 union all
6 select 'RajibChakroborty' from dual
7 union all
8 select 'PranabPaul' from dual
9 )
10 select cola,
11 cast(regexp_replace(cola,'^(.*)([[:upper:]]{1,})(.*)$','\1 \2\3') as varchar2(40)) cooked_cola
12 from res;

COLA COOKED_COLA
---------------- ----------------------------------------
SatyakiDe Satyaki De
RajibChakroborty Rajib Chakroborty
PranabPaul Pranab Paul

Elapsed: 00:00:00.04
scott@ORCL>
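
To see why the space lands where it does, here is a rough Python equivalent. Python's re module has no [[:upper:]] class, so I assume plain ASCII names and use [A-Z] instead; the function name split_last_capital is hypothetical.

```python
import re


def split_last_capital(word):
    """Insert a space before the LAST uppercase letter in the word."""
    # The first (.*) is greedy: it swallows as much as it can and gives back
    # only enough for [A-Z]+ to match, so the split happens at the last capital.
    return re.sub(r"^(.*)([A-Z]+)(.*)$", r"\1 \2\3", word)


for name in ("SatyakiDe", "RajibChakroborty", "PranabPaul"):
    print(name, "->", split_last_capital(name))
```

Because of that greedy behavior, a word with several inner capitals gets only one space, before the last capital.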
 

Format a phone number with a country code:

scott@ORCL>
scott@ORCL>with fer
2 as
3 (
4 select '919047242526' cola from dual
5 union all
6 select '919038220261' from dual
7 )
8 select cola,
9 cast(regexp_replace(cola,'^([[:digit:]]{2})([[:digit:]]{10})','+\1 \2') as varchar2(15)) cooked_cola
10 from fer;

COLA COOKED_COLA
------------ ---------------
919047242526 +91 9047242526
919038220261 +91 9038220261

Elapsed: 00:00:00.03
scott@ORCL>
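
A comparable sketch in Python, assuming 12-digit ASCII input as in the sample rows. The function name format_phone is my own, and [0-9] stands in for [[:digit:]].

```python
import re


def format_phone(number):
    """Turn a 2-digit country code plus 10-digit number into '+CC NNNNNNNNNN'."""
    # Group 1 = country code, group 2 = subscriber number.
    return re.sub(r"^([0-9]{2})([0-9]{10})", r"+\1 \2", number)


print(format_phone("919047242526"))  # +91 9047242526
print(format_phone("919038220261"))  # +91 9038220261
```

Input that does not start with twelve digits is returned unchanged, because the pattern simply does not match.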

Remove one + from the source:

scott@ORCL>with hig
2 as
3 (
4 select '+++C' cola from dual
5 )
6 select cola,
7 cast(regexp_replace(cola,'^([[:punct:]]{2})([[:punct:]]{1})(.*)$','\1\3') as varchar2(5)) cooked_cola
8 from hig;

COLA COOKE
---- -----
+++C ++C

Elapsed: 00:00:00.02
scott@ORCL>
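
Python's re module does not support [[:punct:]], so this sketch builds an approximate class from string.punctuation (ASCII punctuation only, which is my assumption about what is needed here). The function name drop_third_punct is hypothetical.

```python
import re
import string

# Approximation of POSIX [[:punct:]] for ASCII input.
PUNCT = "[" + re.escape(string.punctuation) + "]"


def drop_third_punct(text):
    """Keep the first two leading punctuation characters and drop the third."""
    pattern = rf"^({PUNCT}{{2}})({PUNCT})(.*)$"
    # Group 2 (the third punctuation mark) is left out of the replacement.
    return re.sub(pattern, r"\1\3", text)


print(drop_third_punct("+++C"))  # ++C
```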

Extracting numbers from a string:

scott@ORCL>with tep
2 as
3 (
4 select 'satyaki9de0is3a8studious2and2energetic0software2engineer6here1' cola from dual
5 )
6 select cola,
7 cast(regexp_replace(cola,'[^0-9]','') as varchar2(12)) cooked_cola
8 from tep;

COLA COOKED_COLA
-------------------------------------------------------------- ------------
satyaki9de0is3a8studious2and2energetic0software2engineer6here1 9038220261

Elapsed: 00:00:00.03
scott@ORCL>
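
The same idea in Python is a one-line substitution: delete every character that is not a digit. The function name only_digits is my own.

```python
import re


def only_digits(text):
    """Remove every character that is not 0-9."""
    return re.sub(r"[^0-9]", "", text)


source = "satyaki9de0is3a8studious2and2energetic0software2engineer6here1"
print(only_digits(source))  # 9038220261
```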

Extracting names from email addresses:

scott@ORCL>
scott@ORCL>with reg
2 as
3 (
4 select 'satyaki.de@gmail.com' cola from dual
5 union all
6 select 'pranab.paul@aol.in' from dual
7 union all
8 select 'tuhin.chakroborty@rediffmail.com' from dual
9 union all
10 select 'debraj.saha@yahoo.com' from dual
11 )
12 select cola,
13 cast(
14 initcap(regexp_replace(regexp_substr(cola,'[^@]+'),'(.*)(\.)(.*)','\1 \3')) as varchar2(50)
15 ) cooked_cola
16 from reg;

COLA COOKED_COLA
-------------------------------- --------------------------------------------------
satyaki.de@gmail.com Satyaki De
pranab.paul@aol.in Pranab Paul
tuhin.chakroborty@rediffmail.com Tuhin Chakroborty
debraj.saha@yahoo.com Debraj Saha

Elapsed: 00:00:00.03
scott@ORCL>
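
The three nested calls can be unrolled step by step in Python. This is a sketch, not a translation: str.title() is only a close stand-in for INITCAP, the function name name_from_email is hypothetical, and it assumes the address has something before the @.

```python
import re


def name_from_email(address):
    """Build a display name from the part of the address before '@'."""
    # Step 1 (regexp_substr): take everything up to the first '@'.
    local = re.match(r"[^@]+", address).group(0)
    # Step 2 (regexp_replace): swap the last dot for a space.
    spaced = re.sub(r"(.*)(\.)(.*)", r"\1 \3", local)
    # Step 3 (initcap): capitalize each word.
    return spaced.title()


for mail in ("satyaki.de@gmail.com", "pranab.paul@aol.in"):
    print(mail, "->", name_from_email(mail))
```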

Insert spaces between lowercase & uppercase letters:

scott@ORCL>
scott@ORCL>with kit
2 as
3 (
4 select 'AbraCaDabra' cola from dual
5 )
6 select cola,
7 cast(
8 regexp_replace(cola,'([[:lower:]])([[:upper:]])','\1 \2') as varchar2(20)
9 ) cooked_cola
10 from kit;

COLA COOKED_COLA
----------- --------------------
AbraCaDabra Abra Ca Dabra

Elapsed: 00:00:00.02
scott@ORCL>
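
Unlike the earlier joined-word snippet, this pattern is not anchored, so it fires at every lowercase-to-uppercase boundary. A small Python sketch, assuming ASCII letters ([a-z] and [A-Z] in place of the POSIX classes) and a function name of my own choosing:

```python
import re


def space_at_case_change(word):
    """Insert a space wherever a lowercase letter is followed by an uppercase one."""
    return re.sub(r"([a-z])([A-Z])", r"\1 \2", word)


print(space_at_case_change("AbraCaDabra"))  # Abra Ca Dabra
```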
 

Masking credit card information:

scott@ORCL>with jol
2 as
3 (
4 select '4567098723560984' cola from dual
5 )
6 select cola,
7 cast(
8 regexp_replace(cola,'([[:digit:]]{4})([[:digit:]]{4})([[:digit:]]{4})([[:digit:]]{4})','\1-XXXX-XXXX-\4')
9 as varchar2(20)
10 ) cooked_cola
11 from jol;

COLA COOKED_COLA
---------------- --------------------
4567098723560984 4567-XXXX-XXXX-0984

Elapsed: 00:00:00.02
scott@ORCL>
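
Here is a minimal Python sketch of the same masking, assuming a 16-digit ASCII number with no separators. The function name mask_card is hypothetical.

```python
import re


def mask_card(number):
    """Keep the first and last four digits and mask the middle eight."""
    # Groups 2 and 3 are captured but never used in the replacement.
    pattern = r"([0-9]{4})([0-9]{4})([0-9]{4})([0-9]{4})"
    return re.sub(pattern, r"\1-XXXX-XXXX-\4", number)


print(mask_card("4567098723560984"))  # 4567-XXXX-XXXX-0984
```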

Convert a number series to an IP-address-style string & print the groups in reverse order:

scott@ORCL>
scott@ORCL>with rev
2 as
3 (
4 select '123456789' cola from dual
5 )
6 select cola,
7 cast(
8 regexp_replace(cola,'([[:digit:]]{3})([[:digit:]]{3})([[:digit:]]{3})','\3.\2.\1')
9 as varchar2(15)
10 ) cooked_cola
11 from rev;

COLA COOKED_COLA
--------- ---------------
123456789 789.456.123

Elapsed: 00:00:00.02
scott@ORCL>
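
The reordering comes purely from the order of the back-references in the replacement string, which a short Python sketch makes visible. The function name reverse_groups is my own, and a 9-digit ASCII input is assumed.

```python
import re


def reverse_groups(digits):
    """Split nine digits into three groups and emit them in reverse, dot-separated."""
    # \3.\2.\1 writes the third group first and the first group last.
    return re.sub(r"([0-9]{3})([0-9]{3})([0-9]{3})", r"\3.\2.\1", digits)


print(reverse_groups("123456789"))  # 789.456.123
```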

I hope you like this.

Very soon, I'm going to post another useful snippet on some other topic. Till then, best of luck, and keep following this blog.