Real-Time Matplotlib View of Streaming Data in a Python & Kivy-Based iOS App

Today, I’ll be sharing one of the most exciting posts I’ve ever written. It is rare in the sense that you cannot easily find a relevant working solution for it over the net.

So, what are we talking about here? We’re going to build a Python-based iOS App using the Kivy framework. You can find plenty of videos & documents on that part alone. However, you will rarely find the capability that I’m about to demonstrate: we’ll consume live IoT streaming data from a dummy application & then plot it on a Matplotlib dashboard inside the mobile App. And that’s where this post is seriously different from the rest of the available write-ups.


But, before we dig into more details, let us see a quick demo of our iOS App.

Demo:

Demo

Isn’t it exciting? Great! Now, let’s dig into the details.


Let’s understand the architecture & how we want to proceed with the solution here.

Architecture:

Broad-level design

The above diagram shows that the Kivy-based iOS application will consume streaming data from the Ably queue, while the dummy IoT application pushes the real-time events to the same Ably queue.

So, now we understand the architecture. Fantastic!

Let’s deep dive into the code that we specifically built for this use case.


Code:

  1. IoTDataGen.py (Captures IoT events from the simulator & publishes them as streaming data to the Ably channel, so the App can present them on the dashboard through measured KPIs.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 12-Nov-2021 ####
#### ####
#### Objective: Publishing Streaming data ####
#### to Ably channels & captured IoT ####
#### events from the simulator & publish ####
#### them in Dashboard through measured ####
#### KPIs. ####
#### ####
##############################################
import random
import time
import json
import clsPublishStream as cps
import datetime
from clsConfig import clsConfig as cf
import logging
# Invoking the IoT Device Generator.
def main():
    ###############################################
    ###           Global Section                ###
    ###############################################
    # Initiating Ably class to push events
    x1 = cps.clsPublishStream()
    ###############################################
    ###       End of Global Section             ###
    ###############################################
    # Initiating Log Class
    general_log_path = str(cf.conf['LOG_PATH'])
    msgSize = int(cf.conf['limRec'])

    # Enabling Logging Info
    logging.basicConfig(filename=general_log_path + 'IoTDevice.log', level=logging.INFO)

    # Other useful variables
    cnt = 1
    idx = 0
    debugInd = 'Y'
    x_value = 0
    total_1 = 100
    total_2 = 100
    var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    # End of useful variables

    while True:
        srcJson = {
            "x_value": x_value,
            "total_1": total_1,
            "total_2": total_2
        }

        x_value += 1
        total_1 = total_1 + random.randint(6, 8)
        total_2 = total_2 + random.randint(5, 6)

        tmpJson = str(srcJson)

        if cnt == 1:
            srcJsonMast = '{' + '"' + str(idx) + '":'+ tmpJson
        elif cnt == msgSize:
            srcJsonMast = srcJsonMast + '}'
            print('JSON: ')
            print(str(srcJsonMast))

            # Pushing both the Historical Confirmed Cases
            retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)

            if retVal_1 == 0:
                print('Successfully IoT event pushed!')
            else:
                print('Failed to push IoT events!')

            srcJsonMast = ''
            tmpJson = ''
            cnt = 0
            idx = -1
            srcJson = {}
            retVal_1 = 0
        else:
            srcJsonMast = srcJsonMast + ',' + '"' + str(idx) + '":'+ tmpJson

        cnt += 1
        idx += 1

        time.sleep(1)

if __name__ == "__main__":
    main()


Let’s explore the key snippets from the above script.

# Initiating Ably class to push events
x1 = cps.clsPublishStream()

The IoT simulator script instantiates the publisher class, which pushes the JSON events to the Ably queue.

if cnt == 1:
    srcJsonMast = '{' + '"' + str(idx) + '":'+ tmpJson
elif cnt == msgSize:
    srcJsonMast = srcJsonMast + '}'
    print('JSON: ')
    print(str(srcJsonMast))

    # Pushing both the Historical Confirmed Cases
    retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)

    if retVal_1 == 0:
        print('Successfully IoT event pushed!')
    else:
        print('Failed to push IoT events!')

    srcJsonMast = ''
    tmpJson = ''
    cnt = 0
    idx = -1
    srcJson = {}
    retVal_1 = 0
else:
    srcJsonMast = srcJsonMast + ',' + '"' + str(idx) + '":'+ tmpJson

In the above snippet, we’re forming the payload dynamically & then calling the “pushEvents” method to push all the randomly generated IoT mock events to the Ably queue.
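The clsPublishStream class itself isn’t listed in this post. For context, here is a minimal sketch of what such a publisher might look like using the synchronous ably REST client that the rest of this post relies on. The API key & channel name are hypothetical placeholders only:

# Hypothetical sketch of the publisher class used above.
# The key & channel name are placeholders; replace them with your own Ably details.
import logging
from ably import AblyRest

class clsPublishStream:
    def __init__(self):
        # Placeholder credentials & channel
        self.client = AblyRest('YOUR_ABLY_API_KEY')
        self.channel = self.client.channels.get('YOUR_CHANNEL_NAME')

    def pushEvents(self, srcJson, debugInd, var):
        try:
            # Publish the batched JSON payload as a single event
            self.channel.publish('iot-event', srcJson)

            if debugInd == 'Y':
                logging.info('Published at ' + str(var) + ': ' + str(srcJson))

            return 0
        except Exception as e:
            logging.error(str(e))
            return 1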

2. custom.kv (This Kivy design file contains the graphical interface of our iOS App, including the functionality of the buttons.)


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 12-Nov-2021 ####
#### ####
#### Objective: This Kivy design file contains all the ####
#### graphical interface of our I-OS App. This including ####
#### the functionalities of buttons. ####
#### ####
#### Note: If you think this file is not properly read by ####
#### the program, then remove this entire comment block & ####
#### then run the application. It should work. ####
###############################################################
MainInterface:
<MainInterface>:
    ScreenManager:
        id: sm
        size: root.width, root.height
        Screen:
            name: "background_1"
            Image:
                source: "Background/Background_1.png"
                allow_stretch: True
                keep_ratio: True
                size_hint_y: None
                size_hint_x: None
                width: self.parent.width
                height: self.parent.width/self.image_ratio
                FloatLayout:
                    orientation: 'vertical'
                    Label:
                        text: "This is an application, which will consume the live streaming data inside a Kivy-based IOS-App by using Matplotlib to capture the KPIs."
                        text_size: self.width + 350, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':2.9,'center_y':6.5}
                    Image:
                        id: homesc
                        pos_hint: {'right':6, 'top':5.4}
                        size_hint: None, None
                        size: 560, 485
                        source: "Background/FP.jpeg"
        Screen:
            name: "background_2"
            Image:
                source: "Background/Background_2.png"
                allow_stretch: True
                keep_ratio: True
                size_hint_y: None
                size_hint_x: None
                width: self.parent.width
                height: self.parent.width/self.image_ratio
                FloatLayout:
                    Label:
                        text: "Please find the realtime IoT-device Live Statistics:"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':3.0,'center_y':7.0}
                    Label:
                        text: "DC to Servo Min Ratio:"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':3.0,'center_y':6.2}
                    Label:
                        id: dynMin
                        text: "100"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.2,'center_y':6.2}
                    Label:
                        text: "DC Motor:"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':5.4}
                    Label:
                        text: "(MAX)"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':5.0}
                    Label:
                        id: dynDC
                        text: "100"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':4.6}
                    Label:
                        text: " ——- Vs ——- "
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':4.0}
                    Label:
                        text: "Servo Motor:"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':3.4}
                    Label:
                        text: "(MAX)"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':3.0}
                    Label:
                        id: dynServo
                        text: "100"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':2.6}
                FloatLayout:
                    id: box
                    size: 400, 550
                    pos: 200, 300
        Screen:
            name: "background_3"
            Image:
                source: "Background/Background_3.png"
                allow_stretch: True
                keep_ratio: True
                size_hint_y: None
                size_hint_x: None
                width: self.parent.width
                height: self.parent.width/self.image_ratio
                FloatLayout:
                    orientation: 'vertical'
                    Label:
                        text: "Please find the live like status."
                        text_size: self.width + 350, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':2.6,'center_y':7.2}
                    Label:
                        id: dynVal
                        text: "100"
                        text_size: self.width + 350, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':4.1,'center_y':6.4}
                    Image:
                        id: lk_img_1
                        pos_hint: {'center_x':3.2, 'center_y':6.4}
                        size_hint: None, None
                        size: 460, 285
                        source: "Background/Likes_Btn_R.png"
                    Label:
                        text: "Want to know more about the Developer? Here is the detail ->"
                        text_size: self.width + 450, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':3.1,'center_y':5.5}
                    Label:
                        text: "I love to find out new technologies that is emerging as a driving force & shape our future!"
                        text_size: self.width + 290, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':2.3,'center_y':3.8}
                    Label:
                        text: "For more information view the website to know more on Python-Kivy along with Matplotlib Live Streaming."
                        text_size: self.width + 450, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':3.1,'center_y':1.9}
                    Image:
                        id: avatar
                        pos_hint: {'right':6.8, 'top':5.4}
                        size_hint: None, None
                        size: 460, 285
                        source: "Background/Me.jpeg"
                    Label:
                        text: "https://www.satyakide.com"
                        text_size: self.width + 350, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':3.4,'center_y':0.9}
    Image:
        source: "Background/Top_Bar.png"
        size: 620, 175
        pos: 0, root.height - 535
    Button:
        #: set val 'Start'
        size: 112.5, 75
        pos: root.width/2-190, root.height-120
        background_color: 1,1,1,0
        on_press: root.pressed(self, val, sm)
        on_release: root.released(self, val)
        Image:
            id: s_img
            text: val
            source: "Background/Start_Btn.png"
            center_x: self.parent.center_x - 260
            center_y: self.parent.center_y - 415
    Button:
        #: set val2 'Stats'
        size: 112.5, 75
        pos: root.width/2-55, root.height-120
        background_color: 1,1,1,0
        on_press: root.pressed(self, val2, sm)
        on_release: root.released(self, val2)
        Image:
            id: st_img
            text: val2
            source: "Background/Stats_Btn.png"
            center_x: self.parent.center_x - 250
            center_y: self.parent.center_y - 415
    Button:
        #: set val3 'Likes'
        size: 112.5, 75
        pos: root.width/2+75, root.height-120
        background_color: 1,1,1,0
        on_press: root.pressed(self, val3, sm)
        on_release: root.released(self, val3)
        Image:
            id: lk_img
            text: val3
            source: "Background/Likes_Btn.png"
            center_x: self.parent.center_x - 240
            center_y: self.parent.center_y - 415


To understand this, one needs to learn how to prepare a Kivy design layout using the KV language. You can develop the same using native Python code as well. However, I wanted to explore this language, & it is also the preferred way of doing front-end GUI design in Kivy.

Like any graphical interface, one needs to understand the layouts & the widgets that you are planning to use or build. For that, please go through the critical documentation on Kivy Layouts, especially if you are doing this for the first time.

To frame the discussion, I would like to present the relevant documentation segment from the official site in the given picture –

Official Kivy reference

Since we’ve used custom buttons & a top bar, FloatLayout is the most convenient GUI layout for our use case. With that layout, we can position our widgets anywhere we need. At the same time, one can nest layouts by combining different types of arrangements under one another.
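If you prefer to see the same idea in plain Python rather than KV, here is a minimal standalone sketch (not part of our App) showing how a FloatLayout lets you drop widgets at arbitrary positions via pos_hint; the widget names & positions are made up for illustration:

# Minimal FloatLayout illustration in plain Python (not part of the App).
from kivy.app import App
from kivy.uix.floatlayout import FloatLayout
from kivy.uix.label import Label
from kivy.uix.button import Button

class DemoLayoutApp(App):
    def build(self):
        root = FloatLayout()

        # pos_hint positions the widget relative to the parent's size
        root.add_widget(Label(text='Header',
                              pos_hint={'center_x': 0.5, 'top': 1.0},
                              size_hint=(None, None), size=(200, 50)))

        # Widgets can be placed anywhere, independent of each other
        root.add_widget(Button(text='Start',
                               pos_hint={'x': 0.1, 'y': 0.05},
                               size_hint=(None, None), size=(112.5, 75)))
        return root

if __name__ == '__main__':
    DemoLayoutApp().run()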

Some of the key lines from the above script are –

Screen:
  name: "background_1"
  Image:
      source: "Background/Background_1.png"
      allow_stretch: True
      keep_ratio: True
      size_hint_y: None
      size_hint_x: None
      width: self.parent.width
      height: self.parent.width/self.image_ratio
      FloatLayout:
          orientation: 'vertical'
          Label:
              text: "This is an application, which will consume the live streaming data inside a Kivy-based IOS-App by using Matplotlib to capture the KPIs."
              text_size: self.width + 350, None
              height: self.texture_size[1]
              halign: "left"
              valign: "bottom"
              pos_hint: {'center_x':2.9,'center_y':6.5}
          Image:
              id: homesc
              pos_hint: {'right':6, 'top':5.4}
              size_hint: None, None
              size: 560, 485
              source: "Background/FP.jpeg"

Let us understand what we discussed here & try to map that with the image.

Part of GUI defined in KV file

From the above image, you can see how we placed the label & the image at custom positions to create a lean & clean interface.

Image:
      source: "Background/Top_Bar.png"
      size: 620, 175
      pos: 0, root.height - 535

  Button:
      #: set val 'Start'
      size: 112.5, 75
      pos: root.width/2-190, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val, sm)
      on_release: root.released(self, val)
      Image:
          id: s_img
          text: val
          source: "Background/Start_Btn.png"
          center_x: self.parent.center_x - 260
          center_y: self.parent.center_y - 415

  Button:
      #: set val2 'Stats'
      size: 112.5, 75
      pos: root.width/2-55, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val2, sm)
      on_release: root.released(self, val2)
      Image:
          id: st_img
          text: val2
          source: "Background/Stats_Btn.png"
          center_x: self.parent.center_x - 250
          center_y: self.parent.center_y - 415

  Button:
      #: set val3 'Likes'
      size: 112.5, 75
      pos: root.width/2+75, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val3, sm)
      on_release: root.released(self, val3)
      Image:
          id: lk_img
          text: val3
          source: "Background/Likes_Btn.png"
          center_x: self.parent.center_x - 240
          center_y: self.parent.center_y - 415

Let us understand the custom buttons mapped in our App.

These are custom buttons. We placed them at specific positions with specific sizes by supplying the appropriate size & position coordinates & then assigned the button methods (on_press & on_release).

However, these button methods will be present inside the main python script, which we’ll discuss after this segment.

3. main.py (Consumes the streaming data, i.e., the captured IoT events from the simulator, from Ably channels & publishes them in the Kivy-based iOS App through measured KPIs.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 12-Nov-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels & captured IoT ####
#### events from the simulator & publish ####
#### them in Kivy-I/OS App through ####
#### measured KPIs. ####
#### ####
##############################################
from kivy.app import App
from kivy.uix.widget import Widget
from kivy.lang import Builder
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.floatlayout import FloatLayout
from kivy.clock import Clock
from kivy.core.window import Window
from kivymd.app import MDApp
import datetime as dt
import datetime
from kivy.properties import StringProperty
from kivy.vector import Vector
import regex as re
import os
os.environ["KIVY_IMAGE"]="pil"
import platform as pl
import matplotlib.pyplot as plt
import pandas as p
from matplotlib.patches import Rectangle
from matplotlib import use as mpl_use
mpl_use('module://kivy.garden.matplotlib.backend_kivy')
plt.style.use('fivethirtyeight')
# Consuming data from Ably Queue
from ably import AblyRest
# Main Class to consume streaming
import clsStreamConsume as ca
# Create the instance of the Covid API Class
x1 = ca.clsStreamConsume()
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' *60)
DInd = 'Y'
Window.size = (310, 460)
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
    sep = '\\'
else:
    sep = '/'

def getRealTimeIoT():
    try:
        # Let's pass this to our map section
        df = x1.conStream(var1, DInd)

        print('Data:')
        print(str(df))

        return df
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

class MainInterface(FloatLayout):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.data = getRealTimeIoT()
        self.likes = 0
        self.dcMotor = 0
        self.servoMotor = 0
        self.minRatio = 0
        plt.subplots_adjust(bottom=0.19)

        #self.fig, self.ax = plt.subplots(1,1, figsize=(6.5,10))
        self.fig, self.ax = plt.subplots()
        self.mpl_canvas = self.fig.canvas

    def on_data(self, *args):
        self.ax.clear()
        self.data = getRealTimeIoT()

        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
        self.likes = self.getMaxLike(self.data)
        self.ids.dynVal.text = str(self.likes)
        self.ids.lk_img_1.source = ''
        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn_R.png"

        self.dcMotor = self.getMaxDCMotor(self.data)
        self.ids.dynDC.text = str(self.dcMotor)

        self.servoMotor = self.getMaxServoMotor(self.data)
        self.ids.dynServo.text = str(self.servoMotor)

        self.minRatio = self.getDc2ServoMinRatio(self.data)
        self.ids.dynMin.text = str(self.minRatio)

        x = self.data['x_value']
        y1 = self.data['total_1']
        y2 = self.data['total_2']

        self.ax.plot(x, y1, label='Channel 1', linewidth=5.0)
        self.ax.plot(x, y2, label='Channel 2', linewidth=5.0)

        self.mpl_canvas.draw_idle()

        box = self.ids.box
        box.clear_widgets()
        box.add_widget(self.mpl_canvas)

        return self.data

    def getMaxLike(self, df):
        payload = df['x_value']
        a1 = str(payload.agg(['max']))
        max_val = int(re.search(r'\d+', a1)[0])

        return max_val

    def getMaxDCMotor(self, df):
        payload = df['total_1']
        a1 = str(payload.agg(['max']))
        max_val = int(re.search(r'\d+', a1)[0])

        return max_val

    def getMaxServoMotor(self, df):
        payload = df['total_2']
        a1 = str(payload.agg(['max']))
        max_val = int(re.search(r'\d+', a1)[0])

        return max_val

    def getMinDCMotor(self, df):
        payload = df['total_1']
        a1 = str(payload.agg(['min']))
        min_val = int(re.search(r'\d+', a1)[0])

        return min_val

    def getMinServoMotor(self, df):
        payload = df['total_2']
        a1 = str(payload.agg(['min']))
        min_val = int(re.search(r'\d+', a1)[0])

        return min_val

    def getDc2ServoMinRatio(self, df):
        minDC = self.getMinDCMotor(df)
        minServo = self.getMinServoMotor(df)
        min_ratio = round(float(minDC/minServo), 5)

        return min_ratio

    def update(self, *args):
        self.data = self.on_data(self.data)

    def pressed(self, instance, inText, SM):

        if str(inText).upper() == 'START':
            instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Start_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.s_img.text).upper())
            if ((SM.current == "background_2") or (SM.current == "background_3")):
                SM.transition.direction = "right"
            SM.current= "background_1"
            Clock.unschedule(self.update)
            self.remove_widget(self.mpl_canvas)
        elif str(inText).upper() == 'STATS':
            instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Stats_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.st_img.text).upper())
            if (SM.current == "background_1"):
                SM.transition.direction = "left"
            elif (SM.current == "background_3"):
                SM.transition.direction = "right"
            SM.current= "background_2"
            Clock.schedule_interval(self.update, 0.1)
        else:
            instance.parent.ids.lk_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Likes_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.lk_img.text).upper())
            if ((SM.current == "background_1") or (SM.current == "background_2")):
                SM.transition.direction = "left"
            SM.current= "background_3"
            Clock.schedule_interval(self.update, 0.1)
            instance.parent.ids.dynVal.text = str(self.likes)
            instance.parent.ids.dynDC.text = str(self.dcMotor)
            instance.parent.ids.dynServo.text = str(self.servoMotor)
            instance.parent.ids.dynMin.text = str(self.minRatio)
            self.remove_widget(self.mpl_canvas)

    def released(self, instance, inrText):
        if str(inrText).upper() == 'START':
            instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Start_Btn.png"
            print('Released: ', str(instance.parent.ids.s_img.text).upper())
        elif str(inrText).upper() == 'STATS':
            instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Stats_Btn.png"
            print('Released: ', str(instance.parent.ids.st_img.text).upper())
        else:
            instance.parent.ids.lk_img.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
            print('Released: ', str(instance.parent.ids.lk_img.text).upper())

class CustomApp(MDApp):
    def build(self):
        return MainInterface()

if __name__ == "__main__":
    custApp = CustomApp()
    custApp.run()


Let us explore the main script now.

def getRealTimeIoT():
    try:
        # Let's pass this to our map section
        df = x1.conStream(var1, DInd)

        print('Data:')
        print(str(df))

        return df
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

The above function will invoke the streaming class to consume the mock IoT live events as a pandas dataframe from the Ably queue.
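The clsStreamConsume class isn’t listed in this post either. Broadly, it reads the recently published messages from the Ably channel & reshapes them into a pandas dataframe. Here is a hedged sketch of what conStream might look like, assuming the synchronous ably REST client; the API key & channel name are placeholders only:

# Hypothetical sketch of the consumer class; key & channel are placeholders.
import json
import logging
import pandas as p
from ably import AblyRest

class clsStreamConsume:
    def __init__(self):
        self.client = AblyRest('YOUR_ABLY_API_KEY')
        self.channel = self.client.channels.get('YOUR_CHANNEL_NAME')

    def conStream(self, var, DInd):
        try:
            df = p.DataFrame()

            # Fetch the recently published messages from the channel
            message_page = self.channel.history()

            for msg in message_page.items:
                # Each message holds a batch keyed by index: {"0": {...}, "1": {...}}
                payload = json.loads(str(msg.data).replace("'", '"'))
                dfTmp = p.DataFrame.from_dict(payload, orient='index')
                df = p.concat([df, dfTmp], ignore_index=True)

            if DInd == 'Y':
                logging.info('Consumed at ' + str(var) + ' rows: ' + str(df.shape[0]))

            return df
        except Exception as e:
            logging.error(str(e))
            return p.DataFrame()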

class MainInterface(FloatLayout):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.data = getRealTimeIoT()
        self.likes = 0
        self.dcMotor = 0
        self.servoMotor = 0
        self.minRatio = 0
        plt.subplots_adjust(bottom=0.19)

        #self.fig, self.ax = plt.subplots(1,1, figsize=(6.5,10))
        self.fig, self.ax = plt.subplots()
        self.mpl_canvas = self.fig.canvas

The application instantiates the main class & assigns all the critical variables, including the Matplotlib figure & its canvas.

    def pressed(self, instance, inText, SM):

        if str(inText).upper() == 'START':
            instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Start_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.s_img.text).upper())
            if ((SM.current == "background_2") or (SM.current == "background_3")):
                SM.transition.direction = "right"
            SM.current= "background_1"
            Clock.unschedule(self.update)
            self.remove_widget(self.mpl_canvas)

We’ve taken one of the button events & shown how the application behaves once someone taps the Start button & brings up all the corresponding elements of that static page. It also sets the transition direction between screens.

        elif str(inText).upper() == 'STATS':

            instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Stats_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.st_img.text).upper())
            if (SM.current == "background_1"):
                SM.transition.direction = "left"
            elif (SM.current == "background_3"):
                SM.transition.direction = "right"
            SM.current= "background_2"
            Clock.schedule_interval(self.update, 0.1)

The next screen invokes the dynamic & real-time content. So, please pay extra attention to the following line –

Clock.schedule_interval(self.update, 0.1)

This line will invoke the update function, which looks like –

    def update(self, *args):
        self.data = self.on_data(self.data)

Here is the logic for the update function, which will invoke another function named – “on_data“.

    def on_data(self, *args):
        self.ax.clear()
        self.data = getRealTimeIoT()

        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
        self.likes = self.getMaxLike(self.data)
        self.ids.dynVal.text = str(self.likes)
        self.ids.lk_img_1.source = ''
        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn_R.png"

        self.dcMotor = self.getMaxDCMotor(self.data)
        self.ids.dynDC.text = str(self.dcMotor)

        self.servoMotor = self.getMaxServoMotor(self.data)
        self.ids.dynServo.text = str(self.servoMotor)

        self.minRatio = self.getDc2ServoMinRatio(self.data)
        self.ids.dynMin.text = str(self.minRatio)

        x = self.data['x_value']
        y1 = self.data['total_1']
        y2 = self.data['total_2']

        self.ax.plot(x, y1, label='Channel 1', linewidth=5.0)
        self.ax.plot(x, y2, label='Channel 2', linewidth=5.0)

        self.mpl_canvas.draw_idle()

        box = self.ids.box
        box.clear_widgets()
        box.add_widget(self.mpl_canvas)

        return self.data

The above crucial snippet shows how we capture the live calculations, assign them to the Matplotlib plots, & finally attach the Matplotlib figure canvas to the box widget, so that the refreshed content is displayed every time this method is invoked.

The rest of the functions are pretty self-explanatory, so I’m not going to discuss them.


Run:

Let’s run the app & see the output –

STEP – 1

Triggering the mock IoT App

STEP – 2

Triggering the iOS App

STEP – 3


So, we’ve done it.

You will get the complete codebase in the following Github link.

I’ll bring some more exciting topics from the Python verse in the coming days. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging!


Note: All the data & scenarios posted here are representational, available over the internet, & for educational purposes only. Some of the images (except my photo) that we’ve used are available over the net; we don’t claim ownership of these images. There is always room for improvement, especially around the GUI components, whose sizes & positions should ideally be made dynamic by defining them in terms of self.width along with some constant values.

Creating a real-time dashboard from streaming data using Python

Hi Guys,

Today, I’ll demonstrate one of the fascinating ways to capture real-time streaming data in a dashboard. It is a dream for any developer who wants to build an application involving streaming data, API & a dashboard.

Why don’t we look at a quick run first to make this thread more interesting?

Real-Time Dashboard using streaming data

Today, I’ll be using the two most essential services to achieve that goal.

Ably

H2O-Wave

Let’s briefly discuss these two services.

  • Why I used “Ably” here?

One of my scenarios is to consume real-time currency data. Even after checking paid APIs, I was not getting what I was looking for. Hence, I decided to use a service that can mimic a feed & publish my data as streaming data through a channel. Once published, I consume the posted data in my application to create this new dashboard.

Using Ably, you can leverage their cloud platform to publish & consume data with a free developer account, which is more than sufficient for this demo.

To better understand this, we need to understand the basic concept of publish/subscribe (“pub/sub”). Here is the important page from their site that I would like to embed for your reference –

Source: Ably

To know more about this, please refer to the following link.
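In short, a publisher pushes messages onto a named channel, & any number of consumers read from that channel without knowing about each other. A minimal sketch with the synchronous ably REST client looks like the following; the API key is a placeholder, while the channel name matches the one used by the consumption script later in this post:

# Minimal pub/sub sketch with Ably's REST client (API key is a placeholder).
from ably import AblyRest

client = AblyRest('YOUR_ABLY_API_KEY')
channel = client.channels.get('sd_channel')

# Publisher side: push a message onto the channel
channel.publish('exchange-rate', '{"Currency": "USD", "CurrentExchange": 74.2, "Change": 0.4}')

# Consumer side: read back the most recently published messages
for message in channel.history().items:
    print(message.name, message.data)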

  • Why I used “H2O-Wave” here?

H2O Wave is a relatively new framework with some outstanding capabilities for visualizing your data using native Python.

  • Pre-Steps:

We need to register with Ably. Here are some of the useful screens that we should explore –

API-Key Page

Successfully creating an App will generate the API key. Make sure that you note down the channel details as well.

Quota Limit

The above page captures the usage details. Since this is a free subscription, you will be blocked once you exhaust your quota. For paid users, this is one of the vital pages for controlling their budget.

Message Published & Consumption Visuals

Like any other cloud service, you can check your message publishing & consumption statistics on this page.

This is the main landing page for H2O-Wave –

H2O Wave

They have quite a few example snippets. However, these samples contain random data & are hence relatively easy to implement. It takes quite some effort to tailor them to real-life scenarios.

Some of the important links are as follows –

  1. H2O-Wave Tour
  2. GitHub

You need to install the following libraries in Python –

pip install ably
pip install h2o-wave

We have two scripts. We’re not going to discuss the publishing script here, only the consumption script, which also generates the dashboard. If you need the publishing script, post a message & I’ll provide it.

1. dashboard_st.py (This native Python script will consume streaming data & create the live dashboard.)

##########################################################
#### Template Written By: H2O Wave                    ####
#### Enhanced with Streaming Data By: Satyaki De      ####
#### Base Version Enhancement On: 20-Dec-2020         ####
#### Modified On 26-Dec-2020                          ####
####                                                  ####
#### Objective: This script will consume real-time    ####
#### streaming data coming out from a hosted API      ####
#### sources using another popular third-party        ####
#### service named Ably. Ably mimics pubsub Streaming ####
#### concept, which might be extremely useful for     ####
#### any start-ups.                                   ####
##########################################################

import time
from h2o_wave import site, data, ui
from ably import AblyRest
import pandas as p
import json

class DaSeries:
    def __init__(self, inputDf):
        self.Df = inputDf
        self.count_row = inputDf.shape[0]
        self.start_pos = 0
        self.end_pos = 0
        self.interval = 1


    def next(self):
        try:
            # Getting Individual Element & convert them to Series
            if ((self.start_pos + self.interval) <= self.count_row):
                self.end_pos = self.start_pos + self.interval
            else:
                self.end_pos = self.start_pos + (self.count_row - self.start_pos)

            split_df = self.Df.iloc[self.start_pos:self.end_pos]

            if ((self.start_pos > self.count_row) | (self.start_pos == self.count_row)):
                pass
            else:
                self.start_pos = self.start_pos + self.interval

            x = float(split_df.iloc[0]['CurrentExchange'])
            dx = float(split_df.iloc[0]['Change'])

            # Emptying the existing dataframe
            split_df = p.DataFrame(None)

            return x, dx
        except:
            x = 0
            dx = 0

            return x, dx

class CategoricalSeries:
    def __init__(self, sourceDf):
        self.series = DaSeries(sourceDf)
        self.i = 0

    def next(self):
        x, dx = self.series.next()
        self.i += 1
        return f'C{self.i}', x, dx


light_theme_colors = '$red $pink $purple $violet $indigo $blue $azure $cyan $teal $mint $green $amber $orange $tangerine'.split()
dark_theme_colors = '$red $pink $blue $azure $cyan $teal $mint $green $lime $yellow $amber $orange $tangerine'.split()

_color_index = -1
colors = dark_theme_colors

def next_color():
    global _color_index
    _color_index += 1
    return colors[_color_index % len(colors)]


_curve_index = -1
curves = 'linear smooth step stepAfter stepBefore'.split()


def next_curve():
    global _curve_index
    _curve_index += 1
    return curves[_curve_index % len(curves)]


def create_dashboard(update_freq=0.0):
    page = site['/dashboard_st']

    # Fetching the data
    client = AblyRest('XXXXX.YYYYYY:94384jjdhdh98kiidLO')
    channel = client.channels.get('sd_channel')

    message_page = channel.history()

    # Counter Value
    cnt = 0

    # Declaring Global Data-Frame
    df_conv = p.DataFrame()

    for i in message_page.items:
        print('Last Msg: {}'.format(i.data))
        json_data = json.loads(i.data)

        # Converting JSON to Dataframe
        df = p.json_normalize(json_data)
        df.columns = df.columns.map(lambda x: x.split(".")[-1])

        if cnt == 0:
            df_conv = df
        else:
            d_frames = [df_conv, df]
            df_conv = p.concat(d_frames)

        cnt += 1

    # Resetting the Index Value
    df_conv.reset_index(drop=True, inplace=True)

    print('DF:')
    print(df_conv)

    df_conv['default_rank'] = df_conv.groupby(['Currency']).cumcount() + 1
    lkp_rank = 1
    df_unique = df_conv[(df_conv['default_rank'] == lkp_rank)]

    print('Rank DF Unique:')
    print(df_unique)

    count_row = df_unique.shape[0]

    large_lines = []
    start_pos = 0
    end_pos = 0
    interval = 1

    # Converting dataframe to a desired Series
    f = CategoricalSeries(df_conv)

    for j in range(count_row):
        # Getting the series values from above
        cat, val, pc = f.next()

        # Getting Individual Element & convert them to Series
        if ((start_pos + interval) <= count_row):
            end_pos = start_pos + interval
        else:
            end_pos = start_pos + (count_row - start_pos)

        split_df = df_unique.iloc[start_pos:end_pos]

        if ((start_pos > count_row) | (start_pos == count_row)):
            pass
        else:
            start_pos = start_pos + interval

        x_currency = str(split_df.iloc[0]['Currency'])

        c = page.add(f'e{j+1}', ui.tall_series_stat_card(
            box=f'{j+1} 1 1 2',
            title=x_currency,
            value='=${{intl qux minimum_fraction_digits=2 maximum_fraction_digits=2}}',
            aux_value='={{intl quux style="percent" minimum_fraction_digits=1 maximum_fraction_digits=1}}',
            data=dict(qux=val, quux=pc),
            plot_type='area',
            plot_category='foo',
            plot_value='qux',
            plot_color=next_color(),
            plot_data=data('foo qux', -15),
            plot_zero_value=0,
            plot_curve=next_curve(),
        ))
        large_lines.append((f, c))

    page.save()

    while update_freq > 0:

        time.sleep(update_freq)

        for f, c in large_lines:
            cat, val, pc = f.next()

            c.data.qux = val
            c.data.quux = pc / 100
            c.plot_data[-1] = [cat, val]

        page.save()

create_dashboard(update_freq=0.25)

Some of the key snippets from the above code are –

class DaSeries:
    def __init__(self, inputDf):
        self.Df = inputDf
        self.count_row = inputDf.shape[0]
        self.start_pos = 0
        self.end_pos = 0
        self.interval = 1


    def next(self):
        try:
            # Getting Individual Element & convert them to Series
            if ((self.start_pos + self.interval) <= self.count_row):
                self.end_pos = self.start_pos + self.interval
            else:
                self.end_pos = self.start_pos + (self.count_row - self.start_pos)

            split_df = self.Df.iloc[self.start_pos:self.end_pos]

            if ((self.start_pos > self.count_row) | (self.start_pos == self.count_row)):
                pass
            else:
                self.start_pos = self.start_pos + self.interval

            x = float(split_df.iloc[0]['CurrentExchange'])
            dx = float(split_df.iloc[0]['Change'])

            # Emptying the existing dataframe
            split_df = p.DataFrame(None)

            return x, dx
        except:
            x = 0
            dx = 0

            return x, dx

class CategoricalSeries:
    def __init__(self, sourceDf):
        self.series = DaSeries(sourceDf)
        self.i = 0

    def next(self):
        x, dx = self.series.next()
        self.i += 1
        return f'C{self.i}', x, dx

The above snippet creates a series of data out of a pandas dataframe. It consumes one record at a time & passes it to the dashboard for real-time updates.
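To see these classes in isolation, here is a small usage example with a made-up dataframe (the column names match what DaSeries expects; assumes the two classes above are already defined in scope):

# Standalone usage example of the series classes with made-up data.
import pandas as p

sample_df = p.DataFrame({
    'Currency': ['USD', 'EUR', 'INR'],
    'CurrentExchange': [74.2, 88.6, 1.0],
    'Change': [0.4, -0.2, 0.0]
})

f = CategoricalSeries(sample_df)

for _ in range(len(sample_df)):
    cat, val, pc = f.next()
    print(cat, val, pc)   # e.g. C1 74.2 0.4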

# Fetching the data
client = AblyRest('XXXXX.YYYYYY:94384jjdhdh98kiidLO')
channel = client.channels.get('sd_channel')

message_page = channel.history()

In the above code, the application fetches the recently published real-time data from the Ably channel’s history.

df_conv['default_rank'] = df_conv.groupby(['Currency']).cumcount() + 1
lkp_rank = 1
df_unique = df_conv[(df_conv['default_rank'] == lkp_rank)]

In the above code, the application is uniquely identifying the first instance of currency entries, which will be passed to the initial dashboard page before consuming the array of updates.
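A tiny worked example of this cumcount trick, assuming three made-up messages covering two currencies:

# How groupby().cumcount() flags the first record per currency.
import pandas as p

df_conv = p.DataFrame({
    'Currency': ['USD', 'EUR', 'USD'],
    'CurrentExchange': [74.2, 88.6, 74.6]
})

df_conv['default_rank'] = df_conv.groupby(['Currency']).cumcount() + 1
# default_rank -> 1, 1, 2; keeping rank 1 leaves exactly one row per currency
df_unique = df_conv[df_conv['default_rank'] == 1]
print(df_unique)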

f = CategoricalSeries(df_conv)

In the above code, the application is creating an instance of the intended categorical series.

c = page.add(f'e{j+1}', ui.tall_series_stat_card(
            box=f'{j+1} 1 1 2',
            title=x_currency,
            value='=${{intl qux minimum_fraction_digits=2 maximum_fraction_digits=2}}',
            aux_value='={{intl quux style="percent" minimum_fraction_digits=1 maximum_fraction_digits=1}}',
            data=dict(qux=val, quux=pc),
            plot_type='area',
            plot_category='foo',
            plot_value='qux',
            plot_color=next_color(),
            plot_data=data('foo qux', -15),
            plot_zero_value=0,
            plot_curve=next_curve(),
        ))
        large_lines.append((f, c))

    page.save()

The above code is a standard way to bind the streaming data with the H2O-Wave dashboard.

    while update_freq > 0:

        time.sleep(update_freq)

        for f, c in large_lines:
            cat, val, pc = f.next()

            c.data.qux = val
            c.data.quux = pc / 100
            c.plot_data[-1] = [cat, val]

        page.save()

Here are the last few snippet lines that will capture the continuous streaming data & keep updating the numbers on your dashboard.

Since I’ve already provided the run video of my application, here are a few important screens –

Case 1:

Wave Server Start Command

Case 2:

Publishing stream data

Case 3:

Consuming Stream Data & Publishing to Dashboard

Case 4:

Dashboard Data

So, finally, we have done it.

You will get the complete codebase in the following Github link.


I’ll bring some more exciting topics from the Python verse in the coming days.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational, available over the internet, & for educational purposes only.

Predicting health issues for Senior Citizens based on “Realtime Weather Data” in Python

Hi Guys,

Today, I’ll be presenting a different kind of post here. I’ll be trying to predict health issues for senior citizens based on “realtime weather data” blended with open-source population data & a mock risk-factor calculation. At the end of the post, I’ll be plotting these numbers into some graphs for a better understanding.

Let’s drive!

For this, we first need real-time weather data. To get that, we need to subscribe to the OpenWeather API. You have to register as a developer, & you’ll receive an email similar to the one below once they have approved your account –

1. Subscription To Open Weather

So, from the above picture, you can see that, you’ll be provided one API key & also offered a couple of useful API documentation. I would recommend exploring all the links before you try to use it.

You can also view your API key once you log into their console. You can create multiple API keys & the screen should look something like this –

2. Viewing Keys

For security reasons, I’ll be hiding my own keys, & you should do the same.

Many of these free APIs might have some quirks. So, I would recommend testing the open API through Postman before you jump into Python development. Here is a glimpse of my test through Postman –

3. Testing API

Once I can see that the API is returning results, I can work with it.

Apart from that, one needs to understand that these APIs might have usage limits, & you should know the consequences in terms of price & tier in case you exceed the limit. Here are the details for this API –

5. Package Details - API

For our demo, I’ll be using the Free tier only.

Let’s look into our other source data. We got the top 10 US cities by population from the internet. Also, we have collected a sample senior-citizen percentage against the sex ratio across those cities. We have masked these values as this is just for educational purposes.

1. CityDetails.csv

Here is the glimpse of this file –

4. Source File

So, this file only contains the total population across the top 10 cities in the USA.

2. SeniorCitizen.csv

6. SeniorCitizen Data

This file contains the Sex ratio of Senior citizens across those top 10 cities by population.

Again, we are not going to discuss any script that we’ve already discussed earlier.

Hence, we’re skipping clsL.py here.

1. clsConfig.py (This script contains all the parameters of the server.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 19-Jan-2019              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys for      ####
#### the OpenWeather API & the file      ####
#### paths. The application will use     ####
#### these to invoke the API & process   ####
#### the weather data.                   ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'URL': "http://api.openweathermap.org/data/2.5/weather",
        'API_HOST': "api.openweathermap.org",
        'API_KEY': "XXXXXXXXXXXXXXXXXXXXXX",
        'API_TYPE': "application/json",
        'CACHE': "no-cache",
        'CON': "keep-alive",
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'SRC_PATH': Curr_Path + sep + 'Src_File' + sep,
        'APP_DESC_1': 'Open Weather Forecast',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SRC_FILE': Curr_Path + sep + 'Src_File' + sep + 'CityDetails.csv',
        'SRC_FILE_1': Curr_Path + sep + 'Src_File' + sep + 'SeniorCitizen.csv',
        'SRC_FILE_INIT': 'CityDetails.csv',
        'COL_LIST': ['base', 'all', 'cod', 'lat', 'lon', 'dt', 'feels_like', 'humidity', 'pressure', 'temp', 'temp_max', 'temp_min', 'name', 'country', 'sunrise', 'sunset', 'type', 'timezone', 'visibility', 'weather', 'deg', 'gust', 'speed'],
        'COL_LIST_1': ['base', 'all', 'cod', 'lat', 'lon', 'dt', 'feels_like', 'humidity', 'pressure', 'temp', 'temp_max', 'temp_min', 'CityName', 'country', 'sunrise', 'sunset', 'type', 'timezone', 'visibility', 'deg', 'gust', 'speed', 'WeatherMain', 'WeatherDescription'],
        'COL_LIST_2': ['CityName', 'Population', 'State']
    }

2. clsWeather.py (This script contains the main logic to extract the realtime data from our subscribed weather API.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 19-Jan-2020              ####
#### Modified On 19-Jan-2020              ####
####                                      ####
#### Objective: Main scripts to invoke    ####
#### the OpenWeather API.                 ####
##############################################

import requests
import logging
import json
from clsConfig import clsConfig as cf

class clsWeather:
    def __init__(self):
        self.url = cf.config['URL']
        self.openmapapi_host = cf.config['API_HOST']
        self.openmapapi_key = cf.config['API_KEY']
        self.openmapapi_cache = cf.config['CACHE']
        self.openmapapi_con = cf.config['CON']
        self.type = cf.config['API_TYPE']

    def searchQry(self, rawQry):
        try:
            url = self.url
            openmapapi_host = self.openmapapi_host
            openmapapi_key = self.openmapapi_key
            openmapapi_cache = self.openmapapi_cache
            openmapapi_con = self.openmapapi_con
            type = self.type

            querystring = {"appid": openmapapi_key, "q": rawQry}

            print('Input JSON: ', str(querystring))

            headers = {
                'host': openmapapi_host,
                'content-type': type,
                'Cache-Control': openmapapi_cache,
                'Connection': openmapapi_con
            }

            response = requests.request("GET", url, headers=headers, params=querystring)

            ResJson  = response.text

            jdata = json.dumps(ResJson)
            ResJson = json.loads(jdata)

            return ResJson

        except Exception as e:
            ResJson = ''
            x = str(e)
            print(x)

            logging.info(x)
            ResJson = {'errorDetails': x}

            return ResJson

The key lines from this script –

querystring = {"appid": openmapapi_key, "q": rawQry}

print('Input JSON: ', str(querystring))

headers = {
    'host': openmapapi_host,
    'content-type': type,
    'Cache-Control': openmapapi_cache,
    'Connection': openmapapi_con
}

response = requests.request("GET", url, headers=headers, params=querystring)

ResJson  = response.text

In the above snippet, our application first prepares the payload & headers from the parameters received from our config script, then invokes the GET method to extract the real-time data as JSON, & finally sends the JSON payload back to the primary calling function.
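For instance, a standalone call to this class would look like the sketch below, assuming a valid API key is configured in clsConfig; the returned payload is the raw OpenWeather JSON text for that city:

# Standalone usage sketch of clsWeather (assumes a valid API key in clsConfig).
import clsWeather as ct

x1 = ct.clsWeather()
resJson = x1.searchQry('New York')

# resJson holds the raw OpenWeather response text, e.g. containing
# "main": {"temp": ..., "feels_like": ..., "humidity": ...}
print(resJson)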

3. clsMap.py (This script contains the main logic to prepare the plot using the seaborn package & to chart our custom-made risk factor by blending the real-time data with the statistical data collected from the internet.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 19-Jan-2020              ####
#### Modified On 19-Jan-2020              ####
####                                      ####
#### Objective: Main scripts to invoke    ####
#### plot into the Map.                   ####
##############################################

import seaborn as sns
import logging
from clsConfig import clsConfig as cf
import pandas as p
import clsL as cl

# This library requires later
# to print the chart
import matplotlib.pyplot as plt

class clsMap:
    def __init__(self):
        self.src_file =  cf.config['SRC_FILE_1']

    def calculateRisk(self, row):
        try:
            # Let's assume some logic
            # 1. By default, 30% of Senior Citizen
            # prone to health Issue for each City
            # 2. Male Senior Citizen is 19% more prone
            # to illness than female.
            # 3. If humidity more than 70% or less
            # than 40% are 22% main cause of illness
            # 4. If feels like more than 280 or
            # less than 260 degree are 17% more prone
            # to illness.
            # Finally, this will be calculated per 1K
            # people around 10 blocks

            str_sex = str(row['Sex'])

            int_humidity = int(row['humidity'])
            int_feelsLike = int(row['feels_like'])
            int_population = int(str(row['Population']).replace(',',''))
            float_srcitizen = float(row['SeniorCitizen'])

            confidance_score = 0.0

            SeniorCitizenPopulation = (int_population * float_srcitizen)

            if str_sex == 'Male':
                confidance_score = (SeniorCitizenPopulation * 0.30 * 0.19) + confidance_score
            else:
                confidance_score = (SeniorCitizenPopulation * 0.30 * 0.11) + confidance_score

            if ((int_humidity > 70) | (int_humidity < 40)):
                confidance_score = confidance_score + (int_population * 0.30 * float_srcitizen) * 0.22

            if ((int_feelsLike > 280) | (int_feelsLike < 260)):
                confidance_score = confidance_score + (int_population * 0.30 * float_srcitizen) * 0.17

            final_score = round(round(confidance_score, 2) / (1000 * 10), 2)

            return final_score

        except Exception as e:
            x = str(e)

            return x

    def setMap(self, dfInput):
        try:
            resVal = 0
            df = p.DataFrame()
            debug_ind = 'Y'
            src_file =  self.src_file

            # Initiating Log Class
            l = cl.clsL()

            df = dfInput

            # Creating a subset of desired columns
            dfMod = df[['CityName', 'temp', 'Population', 'humidity', 'feels_like']]

            l.logr('5.dfSuppliment.csv', debug_ind, dfMod, 'log')

            # Fetching Senior Citizen Data
            df = p.read_csv(src_file, index_col=False)

            # Merging two frames
            dfMerge = p.merge(df, dfMod, on=['CityName'])

            l.logr('6.dfMerge.csv', debug_ind, dfMerge, 'log')

            # Getting RiskFactor quotient from our custom made logic
            dfMerge['RiskFactor'] = dfMerge.apply(lambda row: self.calculateRisk(row), axis=1)

            l.logr('7.dfRiskFactor.csv', debug_ind, dfMerge, 'log')

            # Generating Map plots
            # sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, hue='Sex')
            # sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, hue='Sex', markers=['o','v'], scatter_kws={'s':25})
            sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, col='Sex')

            # This is required when you are running
            # through normal Python & not through
            # Jupyter Notebook
            plt.show()

            return resVal

        except Exception as e:
            x = str(e)
            print(x)

            logging.info(x)
            resVal = x

            return resVal

Key lines from the above codebase –

# Creating a subset of desired columns
dfMod = df[['CityName', 'temp', 'Population', 'humidity', 'feels_like']]

l.logr('5.dfSuppliment.csv', debug_ind, dfMod, 'log')

# Fetching Senior Citizen Data
df = p.read_csv(src_file, index_col=False)

# Merging two frames
dfMerge = p.merge(df, dfMod, on=['CityName'])

l.logr('6.dfMerge.csv', debug_ind, dfMerge, 'log')

# Getting RiskFactor quotient from our custom made logic
dfMerge['RiskFactor'] = dfMerge.apply(lambda row: self.calculateRisk(row), axis=1)

l.logr('7.dfRiskFactor.csv', debug_ind, dfMerge, 'log')

Here we combine our senior-citizen data with the already processed data coming from our primary calling script. The application then applies our custom logic to derive the risk-factor figures. If you want to go through that, I’ve provided the logic to derive it above. However, this is just a demo; you should not rely on the logic that I’ve used (it is merely my observation of life till now. :D).
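As a quick sanity check of that formula, here is the calculation for one made-up row (male senior citizens in a humid city) worked out step by step, mirroring calculateRisk:

# Worked example of the risk-factor formula for one hypothetical row.
int_population = 1000000      # total population (made-up)
float_srcitizen = 0.12        # 12% senior citizens (made-up)
int_humidity = 75             # humid city -> triggers the 22% rule
int_feelsLike = 270           # within 260-280 -> the 17% rule is not triggered

SeniorCitizenPopulation = int_population * float_srcitizen              # 120000.0

# Male senior citizens: 30% prone to illness, 19% extra weight
confidance_score = SeniorCitizenPopulation * 0.30 * 0.19                # 6840.0

# Humidity above 70% adds 22% of the senior-citizen base
confidance_score += (int_population * 0.30 * float_srcitizen) * 0.22    # + 7920.0 = 14760.0

# Scale per 1K people around 10 blocks
final_score = round(round(confidance_score, 2) / (1000 * 10), 2)        # 1.48
print(final_score)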

The below line is only required when you run the script through normal Python, not via a Jupyter notebook.

plt.show()

4. callOpenMapWeatherAPI.py (This is the main calling script. It calls the real-time API, blends the first file with the response, & passes only the relevant columns of data to our Map script to produce the graph.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 19-Jan-2020              ####
#### Modified On 19-Jan-2020              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from clsConfig import clsConfig as cf
import pandas as p
import clsL as cl
import logging
import datetime
import json
import clsWeather as ct
import re
import numpy as np
import clsMap as cm

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Capturing the run timestamp for logging

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def getMainWeather(row):
    try:
        # Parsing the nested 'weather' JSON column

        lkp_Columns = str(row['weather'])
        jpayload = str(lkp_Columns).replace("'", '"')

        #jpayload = json.dumps(lkp_Columns)
        payload = json.loads(jpayload)

        df_lkp = p.io.json.json_normalize(payload)
        df_lkp.columns = df_lkp.columns.map(lambda x: x.split(".")[-1])

        str_main_weather = str(df_lkp.iloc[0]['main'])

        return str_main_weather

    except Exception as e:
        x = str(e)
        str_main_weather = x

        return str_main_weather

def getMainDescription(row):
    try:
        # Parsing the nested 'weather' JSON column

        lkp_Columns = str(row['weather'])
        jpayload = str(lkp_Columns).replace("'", '"')

        #jpayload = json.dumps(lkp_Columns)
        payload = json.loads(jpayload)

        df_lkp = p.io.json.json_normalize(payload)
        df_lkp.columns = df_lkp.columns.map(lambda x: x.split(".")[-1])

        str_description = str(df_lkp.iloc[0]['description'])

        return str_description

    except Exception as e:
        x = str(e)
        str_description = x

        return str_description

def main():
    try:
        dfSrc = p.DataFrame()
        df_ret = p.DataFrame()
        ret_2 = ''
        debug_ind = 'Y'

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidatedIR.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']
        col_list = cf.config['COL_LIST']
        col_list_1 = cf.config['COL_LIST_1']
        col_list_2 = cf.config['COL_LIST_2']

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        df2 = p.DataFrame()

        src_file =  cf.config['SRC_FILE']

        # Fetching data from source file
        df = p.read_csv(src_file, index_col=False)

        # Creating a list of City Name from the source file
        city_list = df['CityName'].tolist()

        # Declaring an empty dictionary
        merge_dict = {}
        merge_dict['city'] = df2

        start_pos = 1
        src_file_name = '1.' + cf.config['SRC_FILE_INIT']

        for i in city_list:
            x1 = ct.clsWeather()
            ret_2 = x1.searchQry(i)

            # Capturing the JSON Payload
            res = json.loads(ret_2)

            # Converting dictionary to Pandas Dataframe
            # df_ret = p.read_json(ret_2, orient='records')

            df_ret = p.io.json.json_normalize(res)
            df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

            # Removing any duplicate columns
            df_ret = df_ret.loc[:, ~df_ret.columns.duplicated()]

            # l.logr(str(start_pos) + '.1.' + src_file_name, debug_ind, df_ret, 'log')

            # If all the conversion successful
            # you won't get any gust column
            # from OpenMap response. Hence, we
            # need to add dummy reason column
            # to maintain the consistent structures

            if 'gust' not in df_ret.columns:
                df_ret = df_ret.assign(gust=999999)[['gust'] + df_ret.columns.tolist()]

            # Resetting the column orders as per JSON
            column_order = col_list
            df_mod_ret = df_ret.reindex(column_order, axis=1)

            if start_pos == 1:
                merge_dict['city'] = df_mod_ret
            else:
                d_frames = [merge_dict['city'], df_mod_ret]
                merge_dict['city'] = p.concat(d_frames)

            start_pos += 1

        for k, v in merge_dict.items():
            l.logr(src_file_name, debug_ind, merge_dict[k], 'log')

        # Now opening the temporary file
        temp_log_file = log_dir + src_file_name

        dfNew = p.read_csv(temp_log_file, index_col=False)

        # Extracting Complex columns
        dfNew['WeatherMain'] = dfNew.apply(lambda row: getMainWeather(row), axis=1)
        dfNew['WeatherDescription'] = dfNew.apply(lambda row: getMainDescription(row), axis=1)

        l.logr('2.dfNew.csv', debug_ind, dfNew, 'log')

        # Removing unwanted columns & Renaming key columns
        dfNew.drop(['weather'], axis=1, inplace=True)
        dfNew.rename(columns={'name': 'CityName'}, inplace=True)

        l.logr('3.dfNewMod.csv', debug_ind, dfNew, 'log')

        # Now joining with the main csv
        # to get the complete picture
        dfMain = p.merge(df, dfNew, on=['CityName'])

        l.logr('4.dfMain.csv', debug_ind, dfMain, 'log')

        # Let's extract only relevant columns
        dfSuppliment = dfMain[['CityName', 'Population', 'State', 'country', 'feels_like', 'humidity', 'pressure', 'temp', 'temp_max', 'temp_min', 'visibility', 'deg', 'gust', 'speed', 'WeatherMain', 'WeatherDescription']]

        l.logr('5.dfSuppliment.csv', debug_ind, dfSuppliment, 'log')

        # Let's pass this to our map section
        x2 = cm.clsMap()
        ret_3 = x2.setMap(dfSuppliment)

        if ret_3 == 0:
            print('Successful Map Generated!')
        else:
            print('Please check the log for further issue!')

        print("-" * 60)
        print()

        print('Finding Story points..')
        print("*" * 157)
        logging.info('Finding Story points..')
        logging.info(tmpR0)


        tmpR10 = 'End Time: ' + str(var)
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError as e:
        print(str(e))
        print("No relevant data to proceed!")
        logging.info("No relevant data to proceed!")

    except Exception as e:
        print("Top level Error: args:{0}, message{1}".format(e.args, e.message))

if __name__ == "__main__":
    main()

Key snippet from the above script –

# Capturing the JSON Payload
res = json.loads(ret_2)

# Converting dictionary to Pandas Dataframe
df_ret = p.io.json.json_normalize(res)
df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

Once the application receives the JSON response from the real-time API, it converts the payload into a pandas dataframe.

# Removing any duplicate columns
df_ret = df_ret.loc[:, ~df_ret.columns.duplicated()]

Since this is a complex JSON response, the application might encounter duplicate columns, which could cause problems later. Hence, our app removes these duplicate columns, as they are not required for our case.

if 'gust' not in df_ret.columns:
    df_ret = df_ret.assign(gust=999999)[['gust'] + df_ret.columns.tolist()]

There is a possibility that the application might not receive all the desired attributes from the real-time API. Hence, the above lines check for the gust column & add a dummy one for those records where it is not present in the JSON response, so that every city shares a consistent structure.

if start_pos == 1:
    merge_dict['city'] = df_mod_ret
else:
    d_frames = [merge_dict['city'], df_mod_ret]
    merge_dict['city'] = p.concat(d_frames)

These few lines are required because our API can respond with only one city at a time. Hence, we retrieve one city at a time & finally merge them into a single dataframe before creating a temporary source file for the next step.
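
If you prefer, the same merge can be done without growing the dataframe inside the loop: collect each city’s frame in a list & concatenate once at the end. Here is a minimal, self-contained sketch of that pattern with made-up frames (the real ones come from the loop shown above) –

import pandas as p

# Toy per-city frames standing in for each df_mod_ret produced inside the loop
frames = [
    p.DataFrame({'name': ['Austin'], 'temp': [301.2]}),
    p.DataFrame({'name': ['Dallas'], 'temp': [303.8]}),
]

# Concatenate once after the loop instead of concatenating on every iteration
df_all_cities = p.concat(frames, ignore_index=True)
print(df_all_cities)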

At this moment our data should look like this –

16. Intermediate_Data_1

Let’s check the weather column. We need to extract the main & description for our dashboard, which will be coming in the next installment.

# Extracting Complex columns
dfNew['WeatherMain'] = dfNew.apply(lambda row: getMainWeather(row), axis=1)
dfNew['WeatherDescription'] = dfNew.apply(lambda row: getMainDescription(row), axis=1)

Hence, we’ve used the following two functions to extract these values & the critical snippet from one of them is as follows –

lkp_Columns = str(row['weather'])
jpayload = str(lkp_Columns).replace("'", '"')
payload = json.loads(jpayload)

df_lkp = p.io.json.json_normalize(payload)
df_lkp.columns = df_lkp.columns.map(lambda x: x.split(".")[-1])

str_main_weather = str(df_lkp.iloc[0]['main'])

The above lines extract the weather column & replace the single quotes with double quotes before the application converts the string to JSON. Once it is converted, json_normalize flattens it & creates individual columns out of it. Once you have them captured inside the pandas dataframe, you can extract the required values & return them to your primary calling function.
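
One small side note: newer pandas releases have deprecated pandas.io.json.json_normalize in favour of the top-level pandas.json_normalize. If you are on a recent version, the same extraction can be written as below; the payload here is a made-up single-element sample, purely to illustrate the call –

import json
import pandas as p

raw = "[{'main': 'Clouds', 'description': 'scattered clouds'}]"

# Same idea as above: fix the quotes, parse to JSON, then flatten into columns
payload = json.loads(raw.replace("'", '"'))
df_lkp = p.json_normalize(payload)
print(df_lkp.iloc[0]['main'])  # Clouds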

# Let's pass this to our map section
x2 = cm.clsMap()
ret_3 = x2.setMap(dfSuppliment)

if ret_3 == 0:
    print('Successful Map Generated!')
else:
    print('Please check the log for further issue!')

In the above lines, the application invokes the Map class to calculate the remaining logic & then plots the data on a seaborn graph.

Let’s just briefly see the central directory structure –

10. RunWindow

Here is the log directory –

11. Log Directory

And, finally, the source directory should look something like this –

12. SourceDir

Now, let’s run the application –

The following lines are essential –

sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, hue='Sex')

This will project the plot like this –

13. AdditionalOption

Or,

sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, hue='Sex', markers=['o','v'], scatter_kws={'s':25})

This will lead to the following figures –

14. Adding Markers

As you can see, using the markers (‘o’/’v’) leads to two different symbols for the two genders.

Or,

sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, col='Sex')

This will lead to –

15. Separate By Sex

So, in this case, the application has created two completely separate plots, one for each Sex.

So, finally, we’ve done it. 😀

In the next post, I’ll be doing some more improvisation on top of these data sets. Till then – Happy Avenging! 🙂

Note: All the data posted here are representational data & available over the internet & for educational purpose only.

Prepare analytics based on streaming data from Twitter using Python

Hi Guys!

Today, we will be projecting an analytics storyline based on streaming data from Twitter’s developer API.

I want to make sure that this is solely for educational purposes & that no data analysis is provided to any agency or third-party apps. So, when you are planning to use this API, make sure that you strictly follow their rules.

In order to create a streaming channel from Twitter, you need to create one developer account.

As I’m a huge soccer fan, I would like to refer to one soccer handle on Twitter for this. In this case, we’ll be checking BA Analytics.

6. Origin_Site

Please find the steps to create one developer account –

Step -1: 

You have to go to the following link. Over there you need to submit the request in order to create the account. You need to provide proper justification as to why you need that account. I’m not going into those forms. They are self-explanatory.

Once your developer account is activated, you need to click the link shown below –

1. TwitterSetup

Once you click that, the portal will lead you to the following page –

2. TwitterSetup - Continue

If you don’t have any app, the first page will look something like the above page.

Step 2:

3. TwiterSetup - Continue

Now, you need to fill in the following details. For security reasons, I’ll be hiding sensitive data here.

Step 3:

4. TwitterSetUp - Continue

After creating that, you need to go to the next tab, i.e. Keys & tokens. The initial screen will only have the Consumer API keys.

Step 4:

To generate the Access token, you need to click the create button from the above screenshot & then the new page will look like this –

5. TwitterSetUp - Continue

Our program will be using all these pieces of information.

So, now we’re ready for our Python program.

In order to access Twitter API through python, you need to install the following package –

pip install python-twitter

Let’s see the directory structure –

7. Directory

Let’s check only the relevant scripts here. We’re not going to discuss clsL.py as we’ve already covered it. Please refer to the old post.

1. clsConfig.py (This script contains all the parameters of the server.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 12-Oct-2019              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys &        ####
#### directory paths for the Twitter API. ####
#### The application will use these to    ####
#### fetch & process the tweets.          ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'EMAIL_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcEmail.json',
        'TWITTER_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcTwitter.json',
        'HR_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcHR.json',
        'ACCESS_TOKEN': '99999999-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',
        'ACCESS_SECRET': 'YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY',
        'CONSUMER_KEY': "aaaaaaaaaaaaaaaaaaaaaaa",
        'CONSUMER_SECRET': 'HHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHHH',
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'APP_DESC_1': 'Feedback Communication',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path
    }

For security reasons, I’ve replaced the original keys with dummy keys. You have to fill in your own keys.

2. clsTwitter.py (This script will fetch data from Twitter API & process the same & send it to the calling method.)

###############################################
#### Written By: SATYAKI DE                ####
#### Written On: 12-Oct-2019               ####
#### Modified On 12-Oct-2019               ####
####                                       ####
#### Objective: Main class fetching sample ####
#### data from Twitter API.                ####
###############################################

import twitter
from clsConfig import clsConfig as cf
import json
import re
import string
import logging

class clsTwitter:
    def __init__(self):
        self.access_token = cf.config['ACCESS_TOKEN']
        self.access_secret = cf.config['ACCESS_SECRET']
        self.consumer_key = cf.config['CONSUMER_KEY']
        self.consumer_secret = cf.config['CONSUMER_SECRET']

    def find_element(self, srcJson, key):
        """Pull all values of specified key from nested JSON."""
        arr = []

        def fetch(srcJson, arr, key):
            """Recursively search for values of key in JSON tree."""
            if isinstance(srcJson, dict):
                for k, v in srcJson.items():
                    if isinstance(v, (dict, list)):
                        fetch(v, arr, key)
                    elif k == key:
                        arr.append(v)
            elif isinstance(srcJson, list):
                for item in srcJson:
                    fetch(item, arr, key)
            return arr

        finJson = fetch(srcJson, arr, key)
        return finJson

    def searchQry(self, rawQry):
        try:
            fin_dict = {}
            finJson = ''
            res = ''
            cnt = 0

            # Parameters to invoke Twitter API
            ACCESS_TOKEN = self.access_token
            ACCESS_SECRET = self.access_secret
            CONSUMER_KEY = self.consumer_key
            CONSUMER_SECRET = self.consumer_secret

            tmpR20 = 'Raw Query: ' + str(rawQry)
            logging.info(tmpR20)

            finJson = '['

            if rawQry == '':
                print('No data to proceed!')
                logging.info('No data to proceed!')
            else:
                t = twitter.Api(
                                  consumer_key = CONSUMER_KEY,
                                  consumer_secret = CONSUMER_SECRET,
                                  access_token_key = ACCESS_TOKEN,
                                  access_token_secret = ACCESS_SECRET
                               )

                response = t.GetSearch(raw_query=rawQry)
                print('Total Records fetched:', str(len(response)))

                for i in response:

                    # Converting them to json
                    data = str(i)
                    res_json = json.loads(data)

                    # Calling individual key
                    id = res_json['id']
                    tmpR19 = 'Id: ' + str(id)
                    logging.info(tmpR19)

                    try:
                        f_count = res_json['quoted_status']['user']['followers_count']
                    except:
                        f_count = 0
                    tmpR21 = 'Followers Count: ' + str(f_count)
                    logging.info(tmpR21)

                    try:
                        r_count = res_json['quoted_status']['retweet_count']
                    except:
                        r_count = 0
                    tmpR22 = 'Retweet Count: ' + str(r_count)
                    logging.info(tmpR22)

                    text = self.find_element(res_json, 'text')

                    for j in text:
                        strF = re.sub(f'[^{re.escape(string.printable)}]', '', str(j))
                        pat = re.compile(r'[\t\n]')
                        strG = pat.sub("", strF)
                        res = "".join(strG)

                    # Forming return dictionary
                    #fin_dict.update({id:'id', f_count: 'followerCount', r_count: 'reTweetCount', res: 'msgPost'})
                    if cnt == 0:
                        finJson = finJson + '{"id":' + str(id) + ',"followerCount":' + str(f_count) + ',"reTweetCount":' + str(r_count) + ', "msgPost":"' + str(res) + '"}'
                    else:
                        finJson = finJson + ', {"id":' + str(id) + ',"followerCount":' + str(f_count) + ',"reTweetCount":' + str(r_count) + ', "msgPost":"' + str(res) + '"}'

                    cnt += 1

            finJson = finJson + ']'

            jdata = json.dumps(finJson)
            ResJson = json.loads(jdata)

            return ResJson

        except Exception as e:
            ResJson = ''
            x = str(e)
            print(x)

            logging.info(x)
            ResJson = {'errorDetails' : x}

            return ResJson

The key lines from this snippet are as follows –

def find_element(self, srcJson, key):
    """Pull all values of specified key from nested JSON."""
    arr = []

    def fetch(srcJson, arr, key):
        """Recursively search for values of key in JSON tree."""
        if isinstance(srcJson, dict):
            for k, v in srcJson.items():
                if isinstance(v, (dict, list)):
                    fetch(v, arr, key)
                elif k == key:
                    arr.append(v)
        elif isinstance(srcJson, list):
            for item in srcJson:
                fetch(item, arr, key)
        return arr

    finJson = fetch(srcJson, arr, key)
    return finJson

This function recursively searches the supplied JSON for a specific key & returns all the matching values. This is particularly useful when your elements don’t have a fixed position.
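
As a quick illustration, here is how the helper behaves on a small made-up payload (assuming the class above is importable as clsTwitter) – it digs out every ‘text’ value regardless of how deeply it is nested.

import clsTwitter as ct

# Hypothetical nested payload, similar in spirit to a Twitter response
sample = {
    "id": 1,
    "text": "top-level tweet",
    "quoted_status": {"text": "quoted tweet", "user": {"name": "someone"}},
    "entities": [{"text": "hashtag text"}]
}

x = ct.clsTwitter()
print(x.find_element(sample, 'text'))
# ['top-level tweet', 'quoted tweet', 'hashtag text']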

t = twitter.Api(
                  consumer_key = CONSUMER_KEY,
                  consumer_secret = CONSUMER_SECRET,
                  access_token_key = ACCESS_TOKEN,
                  access_token_secret = ACCESS_SECRET
               )

response = t.GetSearch(raw_query=rawQry)

In this case, the Python application receives the JSON response using the Twitter API.

id = res_json['id']
try:
    f_count = res_json['quoted_status']['user']['followers_count']
except:
    f_count = 0
try:
    r_count = res_json['quoted_status']['retweet_count']
except:
    r_count = 0

Fetching specific fixed-position elements from the API response.

text = self.find_element(res_json, 'text')

Fetching the dynamically positioned element using our customized function.

for j in text:
    strF = re.sub(f'[^{re.escape(string.printable)}]', '', str(j))
    pat = re.compile(r'[\t\n]')
    strG = pat.sub("", strF)
    res = "".join(strG)

Removing non-printable characters & white spaces from the extracted text field in order to get clean data.
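
To see the effect in isolation, here is a tiny standalone run of the same two substitutions on a made-up tweet string –

import re
import string

raw = "Great win \tfor the Blades!\n\u26bd #SUFC"

# Drop anything outside the printable ASCII set (emoji etc.), then strip tabs & newlines
strF = re.sub(f'[^{re.escape(string.printable)}]', '', raw)
res = re.sub(r'[\t\n]', '', strF)
print(res)  # Great win for the Blades! #SUFC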

if cnt == 0:
    finJson = finJson + '{"id":' + str(id) + ',"followerCount":' + str(f_count) + ',"reTweetCount":' + str(r_count) + ', "msgPost":"' + str(res) + '"}'
else:
    finJson = finJson + ', {"id":' + str(id) + ',"followerCount":' + str(f_count) + ',"reTweetCount":' + str(r_count) + ', "msgPost":"' + str(res) + '"}'

Finally, generating a JSON string dynamically.

jdata = json.dumps(finJson)
ResJson = json.loads(jdata)

And, returning the JSON to our calling program.
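
If you ever need a sturdier way to build that payload, one option (shown here only as a sketch with made-up, already-cleaned records) is to collect Python dictionaries & let json.dumps handle the quoting –

import json

# Hypothetical records shaped like the ones assembled in the loop above
records = [
    {"id": 111, "followerCount": 2500, "reTweetCount": 40, "msgPost": "Great win for the Blades!"},
    {"id": 112, "followerCount": 1800, "reTweetCount": 12, "msgPost": "Match day analytics incoming."}
]

# json.dumps takes care of escaping, so embedded quotes in msgPost cannot break the payload
finJson = json.dumps(records)
print(finJson)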

3. callTwitterAPI.py (This is the main script that will invoke the Twitter API & then project the analytic report based on the available Twitter data.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 12-Oct-2019              ####
#### Modified On 12-Oct-2019              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from clsConfig import clsConfig as cf
import pandas as p
import clsL as cl
import logging
import datetime
import json
import clsTwitter as ct

# Disabling Warnings
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

def getMaximumFollower(df):
    try:
        d1 = df['followerCount'].max()
        d1_max_str = int(d1)

        return d1_max_str
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = 0

        return dt_part1

def getMaximumRetweet(df):
    try:
        d1 = df['reTweetCount'].max()
        d1_max_str = int(d1)

        return d1_max_str
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = ''

        return dt_part1

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        dfSrc = p.DataFrame()
        df_ret = p.DataFrame()
        ret_2 = ''
        debug_ind = 'Y'

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidatedTwitter.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        # Query using parameters
        rawQry = 'q=from%3ABlades_analytic&src=typd'

        x1 = ct.clsTwitter()
        ret_2 = x1.searchQry(rawQry)

        # Capturing the JSON Payload
        res = json.loads(ret_2)

        # Converting dictionary to Pandas Dataframe
        df_ret = p.read_json(ret_2, orient='records')

        # Resetting the column orders as per JSON
        df_ret = df_ret[list(res[0].keys())]

        l.logr('1.Twitter_' + var + '.csv', debug_ind, df_ret, 'log')

        print('Realtime Twitter Data:: ')
        logging.info('Realtime Twitter Data:: ')
        print(df_ret)
        print()

        # Checking execution status
        ret_val_2 = df_ret.shape[0]

        if ret_val_2 == 0:
            print("Twitter hasn't returned any rows. Please check your queries!")
            logging.info("Twitter hasn't returned any rows. Please check your queries!")
            print("*" * 157)
            logging.info(tmpR0)
        else:
            print("Successfuly row feteched!")
            logging.info("Successfuly row feteched!")
            print("*" * 157)
            logging.info(tmpR0)

        print('Finding Story points..')
        print("*" * 157)
        logging.info('Finding Story points..')
        logging.info(tmpR0)

        # Performing Basic Aggregate
        # 1. Find the user who has maximum Followers
        df_ret['MaxFollower'] = getMaximumFollower(df_ret)

        # 2. Find the user who has maximum Re-Tweets
        df_ret['MaxTweet'] = getMaximumRetweet(df_ret)

        # Getting Status
        df_MaxFollower = df_ret[(df_ret['followerCount'] == df_ret['MaxFollower'])]

        # Dropping Columns
        df_MaxFollower.drop(['reTweetCount'], axis=1, inplace=True)
        df_MaxFollower.drop(['MaxTweet'], axis=1, inplace=True)

        l.logr('2.Twitter_Maximum_Follower_' + var + '.csv', debug_ind, df_MaxFollower, 'log')

        print('Maximum Follower:: ')
        print(df_MaxFollower)
        print("*" * 157)
        logging.info(tmpR0)

        df_MaxTwitter = df_ret[(df_ret['reTweetCount'] == df_ret['MaxTweet'])]
        print()

        # Dropping Columns
        df_MaxTwitter.drop(['followerCount'], axis=1, inplace=True)
        df_MaxTwitter.drop(['MaxFollower'], axis=1, inplace=True)

        l.logr('3.Twitter_Maximum_Retweet_' + var + '.csv', debug_ind, df_MaxTwitter, 'log')

        print('Maximum Re-Tweet:: ')
        print(df_MaxTwitter)
        print("*" * 157)
        logging.info(tmpR0)

        tmpR10 = 'End Time: ' + str(var)
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError:
        print("No relevant data to proceed!")
        logging.info("No relevant data to proceed!")

    except Exception as e:
        print("Top level Error: args:{0}, message{1}".format(e.args, e.message))

if __name__ == "__main__":
    main()

And, here are the key lines –

x1 = ct.clsTwitter()
ret_2 = x1.searchQry(rawQry)

Our application is instantiating the newly developed class.

# Capturing the JSON Payload
res = json.loads(ret_2)

# Converting dictionary to Pandas Dataframe
df_ret = p.read_json(ret_2, orient='records')

# Resetting the column orders as per JSON
df_ret = df_ret[list(res[0].keys())]

Converting the JSON to a pandas dataframe for our analytics data points.

def getMaximumFollower(df):
    try:
        d1 = df['followerCount'].max()
        d1_max_str = int(d1)

        return d1_max_str
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = 0

        return dt_part1

def getMaximumRetweet(df):
    try:
        d1 = df['reTweetCount'].max()
        d1_max_str = int(d1)

        return d1_max_str
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = ''

        return dt_part1

These two functions, declared in the calling script, compute the maximum Follower & Re-Tweet counts from our returned dataset.

# Getting Status
df_MaxFollower = df_ret[(df_ret['followerCount'] == df_ret['MaxFollower'])]

And, this is how our application fetches the row with the maximum retweet count –

df_MaxTwitter = df_ret[(df_ret['reTweetCount'] == df_ret['MaxTweet'])]

And, you can customize your output by dropping unwanted columns in the specific dataset.
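
If you prefer a vectorised route, the same two records can be located with idxmax instead of the helper functions. A minimal sketch with hypothetical data (note that idxmax keeps only the first row on ties, unlike the boolean filters above) –

import pandas as p

# Hypothetical dataframe shaped like df_ret
df_ret = p.DataFrame({
    "id": [111, 112, 113],
    "followerCount": [2500, 1800, 4200],
    "reTweetCount": [40, 12, 7],
    "msgPost": ["a", "b", "c"]
})

# Row with the maximum follower count & row with the maximum retweet count
df_MaxFollower = df_ret.loc[[df_ret['followerCount'].idxmax()]]
df_MaxTwitter = df_ret.loc[[df_ret['reTweetCount'].idxmax()]]
print(df_MaxFollower)
print(df_MaxTwitter)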

And, here is the output on Windows, which looks like –

8. WindowsRun

And, here is the windows log directory –

WindowsRunLog

So, we’ve achieved our target data point.

So, we’ll come out with another exciting post in the coming days!

N.B.: This is demonstrated for RnD/study purposes. All the data posted here are representational data & available over the internet.

Building Pipeline into the Azure Data Factory!

Hi Guys!

After a brief gap, here is the latest edition of data pipeline implementation through Azure Data Factory.

Our objective is straightforward. In this post, we’ll develop a major ADF pipeline. However, our main aim is to reuse the same pipeline & enhance it in the coming days.

So, let’s get started!

Step -1:

The first step is to view the sample data.

0. Sample Data

So, now, we’ll be loading this file into one of the SQL DBs that we’ve already created inside our cloud environment.

Step -2:

Now, we will upload this sample file to the blob store, which our application will use as the source in the flow.

1. Source File

Step -3:

Let’s create & set-up the initial data factory.

2. Data Factory Initial Screen

Now, by clicking “Create data factory,” we’ll set up the environment for the first time.

3. Initial SetUp

After clicking “create,” the cloud will prepare the environment for the first time.

4. Basic Screen

For security reasons, I’m not displaying other essential options.

Now, you need to click Author & Monitor to finally arrive at the main development interface, which will look like this –

5. In-Progress
6. Initial Screen

Now, you need to click “Create Pipeline.”

7. Adding Dataset

Once you are on this page, you are ready to build your first data flow.

Step -4:

Let’s make the source data ready. To do that, we’ll first click “Add Dataset” & follow the steps shown in the series of snapshots given below –

8. Selecting Data Source

In this case, we choose Azure Blob Storage as our source.

10. Selected Type

Now, we have to choose the source file type.

Finally, you need to provide other essential information as shown in the next screenshot & you need to test it.

11. Testing Source Data Connection

Now, you need to choose the source path & preview the data. You can only view the data once you connect successfully.

12. Choosing Path - 1
13. Choosing Path -2
14. Choosing Path -3

So, if everything looks good, you can see the following page –

15. Preview

Once you click it, you will be able to view it.

16. Preview - 2

Step -5:

Now, we’ll be using the copy data option as shown below –

17. Using Copy Data Option

Now, we’ll be configuring the key places –

18. Selecting Source

As you can see, you can choose the csv file as source.

You can then generate the mapping. By clicking the Mapping tab, you can view the detail.

19. Import Schemas

Now, we’ll be preparing the dataflow.

20. Adding DataFlow

So, basically, here we’ll be creating the flow as per our requirement. In our case, we’ll be using one primary filter before we push our data to the target.

And, the steps are as follows –

As the above picture shows, we have configured the source by choosing the appropriate source dataset.

21. Dataflow Debug Training

You have to turn on debug preview mode. But remember one crucial point: ADF will create a runtime cluster for this, & you will be charged separately for it. In return, you can preview the data while building the pipeline.

22. Dataflow Preview

Finally, we’ll be selecting the target.

23. Sink Or Target

We’ll be dragging in the sink/target & then configuring it in the following steps –

24. Choosing Target
25. TargetConnectionTest
26. Creating A new table

So, in this case, it will create a new table.

Once, you prepare everything, you have to validate the basic flow.

28. Validate Floew

Finally, we’ll create the trigger.

31. triggers - 1

You need to click the trigger at the bottom & mention the trigger timing. Just for testing purposes, we’ll use one-time execution only.

32. Trtigger Timing

Once you click the finish button, the next page will look like this.

33. Setting Up Triggers

Here is the Action button. Once you click the play/run button, you trigger the task/flow, & it looks like this –

34. Running Triggers

So, we’re done for the day.

Let me know, what do you think?

Till then! Happy Avenging! 😀

Note: All the data posted here are representational data & available over the internet.

Combining the NoSQL(Cosmos DB) & traditional Azure RDBMS in Azure (Time stone solo from Python verse)

Hi Guys!

Today, our main objective is to extend our last post & blend two different kinds of data using Python.

Please refer to the earlier post if you haven’t gone through it – “Building Azure cosmos application.”

What is the Objective?

In this post, our objective is to combine a traditional RDBMS from the cloud with Azure’s NoSQL offering, which in this case is Cosmos DB, & try to produce some blended information that can be aggregated further.

Examining Source Data.

No SQL Data from Cosmos:

Let’s check one more time the No SQL data created in our last post.

CosmosData

In total, we’ve created 6 records in our last post.

As you can see in the red-marked areas, from Items one can check the total number of records created. You can also filter out a specific record using the Edit Filter button highlighted with the blue box, where you need to provide the WHERE clause.

Azure SQL DB:

Let’s create some data in Azure SQL DB.

But, before that, you need to create SQL DB in the Azure cloud. Here is the official Microsoft link to create DB in Azure. You can refer to it here.

I won’t discuss the detailed steps of creating DB here.

From Azure portal, it looks like –

Azure SQL DB Main Screen

Let’s see how the data looks in Azure DB. For our case, we’ll be using the hrMaster DB.

Let’s create the table & some sample data aligned as per our cosmos data.

Azure SQL DB

We will join both datasets on subscriberId & then extract the required columns for our final output.

CombinedData

Good. Now, we’re ready for python scripts.

Python Scripts:

In this installment, we’ll be reusing the following Python scripts, which are already discussed in my earlier post –

  • clsL.py
  • clsColMgmt.py
  • clsCosmosDBDet.py

So, I’m not going to discuss these scripts.

Before we discuss our scripts, let’s look out the directory structures –

Win_Vs_MAC

Here is the detailed directory structure between the Windows & MAC O/S.

1. clsConfig.py (This script is the config file, containing all the keys & connection details for Azure Cosmos DB & the Azure SQL DB that the application will use.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 25-May-2019              ####
#### Updated On: 02-Jun-2019              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys for      ####
#### azure cosmos db. Application will    ####
#### process these information & perform  ####
#### various CRUD operation on Cosmos DB. ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))
    db_name = 'rnd-de01-usw2-vfa-cdb'
    db_link = 'dbs/' + db_name
    CONTAINER1 = "RealtimeEmail"
    CONTAINER2 = "RealtimeTwitterFeedback"
    CONTAINER3 = "RealtimeHR"

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'SERVER': 'xxxx-xxx.database.windows.net',
        'DATABASE_1': 'SalesForceMaster',
        'DATABASE_2': 'hrMaster',
        'DATABASE_3': 'statMaster',
        'USERNAME': 'admin_poc_dev',
        'PASSWORD': 'xxxxx',
        'DRIVER': '{ODBC Driver 17 for SQL Server}',
        'ENV': 'pocdev-saty',
        'ENCRYPT_FLAG': "yes",
        'TRUST_FLAG': "no",
        'TIMEOUT_LIMIT': "30",
        'PROCSTAT': "'Y'",
        'APP_ID': 1,
        'EMAIL_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcEmail.json',
        'TWITTER_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcTwitter.json',
        'HR_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcHR.json',
        'COSMOSDB_ENDPOINT': 'https://rnd-de01-usw2-vfa-cdb.documents.azure.com:443/',
        'CONFIG_TABLE': 'ETL_CONFIG_TAB',
        'COSMOS_PRIMARYKEY': "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXIsI00AxKXXXXXgg==",
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'COSMOSDB': db_name,
        'COSMOS_CONTAINER1': CONTAINER1,
        'COSMOS_CONTAINER2': CONTAINER2,
        'COSMOS_CONTAINER3': CONTAINER3,
        'CONFIG_ORIG': 'Config_orig.csv',
        'ENCRYPT_CSV': 'Encrypt_Config.csv',
        'DECRYPT_CSV': 'Decrypt_Config.csv',
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'APP_DESC_1': 'Feedback Communication',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SQL_QRY_1': "SELECT c.subscriberId, c.sender, c.orderNo, c.orderDate, c.items.orderQty  FROM RealtimeEmail c",
        'SQL_QRY_2': "SELECT c.twitterId, c.Twit, c.DateCreated, c.Country FROM RealtimeTwitterFeedback c WHERE c.twitterId=@CrVal",
        'DB_QRY': "SELECT * FROM c",
        'AZURE_SQL_1': "SELECT DISTINCT subscriberId, state, country, annualIncome, customerType FROM dbo.onboardCustomer",
        'COLLECTION_QRY': "SELECT * FROM r",
        'database_link': db_link,
        'collection_link_1': db_link + '/colls/' + CONTAINER1,
        'collection_link_2': db_link + '/colls/' + CONTAINER2,
        'collection_link_3': db_link + '/colls/' + CONTAINER3,
        'options': {
            'offerThroughput': 1000,
            'enableCrossPartitionQuery': True,
            'maxItemCount': 2
        }
    }

Here, we’ve added a couple of more entries compared to the last time, which points the detailed configuration for Azure SQL DB.

‘SERVER’: ‘xxxx-xxx.database.windows.net’,
‘DATABASE_1’: ‘SalesForceMaster’,
‘DATABASE_2’: ‘hrMaster’,
‘DATABASE_3’: ‘statMaster’,
‘USERNAME’: ‘admin_poc_dev’,
‘PASSWORD’: ‘xxxxx’,
‘DRIVER’: ‘{ODBC Driver 17 for SQL Server}’,
‘ENV’: ‘pocdev-saty’,
‘ENCRYPT_FLAG’: “yes”,
‘TRUST_FLAG’: “no”,
‘TIMEOUT_LIMIT’: “30”,
‘PROCSTAT’: “‘Y'”, 

Here, you need to supply your DB credentials accordingly.

2. clsDBLookup.py (This script will look into the Azure SQL DB & fetch data from the traditional RDBMS of Azure environment.)

#####################################################
#### Written By: SATYAKI DE                      ####
#### Written On: 25-May-2019                     ####
####                                             ####
#### Objective: This script will check &         ####
#### test the connection with the Azure          ####
#### SQL DB & it will fetch all the records      ####
#### names resided under a table of the same DB. ####
#####################################################

import pyodbc as py
import pandas as p
from clsConfig import clsConfig as cdc

class clsDBLookup(object):
    def __init__(self, lkpTableName = ''):
        self.server = cdc.config['SERVER']
        self.database = cdc.config['DATABASE_1']
        self.database1 = cdc.config['DATABASE_2']
        self.database2 = cdc.config['DATABASE_3']
        self.username = cdc.config['USERNAME']
        self.password = cdc.config['PASSWORD']
        self.driver = cdc.config['DRIVER']
        self.env = cdc.config['ENV']
        self.encrypt_flg = cdc.config['ENCRYPT_FLAG']
        self.trust_flg = cdc.config['TRUST_FLAG']
        self.timeout_limit = cdc.config['TIMEOUT_LIMIT']
        self.lkpTableName = cdc.config['CONFIG_TABLE']
        self.ProcStat = cdc.config['PROCSTAT']
        self.AppId = cdc.config['APP_ID']

    def LookUpData(self):
        try:
            # Assigning all the required values
            server = self.server
            database = self.database1
            username = self.username
            password = self.password
            driver = self.driver
            env = self.env
            encrypt_flg = self.encrypt_flg
            trust_flg = self.trust_flg
            timout_limit = self.timeout_limit
            lkpTableName = self.lkpTableName
            ProcStat = self.ProcStat
            AppId = self.AppId

            # Creating secure connection
            str_conn = 'Driver=' + driver + ';Server=tcp:' + server + ',1433;' \
                       'Database=' + database + ';Uid=' + username + '@' + env + ';' \
                       'Pwd=' + password + ';Encrypt=' + encrypt_flg + ';' \
                       'TrustServerCertificate=' + trust_flg + ';Connection Timeout=' + timout_limit + ';'

            db_con_azure = py.connect(str_conn)

            query = " SELECT [ruleId] as ruleId, [ruleName] as ruleName, [ruleSQL] as ruleSQL, " \
                    " [ruleFlag] as ruleFlag, [appId] as appId, [DBType] as DBType, " \
                    " [DBName] as DBName FROM [dbo][" + lkpTableName + "] WHERE ruleFLag = " + ProcStat + " " \
                    " and appId = " + AppId + " ORDER BY ruleId "

            df = p.read_sql(query, db_con_azure)

            # Closing the connection
            db_con_azure.close()

            return df
        except Exception as e:
            x = str(e)
            print(x)
            df = p.DataFrame()

            return df

    def azure_sqldb_read(self, sql):
        try:
            # Assigning all the required values
            server = self.server
            database = self.database1
            username = self.username
            password = self.password
            driver = self.driver
            env = self.env
            encrypt_flg = self.encrypt_flg
            trust_flg = self.trust_flg
            timout_limit = self.timeout_limit
            lkpTableName = self.lkpTableName
            ProcStat = self.ProcStat
            AppId = self.AppId

            # Creating secure connection
            str_conn = 'Driver=' + driver + ';Server=tcp:' + server + ',1433;' \
                       'Database=' + database + ';Uid=' + username + '@' + env + ';' \
                       'Pwd=' + password + ';Encrypt=' + encrypt_flg + ';' \
                       'TrustServerCertificate=' + trust_flg + ';Connection Timeout=' + timout_limit + ';'

            # print("Connection Details:: ", str_conn)
            db_con_azure = py.connect(str_conn)

            query = sql

            df = p.read_sql(query, db_con_azure)

            # Closing the connection
            db_con_azure.close()

            return df
        except Exception as e:
            x = str(e)
            print(x)
            df = p.DataFrame()

            return df

Major lines to discuss –

azure_sqldb_read(self, sql):

Getting the source SQL supplied from the configuration script.

db_con_azure = py.connect(str_conn)

query = sql

df = p.read_sql(query, db_con_azure)

After creating a successful connection, our application reads the SQL, fetches the data into a pandas dataframe & returns the output to the primary calling function.
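
As a quick usage sketch, the class can then be invoked like this. The query below is illustrative only (it points to the dbo.onboardCustomer table from our config), & it assumes you have filled in the real credentials in clsConfig –

import clsDBLookup as dbcon

# Hypothetical sample query; adjust the table & columns to whatever exists in your Azure SQL DB
sample_sql = "SELECT TOP 5 subscriberId, state, country FROM dbo.onboardCustomer"

lkp = dbcon.clsDBLookup()
df_sample = lkp.azure_sqldb_read(sample_sql)
print(df_sample)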

3. callCosmosAPI.py (This is the main script, which will call all the methods to blend the data. Hence, the name comes into the picture.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 25-May-2019              ####
#### Modified On 02-Jun-2019              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

import clsColMgmt as cm
import clsCosmosDBDet as cmdb
from clsConfig import clsConfig as cf
import pandas as p
import clsLog as cl
import logging
import datetime
import json
import clsDBLookup as dbcon

# Disabling Warnings
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

def getDate(row):
    try:
        d1 = row['orderDate']
        d1_str = str(d1)
        d1_dt_part, sec = d1_str.split('.')
        dt_part1 = d1_dt_part.replace('T', ' ')

        return dt_part1
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = ''

        return dt_part1

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        df_ret = p.DataFrame()
        df_ret_2 = p.DataFrame()
        df_ret_2_Mod = p.DataFrame()

        debug_ind = 'Y'

        # Initiating Log Class
        l = cl.clsLog()

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidated.log', level=logging.INFO)

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']

        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)

        print("*" * 157)
        print("Testing COSMOS DB Connection!")
        print("*" * 157)

        # Checking Cosmos DB Azure
        y = cmdb.clsCosmosDBDet()
        ret_val = y.test_db_con()

        if ret_val == 0:
            print()
            print("Cosmos DB Connection Successful!")
            print("*" * 157)
        else:
            print()
            print("Cosmos DB Connection Failure!")
            print("*" * 157)
            raise Exception

        print("*" * 157)

        # Accessing from Azure SQL DB
        x1 = dbcon.clsDBLookup()
        act_df = x1.azure_sqldb_read(cf.config['AZURE_SQL_1'])

        print("Azure SQL DB::")
        print(act_df)
        print()

        print("-" * 157)

        # Calling the function 1
        print("RealtimeEmail::")

        # Fetching First collection data to dataframe
        print("Fethcing Comos Collection Data!")

        sql_qry_1 = cf.config['SQL_QRY_1']
        msg = "Documents generatd based on unique key"
        collection_flg = 1

        x = cm.clsColMgmt()
        df_ret = x.fetch_data(sql_qry_1, msg, collection_flg)

        l.logr('1.EmailFeedback_' + var + '.csv', debug_ind, df_ret, 'log')
        print('RealtimeEmail Data::')
        print(df_ret)
        print()

        # Checking execution status
        ret_val = int(df_ret.shape[0])

        if ret_val == 0:
            print("Cosmos DB Hans't returned any rows. Please check your queries!")
            print("*" * 157)
        else:
            print("Successfully fetched!")
            print("*" * 157)

        # Calling the 2nd Collection
        print("RealtimeTwitterFeedback::")

        # Fetching First collection data to dataframe
        print("Fethcing Cosmos Collection Data!")

        # Query using parameters
        sql_qry_2 = cf.config['SQL_QRY_2']
        msg_2 = "Documents generated based on RealtimeTwitterFeedback feed!"
        collection_flg = 2

        val = 'crazyGo'
        param_det = [{"name": "@CrVal", "value": val}]
        add_param = 2

        x1 = cm.clsColMgmt()
        df_ret_2 = x1.fetch_data(sql_qry_2, msg_2, collection_flg, add_param, param_det)

        l.logr('2.TwitterFeedback_' + var + '.csv', debug_ind, df_ret_2, 'log')
        print('Realtime Twitter Data:: ')
        print(df_ret_2)
        print()

        # Checking execution status
        ret_val_2 = int(df_ret_2.shape[0])

        if ret_val_2 == 0:
            print("Cosmos DB hasn't returned any rows. Please check your queries!")
            print("*" * 157)
        else:
            print("Successfuly row feteched!")
            print("*" * 157)

        # Merging NoSQL Data (Cosmos DB) with Relational DB (Azure SQL DB)
        df_Fin_temp = p.merge(df_ret, act_df, on='subscriberId', how='inner')

        df_fin = df_Fin_temp[['orderDate', 'orderNo', 'sender', 'state', 'country', 'customerType']]

        print("Initial Combined Data (From Cosmos & Azure SQL DB) :: ")
        print(df_fin)

        l.logr('3.InitCombine_' + var + '.csv', debug_ind, df_fin, 'log')

        # Transforming the orderDate as per standard format
        df_fin['orderDateM'] = df_fin.apply(lambda row: getDate(row), axis=1)

        # Dropping the old column & renaming the new column to old column
        df_fin.drop(columns=['orderDate'], inplace=True)
        df_fin.rename(columns={'orderDateM': 'orderDate'}, inplace=True)

        print("*" * 157)
        print()
        print("Final Combined & Transformed result:: ")
        print(df_fin)

        l.logr('4.Final_Combine_' + var + '.csv', debug_ind, df_fin, 'log')
        print("*" * 157)

    except ValueError:
        print("No relevant data to proceed!")

    except Exception as e:
        print("Top level Error: args:{0}, message{1}".format(e.args, e.message))

if __name__ == "__main__":
    main()

The key lines from this script –

def getDate(row):
    try:
        d1 = row['orderDate']
        d1_str = str(d1)
        d1_dt_part, sec = d1_str.split('.')
        dt_part1 = d1_dt_part.replace('T', ' ')

        return dt_part1
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = ''

        return dt_part1

This function converts the NoSQL date data type into a more familiar format.

NoSQL Date:
NoSQL_Date
Transformed Date:
Transformed Date
# Accessing from Azure SQL DB
x1 = dbcon.clsDBLookup()
act_df = x1.azure_sqldb_read(cf.config['AZURE_SQL_1'])

print("Azure SQL DB::")
print(act_df)
print()

The above lines call the Azure SQL DB method to retrieve the RDBMS data into our dataframe.

# Merging NoSQL Data (Cosmos DB) with Relational DB (Azure SQL DB)
df_Fin_temp = p.merge(df_ret, act_df, on='subscriberId', how='inner')

df_fin = df_Fin_temp[['orderDate', 'orderNo', 'sender', 'state', 'country', 'customerType']]

In the above lines, we’re joining the data retrieved from two different kinds of databases to prepare our initial combined dataframe. Also, we’ve picked only the desired columns, which will be useful for us.
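
To visualise what the inner join does, here is a tiny self-contained sketch with made-up records (the real frames come from Cosmos DB & Azure SQL DB respectively) –

import pandas as p

# Made-up Cosmos DB extract (df_ret) & Azure SQL DB extract (act_df)
df_ret = p.DataFrame({
    "subscriberId": ["S001", "S002", "S003"],
    "orderNo": [101, 102, 103],
    "sender": ["a@x.com", "b@x.com", "c@x.com"]
})
act_df = p.DataFrame({
    "subscriberId": ["S001", "S003"],
    "state": ["WA", "CA"],
    "country": ["US", "US"],
    "customerType": ["Gold", "Silver"]
})

# The inner join keeps only the subscribers present on both sides (S001 & S003 here)
df_fin = p.merge(df_ret, act_df, on='subscriberId', how='inner')
print(df_fin)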

# Transforming the orderDate as per standard format
df_fin['orderDateM'] = df_fin.apply(lambda row: getDate(row), axis=1)

# Dropping the old column & renaming the new column to old column
df_fin.drop(columns=['orderDate'], inplace=True)
df_fin.rename(columns={'orderDateM': 'orderDate'}, inplace=True)

In the above lines, we’re transforming our date field, as shown above in one of our previous images by calling the getDate method.
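
If you prefer a vectorised route, pandas can do the same clean-up without a row-wise apply. A minimal sketch, assuming orderDate holds ISO-style strings such as '2019-06-02T10:15:30.123Z' –

import pandas as p

df = p.DataFrame({"orderDate": ["2019-06-02T10:15:30.123Z", "2019-06-03T08:01:45.456Z"]})

# Parse the ISO timestamps & reformat them as 'YYYY-MM-DD HH:MM:SS'
df["orderDate"] = p.to_datetime(df["orderDate"]).dt.strftime("%Y-%m-%d %H:%M:%S")
print(df)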

Let’s see the directory structure of our program –

Win_Vs_MAC

Let’s see how it looks when it runs –

Windows:

Win_Run_1
Win_Run_2

MAC:

MAC_Run_1
MAC_Run_2

So, finally, we’ve successfully blended the data & made a more meaningful data projection.

The following Python packages are required to run this application –

pip install azure

pip install azure-cosmos

pip install pandas

pip install requests

pip install pyodbc

This application was tested on Python 3.7.1 & Python 3.7.2 as well. As per Microsoft, their officially supported version is Python 3.5.

I hope you’ll like this effort.

Wait for the next installment. Till then, Happy Avenging. 😀

[Note: All the sample data are available/prepared in the public domain for research & study.]

The advanced concept of Pandas & Numpy with an aggregate & lookup of file logging (A crossover over of Space Stone & Soul Stone from the Python verse)

Today, we’ll be implementing the advanced concepts of Pandas & Numpy & see how one can aggregate data & produce meaningful insights for your business, which make an impact on your overall profit.

First, let us understand the complexity of the problem & what we’re looking to achieve here. For that, you need to view the source data & lookup data & how you want to process the data.

Source Data:

sourcedata-e1554702920904-1

The above picture is a sample dataset from a bank (data available on a U.S. public forum), which captures the customers’ current account balance information. Let’s look into the look-up files’ sample data –

First File:

LookUp_1_Actual

Second File:

LookUp_2

So, one can clearly see that the bank is trying to get a number of stories based on the existing data.

Challenges:

The first lookup file contains data in a manner where the columns of our source file appear as rows. Hence, you need to reshape the source data as per the lookup file to get the other relevant information & then join that with the second lookup file to bring all the data points for your storyline.

Look-Up Configuration:

In order to match the look-up data with our source data, we’ll be adding two new columns, which will help the application process the correct row out of the entries provided in look-up file 1.

LookUp_1

As you can see from the above picture, two new columns, i.e. Category & Stat, have been added in this context. Here, the Category contains metadata information. If a column has a significant number of unique values, then we’re marking it as ‘D’ in the Category. In this case, the bank doesn’t offer any scheme based on the customer’s name. Hence, these fields are marked with ‘I’. For the Gender column, the application has a small number of unique records, i.e. either ‘Male‘ or ‘Female‘. As a result, we provided two corresponding entries. Remember, DateJoined is a key column here. Even though we marked its category as ‘I‘, which denotes that no transformation is required, the Stat value ‘K‘ denotes that it is the driving column, apart from the surrogate key [PKEY] that we’ll be generating during our application’s transformation process. I’ll discuss that in the respective snippet discussion.

Our Goal:

Based on the source data, we need to find the following stories & publish them in an Excel sheet separately.

  1. The country & Gender-wise Bank’s contribution.
  2. The country & Job-wise Bank’s contribution.
  3. The country & Age-range-wise saving trends & Bank’s contribution.

A little note on Bank’s Contribution:

Let us explain what exactly is meant by the bank’s contribution. Sometimes, a bank wants to encourage savings for an individual client based on all the available factors. So, let’s assume that the bank contributes $1 for every $150 a person saves. This $1 may vary based on the age range & gender to promote a specific group. Also, when someone opens a savings account with the bank, by default the bank contributes a sum of $100 at the time of opening, for a short period, as part of its promotion strategy. You will get these details from the first lookup file. The second lookup file contains the age-range category based on the Group that is available in the first lookup file.
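
To make the arithmetic concrete, here is a tiny illustrative calculation with made-up figures (the actual rates & bonus come from the lookup files) –

# Illustrative only: hypothetical rates, not the real lookup values
opening_bonus = 100      # one-time credit when the account is opened
rate_per_slab = 1        # $1 contributed ...
slab_size = 150          # ... for every $150 saved

balance = 4200           # a made-up customer balance
contribution = opening_bonus + (balance // slab_size) * rate_per_slab
print(contribution)      # 100 + 28 * 1 = 128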

Python Scripts:

In this installment, we’ll be reusing the following Python scripts, which are already discussed in my earlier post

  • clsFindFile.py
  • clsL.py

So, I’m not going to discuss these scripts. 

1. clsParam.py (This script contains all the parameter configurations & directory paths used by the application. Hence, the name comes into the picture.)

###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 04-Apr-2019       ########
###########################################

import os
import platform as pl

class clsParam(object):
    os_det = pl.system()
    dir_sep = ''

    if os_det == "Windows":
        dir_sep = "\\"
    else:
        dir_sep = '/'

    config = {
        'MAX_RETRY' : 5,
        'PATH' : os.path.dirname(os.path.realpath(__file__)) + dir_sep,
        'SRC_DIR' : os.path.dirname(os.path.realpath(__file__)) + dir_sep + 'src_files' + dir_sep,
        'FIN_DIR': os.path.dirname(os.path.realpath(__file__)) + dir_sep + 'finished' + dir_sep,
        'LKP_DIR': os.path.dirname(os.path.realpath(__file__)) + dir_sep + 'lkp_files' + dir_sep,
        'LOG_DIR': os.path.dirname(os.path.realpath(__file__)) + dir_sep + 'log' + dir_sep,
        'LKP_FILE': 'DataLookUp',
        'LKP_CATG_FILE': 'CategoryLookUp',
        'LKP_FILE_DIR_NM': 'lkp_files',
        'SRC_FILE_DIR_NM': 'src_files',
        'FIN_FILE_DIR_NM': 'finished',
        'LOG_FILE_DIR_NM': 'log',
        'DEBUG_IND': 'Y'
    }

 

2. clsLookUpDataRead.py (This script will look into the lookup file & this will generate the combined lookup result as we’ve two different lookup files. Hence, the name comes into the picture.) 

###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 04-Apr-2019       ########
###########################################

import pandas as p
import clsFindFile as c
import clsL as log
from clsParam import clsParam as cf
import datetime

# Disabling Warnings
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn

class clsLookUpDataRead(object):

    def __init__(self, lkpFilename):
        self.lkpFilename = lkpFilename

        self.lkpCatgFilename = cf.config['LKP_CATG_FILE']
        self.path = cf.config['PATH']
        self.subdir = str(cf.config['LOG_FILE_DIR_NM'])

        # To disable logging info
        self.Ind = cf.config['DEBUG_IND']
        self.var = datetime.datetime.now().strftime(".%H.%M.%S")

    def getNaN2Null(self, row):
        try:
            str_val = ''
            str_val = str(row['Group']).replace('nan', '').replace('NaN','')

            return str_val
        except:
            str_val = ''

            return str_val

    def ReadTable(self):
        # Assigning Logging Info
        lkpF = []
        lkpF_2 = []
        var = self.var
        Ind = self.Ind
        subdir = self.subdir

        # Initiating Logging Instances
        clog = log.clsL()

        try:

            # Assinging Lookup file name
            lkpFilename = self.lkpFilename

            # Fetching the actual look-up file name
            f = c.clsFindFile(lkpFilename, str(cf.config['LKP_FILE_DIR_NM']))
            lkp_file_list = list(f.find_file())

            # Ideally look-up will be only one file
            # Later it will be converted to table
            for i in range(len(lkp_file_list)):
                lkpF = lkp_file_list[i]

            # Fetching the content of the look-up file
            df_lkpF = p.read_csv(lkpF, index_col=False)

            # Fetching Category LookUp File
            LkpCatgFileName = self.lkpCatgFilename

            f1 = c.clsFindFile(LkpCatgFileName, str(cf.config['LKP_FILE_DIR_NM']))
            lkp_file_list_2 = list(f1.find_file())

            # Ideally look-up will be only one file
            # Later it will be converted to table
            for j in range(len(lkp_file_list_2)):
                lkpF_2 = lkp_file_list_2[j]

            # Fetching the content of the look-up file
            df_lkpF_2 = p.read_csv(lkpF_2, index_col=False)

            # Changing both the column data type as same type
            df_lkpF['Group_1'] = df_lkpF['Group'].astype(str)
            df_lkpF_2['Group_1'] = df_lkpF_2['Group'].astype(str)

            # Dropping the old column
            df_lkpF.drop(['Group'], axis=1, inplace=True)
            df_lkpF_2.drop(['Group'], axis=1, inplace=True)

            # Renaming the changed data type column with the old column name
            df_lkpF.rename(columns={'Group_1':'Group'}, inplace=True)
            df_lkpF_2.rename(columns={'Group_1': 'Group'}, inplace=True)

            # Merging two lookup dataframes to form Final Consolidated Dataframe
            df_Lkp_Merge = p.merge(
                                    df_lkpF[['TableName', 'ColumnOrder', 'ColumnName', 'MappedColumnName',
                                             'Category', 'Stat', 'Group', 'BankContribution']],
                                    df_lkpF_2[['StartAgeRange', 'EndAgeRange', 'Group']],
                                    on=['Group'], how='left')

            # Converting NaN to Null or empty string
            df_Lkp_Merge['GroupNew'] = df_Lkp_Merge.apply(lambda row: self.getNaN2Null(row), axis=1)

            # Dropping the old column & renaming the new column
            df_Lkp_Merge.drop(['Group'], axis=1, inplace=True)
            df_Lkp_Merge.rename(columns={'GroupNew': 'Group'}, inplace=True)

            clog.logr('1.df_Lkp_Merge' + var + '.csv', Ind, df_Lkp_Merge, subdir)

            return df_Lkp_Merge

        except(FileNotFoundError, IOError) as s:
            y = str(s)
            print(y)

            # Declaring Empty Dataframe
            df_error = p.DataFrame()

            return df_error
        except Exception as e:
            x = str(e)
            print(x)

            # Declaring Empty Dataframe
            df_error = p.DataFrame()

            return df_error

 

Key lines from this script –

# Fetching the actual look-up file name
f = c.clsFindFile(lkpFilename, str(cf.config['LKP_FILE_DIR_NM']))
lkp_file_list = list(f.find_file())

# Ideally look-up will be only one file
# Later it will be converted to table
for i in range(len(lkp_file_list)):
    lkpF = lkp_file_list[i]

# Fetching the content of the look-up file
df_lkpF = p.read_csv(lkpF, index_col=False)

Here, the application will try to locate the lookup file based on the file-name pattern & directory path and then load its content into a dataframe.
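
For reference, here is a minimal, hypothetical sketch of what such a pattern-based file finder could look like using Python's glob module. This is an illustrative stand-in only, not the actual clsFindFile implementation; the pattern & directory names are assumptions –

import os
import glob
import pandas as p

def find_lookup_file(file_pattern, lkp_dir):
    # Build a wildcard search path, e.g. <lkp_dir>/<file_pattern>*.csv
    search_path = os.path.join(lkp_dir, file_pattern + '*.csv')

    # Return every matching file; ideally only one lookup file exists
    return glob.glob(search_path)

# Hypothetical usage
matches = find_lookup_file('Mapping_Lookup', '/path/to/lookup')
if matches:
    df_lkpF = p.read_csv(matches[-1], index_col=False)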

# Fetching Category LookUp File
LkpCatgFileName = self.lkpCatgFilename

f1 = c.clsFindFile(LkpCatgFileName, str(cf.config['LKP_FILE_DIR_NM']))
lkp_file_list_2 = list(f1.find_file())

# Ideally look-up will be only one file
# Later it will be converted to table
for j in range(len(lkp_file_list_2)):
    lkpF_2 = lkp_file_list_2[j]

# Fetching the content of the look-up file
df_lkpF_2 = p.read_csv(lkpF_2, index_col=False)

In this step, the second lookup file will be loaded into the second dataframe.

# Changing both the column data type as same type
df_lkpF['Group_1'] = df_lkpF['Group'].astype(str)
df_lkpF_2['Group_1'] = df_lkpF_2['Group'].astype(str)

# Dropping the old column
df_lkpF.drop(['Group'], axis=1, inplace=True)
df_lkpF_2.drop(['Group'], axis=1, inplace=True)

# Renaming the changed data type column with the old column name
df_lkpF.rename(columns={'Group_1':'Group'}, inplace=True)
df_lkpF_2.rename(columns={'Group_1': 'Group'}, inplace=True)

It is always better to cast the columns that will be used as part of the joining key to the same datatype. The above snippet does exactly that.
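
A tiny, self-contained illustration (on made-up data) of why this matters is given below. If one frame holds Group as an integer & the other as a string, the merge either raises an error or fails to match the keys, depending on the pandas version –

import pandas as p

# Made-up frames where the join key has different dtypes (int vs str)
df_a = p.DataFrame({'Group': [1, 2], 'Stat': ['A', 'B']})
df_b = p.DataFrame({'Group': ['1', '2'], 'StartAgeRange': [18, 30]})

# Casting both sides to str first makes the left join behave as expected
df_a['Group'] = df_a['Group'].astype(str)
df_b['Group'] = df_b['Group'].astype(str)

print(p.merge(df_a, df_b, on=['Group'], how='left'))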

# Merging two lookup dataframes to form Final Consolidated Dataframe
df_Lkp_Merge = p.merge(
                        df_lkpF[['TableName', 'ColumnOrder', 'ColumnName', 'MappedColumnName',
                                 'Category', 'Stat', 'Group', 'BankContribution']],
                        df_lkpF_2[['StartAgeRange', 'EndAgeRange', 'Group']],
                        on=['Group'], how='left')

In this step, the first lookup dataframe is left-joined with the second lookup dataframe on the Group column.

# Converting NaN to Null or empty string
df_Lkp_Merge['GroupNew'] = df_Lkp_Merge.apply(lambda row: self.getNaN2Null(row), axis=1)

# Dropping the old column & renaming the new column
df_Lkp_Merge.drop(['Group'], axis=1, inplace=True)
df_Lkp_Merge.rename(columns={'GroupNew': 'Group'}, inplace=True)

Once the merge is done, the 'NaN' values in the key column need to be suppressed to empty strings for cleaner downstream processing.
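
As a side note, the same suppression can usually be done without the row-wise apply. A minimal sketch of an equivalent vectorised approach (same outcome, assuming the Group column has already been cast to string) –

# Vectorised alternative to the row-wise getNaN2Null() apply:
# the 'nan' / 'NaN' strings left behind by astype(str) are replaced
# with empty strings in a single pass.
df_Lkp_Merge['Group'] = (
    df_Lkp_Merge['Group']
    .astype(str)
    .replace({'nan': '', 'NaN': ''})
)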

3. clsPivotLookUp.py (This script contains the main logic to process & merge the data between the source & lookup files, build the grouped data & produce the resulting data points in Excel. Hence, the name.)

###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 04-Apr-2019       ########
###########################################

import pandas as p
import numpy as np
import clsFindFile as c
import clsL as log
import datetime
from clsParam import clsParam as cf
from pandas import ExcelWriter

# Disabling Warnings
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn

class clsPivotLookUp(object):

    def __init__(self, srcFilename, tgtFileName, df_lkpF):
        self.srcFilename = srcFilename
        self.tgtFileName = tgtFileName
        self.df_lkpF = df_lkpF
        self.lkpCatgFilename = cf.config['LKP_CATG_FILE']

        self.path = cf.config['PATH']
        self.subdir = str(cf.config['LOG_FILE_DIR_NM'])
        self.subdir_2 = str(cf.config['FIN_FILE_DIR_NM'])
        # To disable logging info
        self.Ind = cf.config['DEBUG_IND']
        self.report_path = cf.config['FIN_DIR']

    def dfs_tabs(self, df_list, sheet_list, file_name):
        try:
            cnt = 0
            number_rows = 0

            writer = p.ExcelWriter(file_name, engine='xlsxwriter')

            for dataframe, sheet in zip(df_list, sheet_list):
                number_rows = int(dataframe.shape[0])
                number_cols = int(dataframe.shape[1])

                if cnt == 0:
                    dataframe.to_excel(writer, sheet_name=sheet, startrow=7, startcol=5)
                else:
                    dataframe.to_excel(writer, sheet_name=sheet, startrow=5, startcol=0)

                # Get the xlsxwriter workbook & worksheet objects
                workbook = writer.book
                worksheet = writer.sheets[sheet]
                worksheet.set_zoom(90)

                if cnt == 0:
                    worksheet.set_column('A:E', 4)
                    worksheet.set_column('F:F', 20)
                    worksheet.set_column('G:G', 10)
                    worksheet.set_column('H:J', 20)

                    # Insert an Image
                    worksheet.insert_image('E1', 'Logo.png', {'x_scale':0.6, 'y_scale':0.8})

                    # Add a number format for cells with money.
                    money_fmt = workbook.add_format({'num_format': '$#,##0', 'border': 1})
                    worksheet.set_column('H:H', 20, money_fmt)

                    # Define our range for color formatting
                    color_range = "F9:F{}".format(number_rows * 2 + 1)

                    # Add a format. Red fill with the dark red text
                    red_format = workbook.add_format({'bg_color':'#FEC7CE', 'font_color':'#0E0E08', 'border':1})

                    # Add a format. Green fill with the dark green text
                    green_format = workbook.add_format({'bg_color': '#D0FCA4', 'font_color': '#0E0E08', 'border': 1})

                    # Add a format. Cyan fill with the dark green text
                    mid_format = workbook.add_format({'bg_color': '#6FC2D8', 'font_color': '#0E0E08', 'border': 1})

                    # Add a format. Other fill with the dark green text
                    oth_format = workbook.add_format({'bg_color': '#AFC2D8', 'font_color': '#0E0E08', 'border': 1})

                    worksheet.conditional_format(color_range, {'type':'cell',
                                                               'criteria':'equal to',
                                                               'value':'"England"',
                                                               'format': green_format})

                    worksheet.conditional_format(color_range, {'type': 'cell',
                                                               'criteria': 'equal to',
                                                               'value': '"Northern Ireland"',
                                                               'format': mid_format})

                    worksheet.conditional_format(color_range, {'type': 'cell',
                                                               'criteria': 'equal to',
                                                               'value': '"Scotland"',
                                                               'format': red_format})

                    worksheet.conditional_format(color_range, {'type': 'cell',
                                                               'criteria': 'equal to',
                                                               'value': '"Wales"',
                                                               'format': oth_format})
                else:
                    first_row = 5
                    first_col = 0
                    last_row = first_row + (number_rows * 2)
                    last_col = number_cols - 1

                    if cnt == 1:
                        worksheet.set_column('A:D', 20)
                    else:
                        worksheet.set_column('A:E', 20)
                        worksheet.set_column('F:F', 20)


                    # Add a number format for cells with money.
                    # money_fmt = workbook.add_format({'num_format': '$#,##0', 'bold': True, 'border':1})
                    money_fmt = workbook.add_format({'num_format': '$#,##0', 'border': 1})

                    # Amount columns
                    if cnt == 1:
                        worksheet.set_row(6, 0, money_fmt)
                        worksheet.set_column('C:C', 20, money_fmt)
                    else:
                        worksheet.set_row(6, 0, money_fmt)
                        worksheet.set_column('D:F', 20, money_fmt)

                    # Insert an Image
                    worksheet.insert_image('B1', 'Logo.png', {'x_scale': 0.5, 'y_scale': 0.5})

                    # Add a format. Red fill with the dark red text
                    red_format = workbook.add_format({'bg_color': '#FEC7CE', 'font_color': '#0E0E08'})

                    # Add a format. Green fill with the dark green text
                    green_format = workbook.add_format({'bg_color': '#D0FCA4', 'font_color': '#0E0E08'})

                    # Add a format. Cyan fill with the dark green text
                    mid_format = workbook.add_format({'bg_color': '#6FC2D8', 'font_color': '#0E0E08'})

                    # Add a format. Other fill with the dark green text
                    oth_format = workbook.add_format({'bg_color': '#AFC2D8', 'font_color': '#0E0E08'})

                    # Fill colour based on formula
                    worksheet.conditional_format(first_row,
                                                 first_col,
                                                 last_row,
                                                 last_col,
                                                 {'type': 'formula',
                                                  'criteria': '=INDIRECT("A"&ROW())="England"',
                                                  'format': green_format})

                    worksheet.conditional_format(first_row,
                                                 first_col,
                                                 last_row,
                                                 last_col,
                                                 {'type': 'formula',
                                                  'criteria': '=INDIRECT("A"&ROW())="Northern Ireland"',
                                                  'format': mid_format})

                    worksheet.conditional_format(first_row,
                                                 first_col,
                                                 last_row,
                                                 last_col,
                                                 {'type': 'formula',
                                                  'criteria': '=INDIRECT("A"&ROW())="Scotland"',
                                                  'format': red_format})

                    worksheet.conditional_format(first_row,
                                                 first_col,
                                                 last_row,
                                                 last_col,
                                                 {'type': 'formula',
                                                  'criteria': '=INDIRECT("A"&ROW())="Wales"',
                                                  'format': oth_format})

                cnt += 1

            writer.save()
            writer.close()

            return 0
        except Exception as e:
            x = str(e)
            print(x)

            return 1

    def getIntVal(self, row):
        try:
            int_val = 0
            int_val = int(row['MCategory'])

            return int_val
        except:
            int_val = 0

            return int_val

    def getSavingsAmount(self, row):
        try:
            savings = 0.0
            savings = float(row['Balance']) - float(row['BankContribution'])

            return savings
        except:
            savings = 0

            return savings

    def getNaN2Zero_StartAgeRange(self, row):
        try:
            int_AgeRange = 0
            str_StartAgeRange = ''

            str_StartAgeRange = str(row['StartAgeRange']).replace('nan','').replace('NaN','')

            if (len(str_StartAgeRange) > 0):
                int_AgeRange = int(float(str_StartAgeRange))
            else:
                int_AgeRange = 0

            return int_AgeRange
        except:
            int_AgeRange = 0

            return int_AgeRange

    def getNaN2Zero_EndAgeRange(self, row):
        try:
            int_AgeRange = 0
            str_EndAgeRange = ''

            str_EndAgeRange = str(row['EndAgeRange']).replace('nan','').replace('NaN','')

            if (len(str_EndAgeRange) > 0):
                int_AgeRange = int(float(str_EndAgeRange))
            else:
                int_AgeRange = 0

            return int_AgeRange
        except:
            int_AgeRange = 0

            return int_AgeRange


    def parse_and_write_csv(self):

        # Assigning Logging Info
        Ind = self.Ind
        subdir = self.subdir
        subdir_2 = self.subdir_2
        lkpF = []
        lkpF_2 = []
        report_path = self.report_path

        #Initiating Logging Instances
        clog = log.clsL()

        if Ind == 'Y':
            print('Logging Enabled....')
        else:
            print('Logging Not Enabled....')

        # Assigning Source File Basic Name
        srcFileInit = self.srcFilename
        tgtFileName = self.tgtFileName
        df_lkpF = self.df_lkpF

        try:

            # Fetching the actual source file name
            d = c.clsFindFile(self.srcFilename, str(cf.config['SRC_FILE_DIR_NM']))
            src_file_list = d.find_file()

            # Ideally look-up will be only one file
            # Later it will be converted to table
            for i in range(len(src_file_list)):

                # Handling Multiple source files
                var = datetime.datetime.now().strftime(".%H.%M.%S")
                print('Target File Extension will contain the following:: ', var)

                srcF = src_file_list[i]

                # Reading Source File
                df = p.read_csv(srcF, index_col=False)

                # Adding a new surrogate key to the existing records
                df = df.assign(PKEY=[1 + i for i in range(len(df))])[['PKEY'] + df.columns.tolist()]

                clog.logr('2.DF_Assign' + var + '.csv', Ind, df, subdir)

                # Fetching only relevant rows from the Look-up Files
                # based on Filters with 'I' or No Token
                # 'K' for Key columns with No Token
                # 'D' for Single column Token
                df_lkpFile = df_lkpF[(df_lkpF['TableName'] == srcFileInit) &
                                     ((df_lkpF['Category'] == 'I') | (df_lkpF['Category'] == 'K'))]

                # Fetching the unique records from Look-up table
                id_list1 = list(df_lkpFile['ColumnName'].drop_duplicates())
                id_list2 = ['PKEY']

                id_list = id_list2 + id_list1

                # Pivoting part of the source file data to be join for merge
                df_melt = df.melt(id_vars=id_list, var_name='ColumnName')

                # Changing the generated column Value to Category for upcoming Merge
                # df_melt = df_tmp_melt.rename_by_col_index(idx_np,'Category')
                # df_melt.rename(columns={'value': 'Category'}, inplace=True)
                df_melt.rename(columns={'value': 'MCategory'}, inplace=True)

                #df_melt.to_csv(path+'1.DF_Melt.csv')
                clog.logr('3.DF_Melt' + var + '.csv', Ind, df_melt, subdir)

                # Now fetching look-up file one more time
                # filtering with the only Table Name
                # For merge with our temporary df_melt
                # to get the relevant lookup
                # information

                df_lkpFinFile = df_lkpF[(df_lkpF['TableName'] == srcFileInit) &
                                        ((df_lkpF['Category'] == 'D') | (df_lkpF['Category'] == 'Male') |
                                        (df_lkpF['Category'] == 'K') | (df_lkpF['Category'] == 'Female'))]

                clog.logr('4.DF_Finlkp' + var + '.csv', Ind, df_lkpFinFile, subdir)

                # Merging two files based on Keys
                # df_fin = df_melt.merge(df_lkpFinFile, on=['ColumnName', 'Category'], how='left')
                df_fin = df_melt.merge(df_lkpFinFile, on=['ColumnName'], how='left')

                clog.logr('5.DF_FIN_Basic_Merge' + var + '.csv', Ind, df_fin, subdir)

                df_fin2 = df_fin[((df_fin['MCategory'] == 'I') & (df_fin['Category'] == df_fin['MCategory'])) |
                                 ((df_fin['MCategory'] == 'Male') & (df_fin['Category'] == df_fin['MCategory'])) |
                                 ((df_fin['MCategory'] == 'Female') & (df_fin['Category'] == df_fin['MCategory'])) |
                                 (df_fin['MCategory'] == 'NaN') |
                                 (df_fin['MCategory'] == 'D') |
                                 (
                                     (df_fin['MCategory'] != 'I') & (df_fin['MCategory'] != 'Male') &
                                     (df_fin['MCategory'] != 'Female') & (df_fin['MCategory'] != 'D') &
                                     (df_fin['MCategory'] != 'NaN')
                                 )]

                clog.logr('6.Merge_After_Filter' + var + '.csv', Ind, df_fin2, subdir)

                # Identifying Integer Column for next step
                df_fin2['Catg'] = df_fin2.apply(lambda row: self.getIntVal(row), axis=1)
                df_fin2['StAge'] = df_fin2.apply(lambda row: self.getNaN2Zero_StartAgeRange(row), axis=1)
                df_fin2['EnAge'] = df_fin2.apply(lambda row: self.getNaN2Zero_EndAgeRange(row), axis=1)

                # Dropping the old Columns
                df_fin2.drop(['Category'], axis=1, inplace=True)
                df_fin2.drop(['StartAgeRange'], axis=1, inplace=True)
                df_fin2.drop(['EndAgeRange'], axis=1, inplace=True)

                # Renaming the new columns
                df_fin2.rename(columns={'Catg': 'Category'}, inplace=True)
                df_fin2.rename(columns={'StAge': 'StartAgeRange'}, inplace=True)
                df_fin2.rename(columns={'EnAge': 'EndAgeRange'}, inplace=True)

                clog.logr('7.Catg' + var + '.csv', Ind, df_fin2, subdir)

                # Handling special cases when Category from source & lookup file won't match
                # alternative way to implement left outer join due to specific data scenarios
                df_fin2['Flag'] = np.where(((df_fin2.StartAgeRange == 0) | (df_fin2.EndAgeRange == 0)) |
                                           (((df_fin2.StartAgeRange > 0) & (df_fin2.EndAgeRange > 0)) &
                                            ((df_fin2.Category >= df_fin2.StartAgeRange)
                                              & (df_fin2.Category <= df_fin2.EndAgeRange))), 'Y', 'N')

                clog.logr('8.After_Special_Filter' + var + '.csv', Ind, df_fin2, subdir)

                # Removing data where Flag is set to Y
                newDF = df_fin2[(df_fin2['Flag'] == 'Y')]

                clog.logr('9.Flag_Filter' + var + '.csv', Ind, newDF, subdir)

                # Need to drop column called ColumnName
                newDF.drop(['TableName'], axis=1, inplace=True)
                newDF.drop(['ColumnOrder'], axis=1, inplace=True)
                newDF.drop(['ColumnName'], axis=1, inplace=True)
                newDF.drop(['Category'], axis=1, inplace=True)
                newDF.drop(['Flag'], axis=1, inplace=True)
                newDF.drop(['Group'], axis=1, inplace=True)

                # Need to rename MappedColumnName to ColumnName
                newDF.rename(columns={'MappedColumnName': 'ColumnName'}, inplace=True)

                clog.logr('10.newDF' + var + '.csv', Ind, newDF, subdir)

                df_short = newDF[['PKEY', 'BankContribution', 'StartAgeRange', 'EndAgeRange']]

                clog.logr('11.df_short' + var + '.csv', Ind, df_short, subdir)

                # Aggregating information
                grouped = df_short.groupby(['PKEY'])
                dfGroup = grouped.aggregate(np.sum)

                clog.logr('12.dfGroup' + var + '.csv', Ind, dfGroup, subdir)

                # Let's merge to get everything at row level
                df_rowlvl = df.merge(dfGroup, on=['PKEY'], how='inner')

                clog.logr('13.Rowlvl_Merge' + var + '.csv', Ind, df_rowlvl, subdir)

                # Dropping PKEY & Unnamed columns from the csv
                df_rowlvl.drop(['PKEY'], axis=1, inplace=True)

                clog.logr('14.Final_DF' + var + '.csv', Ind, df_rowlvl, subdir)

                ##############################################################
                #### Country & Gender wise Bank's Contribution           #####
                ##############################################################
                dfCountryGender = df_rowlvl[['Region', 'Gender', 'BankContribution']]

                grouped_CG = dfCountryGender.groupby(['Region', 'Gender'])
                dCountryGen = grouped_CG.aggregate(np.sum)

                print("-" * 60)
                print("Country & Gender wise Bank's Contribution")
                print("-" * 60)
                print(dCountryGen)

                clog.logr('15.dCountryGen' + var + '.csv', Ind, dCountryGen, subdir)

                ###############################################################
                ###### End Of Country & Gender wise Bank's Contribution  ######
                ###############################################################

                ##############################################################
                #### Country & Job wise Bank's Contribution              #####
                ##############################################################

                dfCountryJob = df_rowlvl[['Region', 'Job Classification', 'BankContribution']]

                grouped_CJ = dfCountryJob.groupby(['Region', 'Job Classification'])
                dCountryJob = grouped_CJ.aggregate(np.sum)

                print("-" * 60)
                print("Country & Job wise Bank's Contribution")
                print("-" * 60)
                print(dCountryJob)

                clog.logr('16.dCountryJob' + var + '.csv', Ind, dCountryJob, subdir)

                ###############################################################
                ###### End Of Country & Job wise Bank's Contribution     ######
                ###############################################################

                ##############################################################
                #### Country & Age wise Savings & Bank's Contribution    #####
                ##############################################################

                dfCountryAge = df_rowlvl[['Region', 'StartAgeRange', 'EndAgeRange', 'Balance', 'BankContribution']]
                dfCountryAge['SavingsAmount'] = dfCountryAge.apply(lambda row: self.getSavingsAmount(row), axis=1)

                grouped_CA = dfCountryAge.groupby(['Region', 'StartAgeRange', 'EndAgeRange'])
                dCountryAge = grouped_CA.aggregate(np.sum)

                print("-" * 60)
                print("Country & Job wise Bank's Contribution")
                print("-" * 60)
                print(dCountryAge)

                clog.logr('17.dCountryAge' + var + '.csv', Ind, dCountryAge, subdir)

                ##############################################################
                #### End Of Country & Age wise Savings & Bank's          #####
                #### Contribution                                        #####
                ##############################################################

                print('Writing to file!!')

                # Avoiding Index column of dataframe while copying to csv
                # df_token.to_csv(tgtFileName, index=False)
                # For Target File Ind should be always Yes/Y
                Ind = 'Y'

                FtgtFileName = tgtFileName + var + '.csv'
                clog.logr(FtgtFileName, Ind, df_rowlvl, subdir_2)

                ##############################################################
                ##### Writing to Excel File with Different Tabular Sheet #####
                ##############################################################
                dfs = [dCountryGen, dCountryJob, dCountryAge]
                sheets = ['Country-Gender-Stats', 'Country-Job-Stats', 'Country-Age-Stats']

                x = self.dfs_tabs(dfs, sheets, report_path+tgtFileName + var + '.xlsx')

                ##############################################################
                #####             End Of Excel Sheet Writing             #####
                ##############################################################

                # Resetting the Filename after every iteration
                # in case multiple source files exist
                FtgtFileName = ""

            return 0

        except Exception as e:
            x = str(e)
            print(x)
            return 9

 

Key snippets from this script –

# Adding a new surrogate key to the existing records
df = df.assign(PKEY=[1 + i for i in range(len(df))])[['PKEY'] + df.columns.tolist()]

This is extremely crucial, as the application creates its own unique key irrespective of the data files, and this key is used in most places during the data processing.
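
To make this step concrete, here is a tiny sketch (on made-up data) of what the assign call produces –

import pandas as p

df = p.DataFrame({'Name': ['Alice', 'Bob'], 'Balance': [100, 200]})

# Generate a 1-based surrogate key & move it to the front
df = df.assign(PKEY=[1 + i for i in range(len(df))])[['PKEY'] + df.columns.tolist()]

print(df)
#    PKEY   Name  Balance
# 0     1  Alice      100
# 1     2    Bob      200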

df_lkpFile = df_lkpF[(df_lkpF['TableName'] == srcFileInit) &
                     ((df_lkpF['Category'] == 'I') | (df_lkpF['Category'] == 'K'))]

# Fetching the unique records from Look-up table
id_list1 = list(df_lkpFile['ColumnName'].drop_duplicates())
id_list2 = ['PKEY']

id_list = id_list2 + id_list1

This step builds the list of identifier columns: our surrogate key plus the key columns identified in the look-up file. Every remaining column of the source table will later be converted from columns to rows & joined with our look-up table.

# Pivoting part of the source file data to be join for merge
df_melt = df.melt(id_vars=id_list, var_name='ColumnName')

In the above step, the application converts all non-identifier columns of our source file into rows.
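
A small, self-contained melt example (with hypothetical columns) shows the before & after shape of this pivot –

import pandas as p

df = p.DataFrame({'PKEY': [1, 2],
                  'Age': [34, 51],
                  'Gender': ['Male', 'Female']})

# Keep PKEY as the identifier; every other column becomes a row
df_melt = df.melt(id_vars=['PKEY'], var_name='ColumnName')
df_melt.rename(columns={'value': 'MCategory'}, inplace=True)

print(df_melt)
#    PKEY ColumnName MCategory
# 0     1        Age        34
# 1     2        Age        51
# 2     1     Gender      Male
# 3     2     Gender    Female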

df_lkpFinFile = df_lkpF[(df_lkpF['TableName'] == srcFileInit) &
                        ((df_lkpF['Category'] == 'D') | (df_lkpF['Category'] == 'Male') |
                        (df_lkpF['Category'] == 'K') | (df_lkpF['Category'] == 'Female'))]

In this step, the application picks up all the look-up rows matching the source file name & the relevant categories, which will be used for the lookup join.

df_fin = df_melt.merge(df_lkpFinFile, on=['ColumnName'], how='left')

In this step, the application joins the transformed source data with our lookup data.

df_fin2 = df_fin[((df_fin['MCategory'] == 'I') & (df_fin['Category'] == df_fin['MCategory'])) |
                 ((df_fin['MCategory'] == 'Male') & (df_fin['Category'] == df_fin['MCategory'])) |
                 ((df_fin['MCategory'] == 'Female') & (df_fin['Category'] == df_fin['MCategory'])) |
                 (df_fin['MCategory'] == 'NaN') |
                 (df_fin['MCategory'] == 'D') |
                 (
                     (df_fin['MCategory'] != 'I') & (df_fin['MCategory'] != 'Male') &
                     (df_fin['MCategory'] != 'Female') & (df_fin['MCategory'] != 'D') &
                     (df_fin['MCategory'] != 'NaN')
                 )]

After this step, the data looks like –

Imp_Step_1

# Identifying Integer Column for next step
df_fin2['Catg'] = df_fin2.apply(lambda row: self.getIntVal(row), axis=1)
df_fin2['StAge'] = df_fin2.apply(lambda row: self.getNaN2Zero_StartAgeRange(row), axis=1)
df_fin2['EnAge'] = df_fin2.apply(lambda row: self.getNaN2Zero_EndAgeRange(row), axis=1)

# Dropping the old Columns
df_fin2.drop(['Category'], axis=1, inplace=True)
df_fin2.drop(['StartAgeRange'], axis=1, inplace=True)
df_fin2.drop(['EndAgeRange'], axis=1, inplace=True)

# Renaming the new columns
df_fin2.rename(columns={'Catg': 'Category'}, inplace=True)
df_fin2.rename(columns={'StAge': 'StartAgeRange'}, inplace=True)
df_fin2.rename(columns={'EnAge': 'EndAgeRange'}, inplace=True)

Now, the application removes NaN from these key columns in preparation for the important upcoming step.

After this step, the new data looks like –

Imp_Step_2

So now it will be easier to filter the data by checking the customer's age against the age range in the next step, as follows –

# Handling special cases when Category from source & lookup file won't match
# alternative way to implement left outer join due to specific data scenarios
df_fin2['Flag'] = np.where(((df_fin2.StartAgeRange == 0) | (df_fin2.EndAgeRange == 0)) |
                           (((df_fin2.StartAgeRange > 0) & (df_fin2.EndAgeRange > 0)) &
                            ((df_fin2.Category >= df_fin2.StartAgeRange)
                              & (df_fin2.Category <= df_fin2.EndAgeRange))), 'Y', 'N')

After this, new data looks like –

Imp_Step_3
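
To see how this flag behaves, here is a tiny worked example on hypothetical rows: a row is kept when the lookup carries no age range at all, or when the customer's age falls inside the range –

import pandas as p
import numpy as np

df_fin2 = p.DataFrame({'Category':      [34, 34, 60],
                       'StartAgeRange': [0, 30, 30],
                       'EndAgeRange':   [0, 40, 40]})

df_fin2['Flag'] = np.where(((df_fin2.StartAgeRange == 0) | (df_fin2.EndAgeRange == 0)) |
                           (((df_fin2.StartAgeRange > 0) & (df_fin2.EndAgeRange > 0)) &
                            ((df_fin2.Category >= df_fin2.StartAgeRange) &
                             (df_fin2.Category <= df_fin2.EndAgeRange))), 'Y', 'N')

print(df_fin2)
#    Category  StartAgeRange  EndAgeRange Flag
# 0        34              0            0    Y   <- no range in lookup, kept
# 1        34             30           40    Y   <- 34 falls within 30-40, kept
# 2        60             30           40    N   <- 60 falls outside 30-40, dropped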

Finally, only records flagged with 'Y' are retained. The data then looks as follows –

Imp_Step_4

Now, the application needs to consolidate Bank Contribution, Start & End Age Range & re-pivot the data into a single row per customer. The data should look like this –

Imp_Step_5

Once this is done, our application is ready for all the aggregated data points.
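
A minimal sketch of this consolidation step, on made-up data, looks like the following: the melted rows are summed per PKEY & the result is merged back onto the original source rows –

import pandas as p

df_short = p.DataFrame({'PKEY':             [1, 1, 2],
                        'BankContribution': [50.0, 25.0, 40.0],
                        'StartAgeRange':    [30, 0, 45],
                        'EndAgeRange':      [40, 0, 55]})

# One row per customer with the aggregated figures
# (reset_index keeps PKEY as a regular column for the merge)
dfGroup = df_short.groupby(['PKEY']).sum().reset_index()

# Merging back onto a hypothetical source frame restores row level
df = p.DataFrame({'PKEY': [1, 2], 'Name': ['Alice', 'Bob']})
df_rowlvl = df.merge(dfGroup, on=['PKEY'], how='inner')

print(df_rowlvl)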

Hence, the three different categories of data transformation below are mostly self-explanatory –

Data Point – 1:

##############################################################
#### Country & Gender wise Bank's Contribution #####
##############################################################
dfCountryGender = df_rowlvl[['Region', 'Gender', 'BankContribution']]

grouped_CG = dfCountryGender.groupby(['Region', 'Gender'])
dCountryGen = grouped_CG.aggregate(np.sum)

print("-" * 60)
print("Country & Gender wise Bank's Contribution")
print("-" * 60)
print(dCountryGen)

clog.logr('15.dCountryGen' + var + '.csv', Ind, dCountryGen, subdir)

###############################################################
###### End Of Country & Gender wise Bank's Contribution ######
###############################################################

Data Point – 2:

##############################################################
#### Country & Job wise Bank's Contribution #####
##############################################################

dfCountryJob = df_rowlvl[['Region', 'Job Classification', 'BankContribution']]

grouped_CJ = dfCountryJob.groupby(['Region', 'Job Classification'])
dCountryJob = grouped_CJ.aggregate(np.sum)

print("-" * 60)
print("Country & Job wise Bank's Contribution")
print("-" * 60)
print(dCountryJob)

clog.logr('16.dCountryJob' + var + '.csv', Ind, dCountryJob, subdir)

###############################################################
###### End Of Country & Job wise Bank's Contribution ######
###############################################################

Data Point – 3:

##############################################################
#### Country & Age wise Savings & Bank's Contribution #####
##############################################################

dfCountryAge = df_rowlvl[['Region', 'StartAgeRange', 'EndAgeRange', 'Balance', 'BankContribution']]
dfCountryAge['SavingsAmount'] = dfCountryAge.apply(lambda row: self.getSavingsAmount(row), axis=1)

grouped_CA = dfCountryAge.groupby(['Region', 'StartAgeRange', 'EndAgeRange'])
dCountryAge = grouped_CA.aggregate(np.sum)

print("-" * 60)
print("Country & Job wise Bank's Contribution")
print("-" * 60)
print(dCountryAge)

clog.logr('17.dCountryAge' + var + '.csv', Ind, dCountryAge, subdir)

##############################################################
#### End Of Country & Age wise Savings & Bank's #####
#### Contribution #####
##############################################################

Finally, these datasets are passed to an Excel-generator function that captures them in different sheets & beautifies the report, as follows –

##############################################################
##### Writing to Excel File with Different Tabular Sheet #####
##############################################################
dfs = [dCountryGen, dCountryJob, dCountryAge]
sheets = ['Country-Gender-Stats', 'Country-Job-Stats', 'Country-Age-Stats']

x = self.dfs_tabs(dfs, sheets, report_path+tgtFileName + var + '.xlsx')

##############################################################
##### End Of Excel Sheet Writing #####
##############################################################

Key snippets from this function –

writer = p.ExcelWriter(file_name, engine='xlsxwriter')

This step initiates the Excel writer with the xlsxwriter engine.
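
For reference, on recent pandas versions the same writer can be opened as a context manager, which saves & closes the workbook automatically. A minimal sketch, assuming pandas with the xlsxwriter engine installed & a hypothetical file name –

import pandas as p

df = p.DataFrame({'Region': ['England', 'Wales'], 'BankContribution': [100, 80]})

# The context manager replaces the explicit writer.save() / writer.close() calls
with p.ExcelWriter('demo_report.xlsx', engine='xlsxwriter') as writer:
    df.to_excel(writer, sheet_name='Country-Gender-Stats', startrow=7, startcol=5)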

for dataframe, sheet in zip(df_list, sheet_list):
    number_rows = int(dataframe.shape[0])
    number_cols = int(dataframe.shape[1])

In this step, the application iterates over the dataframes & their corresponding sheet names, writing each one into the Excel workbook.

if cnt == 0:
    dataframe.to_excel(writer, sheet_name=sheet, startrow=7, startcol=5)
else:
    dataframe.to_excel(writer, sheet_name=sheet, startrow=5, startcol=0)

In this step, the first sheet's data is written starting at startrow=7 (the eighth spreadsheet row, since startrow is zero-based), whereas the remaining two sheets capture data from startrow=5.

worksheet.set_column('A:E', 4)
worksheet.set_column('F:F', 20)
worksheet.set_column('G:G', 10)
worksheet.set_column('H:J', 20)

This sets the width of these columns.

# Insert an Image
worksheet.insert_image('E1', 'Logo.png', {'x_scale':0.6, 'y_scale':0.8})

In this case, the application inserts my blog logo at the top of every sheet of this Excel file.

# Add a number format for cells with money.
money_fmt = workbook.add_format({'num_format': '$#,##0', 'border': 1})
worksheet.set_column('H:H', 20, money_fmt)

Also, for the columns holding monetary information, it applies a dedicated number format.

# Define our range for color formatting
color_range = "F9:F{}".format(number_rows * 2 + 1)

# Add a format. Red fill with the dark red text
red_format = workbook.add_format({'bg_color':'#FEC7CE', 'font_color':'#0E0E08', 'border':1})

# Add a format. Green fill with the dark green text
green_format = workbook.add_format({'bg_color': '#D0FCA4', 'font_color': '#0E0E08', 'border': 1})

# Add a format. Cyan fill with the dark green text
mid_format = workbook.add_format({'bg_color': '#6FC2D8', 'font_color': '#0E0E08', 'border': 1})

# Add a format. Other fill with the dark green text
oth_format = workbook.add_format({'bg_color': '#AFC2D8', 'font_color': '#0E0E08', 'border': 1})

worksheet.conditional_format(color_range, {'type':'cell',
                                           'criteria':'equal to',
                                           'value':'"England"',
                                           'format': green_format})

worksheet.conditional_format(color_range, {'type': 'cell',
                                           'criteria': 'equal to',
                                           'value': '"Northern Ireland"',
                                           'format': mid_format})

worksheet.conditional_format(color_range, {'type': 'cell',
                                           'criteria': 'equal to',
                                           'value': '"Scotland"',
                                           'format': red_format})

worksheet.conditional_format(color_range, {'type': 'cell',
                                           'criteria': 'equal to',
                                           'value': '"Wales"',
                                           'format': oth_format})

In this step, the application color-codes the region cells to highlight each specific category for better visual decision-making.
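
A stripped-down, standalone sketch of the same idea, using xlsxwriter directly with a hypothetical file name, may help isolate the conditional-formatting mechanics –

import xlsxwriter

workbook = xlsxwriter.Workbook('conditional_demo.xlsx')
worksheet = workbook.add_worksheet('Demo')

# A few region values in column A
for row, region in enumerate(['England', 'Scotland', 'Wales']):
    worksheet.write(row, 0, region)

green_format = workbook.add_format({'bg_color': '#D0FCA4', 'font_color': '#0E0E08'})

# Highlight every cell in A1:A3 that equals "England"
worksheet.conditional_format('A1:A3', {'type': 'cell',
                                       'criteria': 'equal to',
                                       'value': '"England"',
                                       'format': green_format})

workbook.close()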

4. callPivotLookUp.py (This script calls the main pivot class & processes the data as per the business requirement. Hence, the name.)

#####################################################
### Objective: Purpose of this Library is to call ###
### the parse_and_write_csv method to produce the ###
### tokenized columns based on the look-up file.  ###
###                                               ###
### Arguments are as follows:                     ###
### Source File, Target File & Lookup Files.      ###
###                                               ###
#####################################################

import clsPivotLookUp as ct
from clsParam import clsParam as cf
import sys
import pandas as p
import clsLookUpDataRead as cl

def main():
    print("Calling the custom Package..")

    cnt_lkp = 0

    try:
        #Default Look up table
        Lkp_Filename = cf.config['LKP_FILE']

        # Adding New DB Table for Lookup
        x = cl.clsLookUpDataRead(Lkp_Filename)
        df_lkpF = x.ReadTable()

        cnt_lkp = df_lkpF.shape[0]

        if cnt_lkp > 0:
            df_lkpF_copy = df_lkpF.copy()

            # Getting all the unique file names
            df_list_F1 = list(df_lkpF_copy['TableName'].drop_duplicates())

            # File list which has Tokenization
            df_lkpF_Int = df_lkpF[(df_lkpF['Group'].str.len() >= 1)]
            df_list_F2 = list(df_lkpF_Int['TableName'].drop_duplicates())

            for i in df_list_F1:
                if i in df_list_F2:
                    try:
                        inputFile = i

                        print("*"*30)
                        print("Reading from " + inputFile + ".csv")
                        print("*" * 30)

                        srcFileName = inputFile
                        tarFileName = srcFileName + '_processed'

                        x = ct.clsPivotLookUp(srcFileName, tarFileName, df_lkpF)

                        ret_val = x.parse_and_write_csv()

                        if ret_val == 0:
                            print("Writing to file -> (" + tarFileName + ".csv) Status: ", ret_val)
                        else:
                            if ret_val == 5:
                                print("File IO Error! Please check your directory whether the file exists with data!")
                            else:
                                print("Data Processing Issue!")

                        print("*" * 30)
                        print("Operation done for " + srcFileName + "!")
                        print("*" *30)
                    except Exception as e:
                        x = str(e)
                        srcFileName = inputFile
                        print('Check the status of ' + srcFileName + ' ' + x)
                else:
                    pass
        else:
            print("No Matching Data to process!")
    except Exception as e:
        x = str(e)
        print(x)

        print("No Matching Data to process!")

if __name__ == "__main__":
    main()

 

And, the key snippet from here –

# Getting all the unique file names
df_list_F1 = list(df_lkpF_copy['TableName'].drop_duplicates())

# File list which has Tokenization
df_lkpF_Int = df_lkpF[(df_lkpF['Group'].str.len() >= 1)]
df_list_F2 = list(df_lkpF_Int['TableName'].drop_duplicates())

This identifies all the source files that share the same kind of case (i.e., a populated Group entry in the look-up) & processes them one by one.

x = ct.clsPivotLookUp(srcFileName, tarFileName, df_lkpF)
ret_val = x.parse_and_write_csv()

if ret_val == 0:
    print("Writing to file -> (" + tarFileName + ".csv) Status: ", ret_val)
else:
    if ret_val == 5:
        print("File IO Error! Please check your directory whether the file exists with data!")
    else:
        print("Data Processing Issue!")

This calls the main application class & captures the success or failure status based on the return value.

Let's check the directories on both Windows & MAC.

Windows:

Win_Dir

MAC:

MAC_Dir

Let’s check the run process –

Windows:

Win_Run_1

Win_Run_2

MAC:

MAC_Run_1

MAC_Run_2

Let's see how it looks in Excel –

Windows:

Win_Sheet_1

Win_Sheet_2

Win_Sheet_3

MAC:

MAC_Sheet_1

MAC_Sheet_2

MAC_Sheet_3

So, finally, we’ve achieved our target. 

Horray! We’ve done it! 😀

I hope you’ll like this effort. 

Wait for the next installment. Till then, Happy Avenging. 🙂

[Note: All the sample data are available in public domain for research & study.]