Real-time Matplotlib view of streaming data inside a Python & Kivy-based iOS App

Today, I’ll be sharing one of the most exciting posts I’ve ever written. It is also a rare one, as you cannot easily find a relevant working solution on the net.

So, what are we talking about here? We’re going to build a Python-based iOS App using the Kivy framework. You will find plenty of videos & documents on that. However, you won’t easily find the capability that I’m about to show. We’ll consume live IoT streaming data from a dummy application & then plot it in a Matplotlib dashboard inside the mobile App. And that’s where this post is seriously different from the rest of the available material.


But, before we dig into more details, let us see a quick demo of our iOS App.

Demo:

Demo

Isn’t it exciting? Great! Now, let’s dig into the details.


Let’s understand the architecture as to how we want to proceed with the solution here.

Architecture:

Broad-level design

The above diagram shows that the Kivy-based iOS application will consume streaming data from the Ably queue. The dummy IoT application will push the real-time events to the same Ably queue.
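To make this flow concrete, here is a minimal sketch of the publish & consume pattern. It uses an in-memory list as a stand-in for the Ably channel. The names `FakeChannel`, `publish_batch` & `consume_history` are hypothetical & are not part of the Ably SDK. The newest-first read order is also an assumption made only for this sketch.

```python
import json


class FakeChannel:
    """In-memory stand-in for a message channel (NOT the Ably API)."""

    def __init__(self):
        self._messages = []

    def publish(self, name, data):
        # Store each message as (event name, serialized payload).
        self._messages.append((name, data))

    def history(self):
        # Newest message first (an assumption for this sketch).
        return list(reversed(self._messages))


def publish_batch(channel, events):
    """Producer side: serialize a batch of events & publish it."""
    channel.publish("event", json.dumps(events))


def consume_history(channel):
    """Consumer side: read every stored batch & flatten it into rows."""
    rows = []
    for _name, data in channel.history():
        rows.extend(json.loads(data))
    return rows


channel = FakeChannel()
publish_batch(channel, [{"x_value": 0, "total_1": 100, "total_2": 100}])
publish_batch(channel, [{"x_value": 1, "total_1": 107, "total_2": 105}])
rows = consume_history(channel)
```

In the real application, the mock IoT generator plays the producer role & the Kivy App plays the consumer role.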

So, now we understand the architecture. Fantastic!

Let’s deep dive into the code that we specifically built for this use case.


Code:

1. IoTDataGen.py (Publishing streaming data to Ably channels: the simulator generates mock IoT events & publishes them, so that the dashboard can show the measured KPIs.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 12-Nov-2021 ####
#### ####
#### Objective: Publishing Streaming data ####
#### to Ably channels & captured IoT ####
#### events from the simulator & publish ####
#### them in Dashboard through measured ####
#### KPIs. ####
#### ####
##############################################

import random
import time
import json
import clsPublishStream as cps
import datetime
from clsConfig import clsConfig as cf
import logging

# Invoking the IoT Device Generator.
def main():
    ###############################################
    ### Global Section ###
    ###############################################

    # Initiating Ably class to push events
    x1 = cps.clsPublishStream()

    ###############################################
    ### End of Global Section ###
    ###############################################

    # Initiating Log Class
    general_log_path = str(cf.conf['LOG_PATH'])
    msgSize = int(cf.conf['limRec'])

    # Enabling Logging Info
    logging.basicConfig(filename=general_log_path + 'IoTDevice.log', level=logging.INFO)

    # Other useful variables
    cnt = 1
    idx = 0
    debugInd = 'Y'
    x_value = 0
    total_1 = 100
    total_2 = 100
    var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    # End of useful variables

    while True:
        srcJson = {
            "x_value": x_value,
            "total_1": total_1,
            "total_2": total_2
        }

        x_value += 1
        total_1 = total_1 + random.randint(6, 8)
        total_2 = total_2 + random.randint(5, 6)

        tmpJson = str(srcJson)

        if cnt == 1:
            srcJsonMast = '{' + '"' + str(idx) + '":'+ tmpJson
        elif cnt == msgSize:
            srcJsonMast = srcJsonMast + '}'
            print('JSON: ')
            print(str(srcJsonMast))

            # Pushing the batched IoT events
            retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)

            if retVal_1 == 0:
                print('Successfully IoT event pushed!')
            else:
                print('Failed to push IoT events!')

            srcJsonMast = ''
            tmpJson = ''
            cnt = 0
            idx = -1
            srcJson = {}
            retVal_1 = 0
        else:
            srcJsonMast = srcJsonMast + ',' + '"' + str(idx) + '":'+ tmpJson

        cnt += 1
        idx += 1

        time.sleep(1)

if __name__ == "__main__":
    main()

Let’s explore the key snippets from the above script.

# Initiating Ably class to push events
x1 = cps.clsPublishStream()

The mock IoT application instantiates the publishing class, which pushes the JSON events to the Ably queue.

if cnt == 1:
    srcJsonMast = '{' + '"' + str(idx) + '":'+ tmpJson
elif cnt == msgSize:
    srcJsonMast = srcJsonMast + '}'
    print('JSON: ')
    print(str(srcJsonMast))

    # Pushing the batched IoT events
    retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)

    if retVal_1 == 0:
        print('Successfully IoT event pushed!')
    else:
        print('Failed to push IoT events!')

    srcJsonMast = ''
    tmpJson = ''
    cnt = 0
    idx = -1
    srcJson = {}
    retVal_1 = 0
else:
    srcJsonMast = srcJsonMast + ',' + '"' + str(idx) + '":'+ tmpJson

In the above snippet, we’re forming the payload dynamically & then calling “pushEvents” to push all the randomly generated mock IoT events to the Ably queue.
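The same kind of payload can also be built from a dictionary & serialized with `json.dumps`, which guarantees valid JSON with double-quoted keys. This is only a sketch & not an exact reproduction of the batching above. `build_batch` is a hypothetical helper that is not part of the original script.

```python
import json
import random


def build_batch(start_x, total_1, total_2, batch_size):
    """Collect `batch_size` mock readings keyed by their index in the batch."""
    batch = {}
    x_value = start_x
    for idx in range(batch_size):
        batch[str(idx)] = {
            "x_value": x_value,
            "total_1": total_1,
            "total_2": total_2,
        }
        # Same random increments as the generator script.
        x_value += 1
        total_1 += random.randint(6, 8)
        total_2 += random.randint(5, 6)
    # Return the serialized batch plus the state needed for the next batch.
    return json.dumps(batch), x_value, total_1, total_2


payload, next_x, next_t1, next_t2 = build_batch(0, 100, 100, batch_size=10)
decoded = json.loads(payload)
```

The returned state (`next_x`, `next_t1`, `next_t2`) can be fed into the next call, so that the series keeps growing across batches.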

2. custom.kv (This Kivy design file contains the graphical interface of our iOS App, including the functionality of the buttons.)


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 12-Nov-2021 ####
#### ####
#### Objective: This Kivy design file contains all the ####
#### graphical interface of our I-OS App. This including ####
#### the functionalities of buttons. ####
#### ####
#### Note: If you think this file is not properly read by ####
#### the program, then remove this entire comment block & ####
#### then run the application. It should work. ####
###############################################################

MainInterface:

<MainInterface>:
    ScreenManager:
        id: sm
        size: root.width, root.height

        Screen:
            name: "background_1"

            Image:
                source: "Background/Background_1.png"
                allow_stretch: True
                keep_ratio: True
                size_hint_y: None
                size_hint_x: None
                width: self.parent.width
                height: self.parent.width/self.image_ratio

                FloatLayout:
                    orientation: 'vertical'

                    Label:
                        text: "This is an application, which will consume the live streaming data inside a Kivy-based IOS-App by using Matplotlib to capture the KPIs."
                        text_size: self.width + 350, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':2.9,'center_y':6.5}

                    Image:
                        id: homesc
                        pos_hint: {'right':6, 'top':5.4}
                        size_hint: None, None
                        size: 560, 485
                        source: "Background/FP.jpeg"

        Screen:
            name: "background_2"

            Image:
                source: "Background/Background_2.png"
                allow_stretch: True
                keep_ratio: True
                size_hint_y: None
                size_hint_x: None
                width: self.parent.width
                height: self.parent.width/self.image_ratio

                FloatLayout:
                    Label:
                        text: "Please find the realtime IoT-device Live Statistics:"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':3.0,'center_y':7.0}

                    Label:
                        text: "DC to Servo Min Ratio:"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':3.0,'center_y':6.2}

                    Label:
                        id: dynMin
                        text: "100"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.2,'center_y':6.2}

                    Label:
                        text: "DC Motor:"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':5.4}

                    Label:
                        text: "(MAX)"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':5.0}

                    Label:
                        id: dynDC
                        text: "100"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':4.6}

                    Label:
                        text: " ——- Vs ——- "
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':4.0}

                    Label:
                        text: "Servo Motor:"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':3.4}

                    Label:
                        text: "(MAX)"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':3.0}

                    Label:
                        id: dynServo
                        text: "100"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':2.6}

                FloatLayout:
                    id: box
                    size: 400, 550
                    pos: 200, 300

        Screen:
            name: "background_3"

            Image:
                source: "Background/Background_3.png"
                allow_stretch: True
                keep_ratio: True
                size_hint_y: None
                size_hint_x: None
                width: self.parent.width
                height: self.parent.width/self.image_ratio

                FloatLayout:
                    orientation: 'vertical'

                    Label:
                        text: "Please find the live like status."
                        text_size: self.width + 350, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':2.6,'center_y':7.2}

                    Label:
                        id: dynVal
                        text: "100"
                        text_size: self.width + 350, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':4.1,'center_y':6.4}

                    Image:
                        id: lk_img_1
                        pos_hint: {'center_x':3.2, 'center_y':6.4}
                        size_hint: None, None
                        size: 460, 285
                        source: "Background/Likes_Btn_R.png"

                    Label:
                        text: "Want to know more about the Developer? Here is the detail ->"
                        text_size: self.width + 450, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':3.1,'center_y':5.5}

                    Label:
                        text: "I love to find out new technologies that is emerging as a driving force & shape our future!"
                        text_size: self.width + 290, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':2.3,'center_y':3.8}

                    Label:
                        text: "For more information view the website to know more on Python-Kivy along with Matplotlib Live Streaming."
                        text_size: self.width + 450, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':3.1,'center_y':1.9}

                    Image:
                        id: avatar
                        pos_hint: {'right':6.8, 'top':5.4}
                        size_hint: None, None
                        size: 460, 285
                        source: "Background/Me.jpeg"

                    Label:
                        text: "https://www.satyakide.com"
                        text_size: self.width + 350, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':3.4,'center_y':0.9}

    Image:
        source: "Background/Top_Bar.png"
        size: 620, 175
        pos: 0, root.height - 535

    Button:
        #: set val 'Start'
        size: 112.5, 75
        pos: root.width/2-190, root.height-120
        background_color: 1,1,1,0
        on_press: root.pressed(self, val, sm)
        on_release: root.released(self, val)
        Image:
            id: s_img
            text: val
            source: "Background/Start_Btn.png"
            center_x: self.parent.center_x - 260
            center_y: self.parent.center_y - 415

    Button:
        #: set val2 'Stats'
        size: 112.5, 75
        pos: root.width/2-55, root.height-120
        background_color: 1,1,1,0
        on_press: root.pressed(self, val2, sm)
        on_release: root.released(self, val2)
        Image:
            id: st_img
            text: val2
            source: "Background/Stats_Btn.png"
            center_x: self.parent.center_x - 250
            center_y: self.parent.center_y - 415

    Button:
        #: set val3 'Likes'
        size: 112.5, 75
        pos: root.width/2+75, root.height-120
        background_color: 1,1,1,0
        on_press: root.pressed(self, val3, sm)
        on_release: root.released(self, val3)
        Image:
            id: lk_img
            text: val3
            source: "Background/Likes_Btn.png"
            center_x: self.parent.center_x - 240
            center_y: self.parent.center_y - 415

To understand this, one needs to learn how to prepare a Kivy design layout using the KV language. You can develop the same using native Python code as well. However, I wanted to explore this language; besides, it is the preferred way of building a front-end GUI design in Kivy.

Like any graphical interface, one needs to understand the layouts & the widgets that one plans to use or build. For that, please go through the critical documentation on Kivy Layouts, especially if you are doing this for the first time.

To pinpoint the conversation, I would like to present the documentation segment from the official site in the given picture –

Official Kivy reference

Since we’ve used our custom buttons & top bars, the most convenient GUI layout for our use case is FloatLayout. By using that layout, we can conveniently position our widgets at any arbitrary place as per our needs. At the same time, one can use nested layouts by placing different types of layouts inside one another.

Some of the key lines from the above scripting files will be –

Screen:
  name: "background_1"
  Image:
      source: "Background/Background_1.png"
      allow_stretch: True
      keep_ratio: True
      size_hint_y: None
      size_hint_x: None
      width: self.parent.width
      height: self.parent.width/self.image_ratio
      FloatLayout:
          orientation: 'vertical'
          Label:
              text: "This is an application, which will consume the live streaming data inside a Kivy-based IOS-App by using Matplotlib to capture the KPIs."
              text_size: self.width + 350, None
              height: self.texture_size[1]
              halign: "left"
              valign: "bottom"
              pos_hint: {'center_x':2.9,'center_y':6.5}
          Image:
              id: homesc
              pos_hint: {'right':6, 'top':5.4}
              size_hint: None, None
              size: 560, 485
              source: "Background/FP.jpeg"

Let us understand what we discussed here & try to map that with the image.

Part of GUI defined in KV file

From the above image now, you can understand how we placed the label & image into our custom positions to create a lean & clean interface.
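The `pos_hint` values in the KV file are larger than 1, which can look odd at first. In a FloatLayout, a hint such as `center_x: 0.5` places the centre of the child at the layout’s x plus 0.5 times the layout’s width, so values above 1 push the widget beyond the layout’s own box. The sketch below reproduces that arithmetic in plain Python. `resolve_center` is a hypothetical helper, & the 100 x 100 layout size is an assumption (Kivy’s default widget size), not something measured from the App.

```python
def resolve_center(layout_pos, layout_size, pos_hint):
    """Translate center_x / center_y hints into pixel coordinates."""
    x, y = layout_pos
    width, height = layout_size
    center_x = x + pos_hint["center_x"] * width
    center_y = y + pos_hint["center_y"] * height
    return center_x, center_y


# Assumed layout: origin (0, 0) & the default size of 100 x 100.
cx, cy = resolve_center((0, 0), (100, 100), {"center_x": 2.9, "center_y": 6.5})
```

Under that assumption, the hints behave like pixel offsets in units of 100, which is one way to read the constants used in this design file.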

  Image:
      source: "Background/Top_Bar.png"
      size: 620, 175
      pos: 0, root.height - 535

  Button:
      #: set val 'Start'
      size: 112.5, 75
      pos: root.width/2-190, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val, sm)
      on_release: root.released(self, val)
      Image:
          id: s_img
          text: val
          source: "Background/Start_Btn.png"
          center_x: self.parent.center_x - 260
          center_y: self.parent.center_y - 415

  Button:
      #: set val2 'Stats'
      size: 112.5, 75
      pos: root.width/2-55, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val2, sm)
      on_release: root.released(self, val2)
      Image:
          id: st_img
          text: val2
          source: "Background/Stats_Btn.png"
          center_x: self.parent.center_x - 250
          center_y: self.parent.center_y - 415

  Button:
      #: set val3 'Likes'
      size: 112.5, 75
      pos: root.width/2+75, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val3, sm)
      on_release: root.released(self, val3)
      Image:
          id: lk_img
          text: val3
          source: "Background/Likes_Btn.png"
          center_x: self.parent.center_x - 240
          center_y: self.parent.center_y - 415

Let us understand the custom buttons mapped in our Apps.

So, these are custom buttons. We placed them into specific positions & sizes by mentioning the appropriate size & position coordinates & then assigned the button methods (on_press & on_release).

However, these button methods will be present inside the main python script, which we’ll discuss after this segment.

3. main.py (Consuming the streaming IoT events, which the simulator pushed to the Ably channels, & publishing them in the Kivy-based iOS App through measured KPIs.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 12-Nov-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels & captured IoT ####
#### events from the simulator & publish ####
#### them in Kivy-I/OS App through ####
#### measured KPIs. ####
#### ####
##############################################

from kivy.app import App
from kivy.uix.widget import Widget
from kivy.lang import Builder
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.floatlayout import FloatLayout
from kivy.clock import Clock
from kivy.core.window import Window
from kivymd.app import MDApp
import datetime as dt
import datetime
from kivy.properties import StringProperty
from kivy.vector import Vector
import regex as re
import os
os.environ["KIVY_IMAGE"]="pil"
import platform as pl
import matplotlib.pyplot as plt
import pandas as p
from matplotlib.patches import Rectangle
from matplotlib import use as mpl_use

mpl_use('module://kivy.garden.matplotlib.backend_kivy')
plt.style.use('fivethirtyeight')

# Consuming data from Ably Queue
from ably import AblyRest

# Main Class to consume streaming
import clsStreamConsume as ca

# Create the instance of the stream-consuming class
x1 = ca.clsStreamConsume()

var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' *60)
DInd = 'Y'

Window.size = (310, 460)

Curr_Path = os.path.dirname(os.path.realpath(__file__))

os_det = pl.system()
if os_det == "Windows":
    sep = '\\'
else:
    sep = '/'

def getRealTimeIoT():
    try:
        # Let's pass this to our map section
        df = x1.conStream(var1, DInd)

        print('Data:')
        print(str(df))

        return df
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

class MainInterface(FloatLayout):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.data = getRealTimeIoT()
        self.likes = 0
        self.dcMotor = 0
        self.servoMotor = 0
        self.minRatio = 0
        plt.subplots_adjust(bottom=0.19)

        #self.fig, self.ax = plt.subplots(1,1, figsize=(6.5,10))
        self.fig, self.ax = plt.subplots()
        self.mpl_canvas = self.fig.canvas

    def on_data(self, *args):
        self.ax.clear()
        self.data = getRealTimeIoT()

        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
        self.likes = self.getMaxLike(self.data)
        self.ids.dynVal.text = str(self.likes)
        self.ids.lk_img_1.source = ''
        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn_R.png"

        self.dcMotor = self.getMaxDCMotor(self.data)
        self.ids.dynDC.text = str(self.dcMotor)

        self.servoMotor = self.getMaxServoMotor(self.data)
        self.ids.dynServo.text = str(self.servoMotor)

        self.minRatio = self.getDc2ServoMinRatio(self.data)
        self.ids.dynMin.text = str(self.minRatio)

        x = self.data['x_value']
        y1 = self.data['total_1']
        y2 = self.data['total_2']

        self.ax.plot(x, y1, label='Channel 1', linewidth=5.0)
        self.ax.plot(x, y2, label='Channel 2', linewidth=5.0)

        self.mpl_canvas.draw_idle()

        box = self.ids.box
        box.clear_widgets()
        box.add_widget(self.mpl_canvas)

        return self.data

    def getMaxLike(self, df):
        payload = df['x_value']

        a1 = str(payload.agg(['max']))
        max_val = int(re.search(r'\d+', a1)[0])

        return max_val

    def getMaxDCMotor(self, df):
        payload = df['total_1']

        a1 = str(payload.agg(['max']))
        max_val = int(re.search(r'\d+', a1)[0])

        return max_val

    def getMaxServoMotor(self, df):
        payload = df['total_2']

        a1 = str(payload.agg(['max']))
        max_val = int(re.search(r'\d+', a1)[0])

        return max_val

    def getMinDCMotor(self, df):
        payload = df['total_1']

        a1 = str(payload.agg(['min']))
        min_val = int(re.search(r'\d+', a1)[0])

        return min_val

    def getMinServoMotor(self, df):
        payload = df['total_2']

        a1 = str(payload.agg(['min']))
        min_val = int(re.search(r'\d+', a1)[0])

        return min_val

    def getDc2ServoMinRatio(self, df):
        minDC = self.getMinDCMotor(df)
        minServo = self.getMinServoMotor(df)

        min_ratio = round(float(minDC/minServo), 5)

        return min_ratio

    def update(self, *args):
        self.data = self.on_data(self.data)

    def pressed(self, instance, inText, SM):

        if str(inText).upper() == 'START':
            instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Start_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.s_img.text).upper())
            if ((SM.current == "background_2") or (SM.current == "background_3")):
                SM.transition.direction = "right"
            SM.current= "background_1"
            Clock.unschedule(self.update)
            self.remove_widget(self.mpl_canvas)

        elif str(inText).upper() == 'STATS':

            instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Stats_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.st_img.text).upper())
            if (SM.current == "background_1"):
                SM.transition.direction = "left"
            elif (SM.current == "background_3"):
                SM.transition.direction = "right"
            SM.current= "background_2"
            Clock.schedule_interval(self.update, 0.1)

        else:
            instance.parent.ids.lk_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Likes_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.lk_img.text).upper())
            if ((SM.current == "background_1") or (SM.current == "background_2")):
                SM.transition.direction = "left"
            SM.current= "background_3"
            Clock.schedule_interval(self.update, 0.1)
            instance.parent.ids.dynVal.text = str(self.likes)
            instance.parent.ids.dynDC.text = str(self.dcMotor)
            instance.parent.ids.dynServo.text = str(self.servoMotor)
            instance.parent.ids.dynMin.text = str(self.minRatio)
            self.remove_widget(self.mpl_canvas)

    def released(self, instance, inrText):
        if str(inrText).upper() == 'START':
            instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Start_Btn.png"
            print('Released: ', str(instance.parent.ids.s_img.text).upper())
        elif str(inrText).upper() == 'STATS':
            instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Stats_Btn.png"
            print('Released: ', str(instance.parent.ids.st_img.text).upper())
        else:
            instance.parent.ids.lk_img.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
            print('Released: ', str(instance.parent.ids.lk_img.text).upper())

class CustomApp(MDApp):
    def build(self):
        return MainInterface()

if __name__ == "__main__":
    custApp = CustomApp()
    custApp.run()

Let us explore the main script now.

def getRealTimeIoT():
    try:
        # Let's pass this to our map section
        df = x1.conStream(var1, DInd)

        print('Data:')
        print(str(df))

        return df
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

The above function will invoke the streaming class to consume the mock IoT live events as a pandas dataframe from the Ably queue.

class MainInterface(FloatLayout):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.data = getRealTimeIoT()
        self.likes = 0
        self.dcMotor = 0
        self.servoMotor = 0
        self.minRatio = 0
        plt.subplots_adjust(bottom=0.19)

        #self.fig, self.ax = plt.subplots(1,1, figsize=(6.5,10))
        self.fig, self.ax = plt.subplots()
        self.mpl_canvas = self.fig.canvas

Here, the application instantiates the main class & initializes all the critical variables, including the matplotlib figure, axes & canvas.

    def pressed(self, instance, inText, SM):

        if str(inText).upper() == 'START':
            instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Start_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.s_img.text).upper())
            if ((SM.current == "background_2") or (SM.current == "background_3")):
                SM.transition.direction = "right"
            SM.current= "background_1"
            Clock.unschedule(self.update)
            self.remove_widget(self.mpl_canvas)

We’ve taken one of the button events & captured how the application behaves once someone clicks the Start button: it brings back all the corresponding elements of the static page, stops the scheduled updates & removes the matplotlib canvas. It also sets the transition direction between the screens.
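The direction logic in the button handler follows from the order of the screens: moving to a later screen slides left & moving back to an earlier one slides right. Here is a small sketch of that rule, assuming the three screens are ordered as in the KV file. `SCREEN_ORDER` & `transition_direction` are hypothetical names introduced only for illustration.

```python
SCREEN_ORDER = ["background_1", "background_2", "background_3"]


def transition_direction(current, target):
    """Return 'left' when moving forward, 'right' when moving back, None if unchanged."""
    delta = SCREEN_ORDER.index(target) - SCREEN_ORDER.index(current)
    if delta > 0:
        return "left"
    if delta < 0:
        return "right"
    return None


# Stats pressed while the home screen is showing.
direction = transition_direction("background_1", "background_2")
```

This gives the same directions as the if / elif branches in the handler, without listing every screen pair by hand.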

        elif str(inText).upper() == 'STATS':

            instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Stats_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.st_img.text).upper())
            if (SM.current == "background_1"):
                SM.transition.direction = "left"
            elif (SM.current == "background_3"):
                SM.transition.direction = "right"
            SM.current= "background_2"
            Clock.schedule_interval(self.update, 0.1)

The next screen invokes the dynamic & real-time content. So, please pay extra attention to the following line –

Clock.schedule_interval(self.update, 0.1)

This line will invoke the update function, which looks like –

    def update(self, *args):
        self.data = self.on_data(self.data)

Here is the logic for the update function, which will invoke another function named “on_data”.

    def on_data(self, *args):
        self.ax.clear()
        self.data = getRealTimeIoT()

        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
        self.likes = self.getMaxLike(self.data)
        self.ids.dynVal.text = str(self.likes)
        self.ids.lk_img_1.source = ''
        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn_R.png"

        self.dcMotor = self.getMaxDCMotor(self.data)
        self.ids.dynDC.text = str(self.dcMotor)

        self.servoMotor = self.getMaxServoMotor(self.data)
        self.ids.dynServo.text = str(self.servoMotor)

        self.minRatio = self.getDc2ServoMinRatio(self.data)
        self.ids.dynMin.text = str(self.minRatio)

        x = self.data['x_value']
        y1 = self.data['total_1']
        y2 = self.data['total_2']

        self.ax.plot(x, y1, label='Channel 1', linewidth=5.0)
        self.ax.plot(x, y2, label='Channel 2', linewidth=5.0)

        self.mpl_canvas.draw_idle()

        box = self.ids.box
        box.clear_widgets()
        box.add_widget(self.mpl_canvas)

        return self.data

The above method shows how we capture the live calculations, plot them with matplotlib & finally attach the matplotlib figure canvas to the box widget, which has the size we want. The displayed content changes whenever the application invokes this method.
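The KPI helpers in main.py convert the aggregated pandas result to a string & pull the digits out with a regular expression. As an alternative sketch (not the method used in this post), the same four numbers can be computed directly from the values. `compute_kpis` is a hypothetical helper that works on a plain list of records, so it runs without pandas.

```python
def compute_kpis(records):
    """Compute the dashboard KPIs from a list of {'x_value', 'total_1', 'total_2'} dicts."""
    x_values = [r["x_value"] for r in records]
    dc = [r["total_1"] for r in records]
    servo = [r["total_2"] for r in records]
    return {
        "likes": max(x_values),
        "dc_max": max(dc),
        "servo_max": max(servo),
        # Ratio of the smallest DC reading to the smallest servo reading.
        "min_ratio": round(min(dc) / min(servo), 5),
    }


sample = [
    {"x_value": 0, "total_1": 100, "total_2": 100},
    {"x_value": 1, "total_1": 107, "total_2": 105},
    {"x_value": 2, "total_1": 113, "total_2": 111},
]
kpis = compute_kpis(sample)
```

With a pandas dataframe, calling `max()` or `min()` on a column, for example `df['total_1'].max()`, returns the number directly.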

The rest of the functions are pretty self-explanatory. So, I’m not going to discuss them.


Run:

Let’s run the app & see the output –

STEP – 1

Triggering the mock IoT App

STEP – 2

Triggering the iOS App

STEP – 3


So, we’ve done it.

You will get the complete codebase in the following Github link.

I’ll bring some more exciting topics from the Python verse in the coming days. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging!


Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim the ownership of these images. There is always room for improvement, especially in the size & position of the GUI components, which can be made dynamic by defining them through self.width along with some constant values.

Python-based Dash framework visualizing real-time Covid-19 trend

Hi Team,

We’ll enhance our last post on Covid-19 prediction & try to capture the results in a real-time dashboard, where the values in the visual display points change as soon as the source data changes. In short, this is genuinely a real-time visual dashboard, displaying all the graphs & trends as the third-party API source data changes.

However, I would like to share the run before we dig deep into this.

Demo Run

Architecture:

Let us understand the architecture for this solution –

Streaming Architecture

From the above diagram, one can see that we’re maintaining a similar approach compared to our last initiative. However, we’ve used a different framework to display the data live.

To achieve this, we’ve used a compelling python-based framework called Dash. Other than that, we’ve used Ably, Plotly & Prophet API.

If you need to know more about our last post, please visit this link.


Package Installation:

Let us understand the sample packages that are required for this task.

Step – 1:

Installing Packages

Step – 2:

Installing Packages – Continue

Step – 3:

Installing Packages – Continue

Step – 4:

Installing Packages – Final

And, here is the command to install those packages –

pip install pandas
pip install plotly
pip install prophet
pip install dash
pip install ably

Code:

Since this is an extension to our previous post, we’re not going to discuss the other scripts, which we’ve already discussed over there. Instead, we will talk about the enhanced scripts & the new scripts that are required for this use case.

1. clsConfig.py ( This native Python script contains the configuration entries. )


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 09-Sep-2021 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning & streaming dashboard.####
#### ####
################################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'FILE_NAME': Curr_Path + sep + 'data' + sep + 'TradeIn.csv',
        'SRC_PATH': Curr_Path + sep + 'data' + sep,
        'APP_DESC_1': 'Dash Integration with Ably!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SUBDIR' : 'data',
        'ABLY_ID': 'XXX2LL.93kdkiU2:Kdsldoeie737484E',
        "URL":"https://corona-api.com/countries/",
        "appType":"application/json",
        "conType":"keep-alive",
        "limRec": 10,
        "CACHE":"no-cache",
        "MAX_RETRY": 3,
        "coList": "DE, IN, US, CA, GB, ID, BR",
        "FNC": "NewConfirmed",
        "TMS": "ReportedDate",
        "FND": "NewDeaths",
        "FinData": "Cache.csv"
    }

A few of the new entries, which are essential to this task are -> ABLY_ID & FinData.

2. clsPublishStream.py ( This script will publish the data transformed for Covid-19 predictions from the third-party sources. )


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 26-Jul-2021 ####
#### Modified Date: 08-Sep-2021 ####
#### ####
#### Objective: This script will publish real-time ####
#### streaming data coming out from a hosted API ####
#### sources using another popular third-party service ####
#### named Ably. Ably mimics pubsub Streaming concept, ####
#### which might be extremely useful for any start-ups. ####
#### ####
###############################################################

from ably import AblyRest
import logging
import json
from clsConfig import clsConfig as cf

# Global Section
logger = logging.getLogger('ably')
logger.addHandler(logging.StreamHandler())

ably_id = str(cf.conf['ABLY_ID'])

ably = AblyRest(ably_id)
channel = ably.channels.get('sd_channel')
# End Of Global Section

class clsPublishStream:
    def __init__(self):
        self.fnc = cf.conf['FNC']

    def pushEvents(self, srcDF, debugInd, varVa, flg):
        try:
            # JSON data
            # This is the default data for all the identified category
            # we've prepared. You can extract this dynamically. Or, By
            # default you can set their base trade details.
            json_data = [{'Year_Mon': '201911', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg},
                        {'Year_Mon': '201912', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg}]

            jdata = json.dumps(json_data)

            # Publish a message to the sd_channel channel
            channel.publish('event', jdata)

            # Capturing the inbound dataframe
            iDF = srcDF

            # Adding new selected points
            covid_dict = iDF.to_dict('records')
            jdata_fin = json.dumps(covid_dict)

            # Publish rest of the messages to the sd_channel channel
            channel.publish('event', jdata_fin)

            jdata_fin = ''

            return 0
        except Exception as e:
            x = str(e)
            print(x)
            logging.info(x)

            return 1

We’ve already discussed this script. The only new line that appears here is –

json_data = [{'Year_Mon': '201911', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg},
            {'Year_Mon': '201912', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg}]

This statement is more like a dummy feed, which creates the basic structure of your graph.
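One way to read this: the seed rows give every country column a value before any real data arrives, so the graph has its structure from the first render. Below is a sketch of generating such rows from a list of column names rather than typing them out. `build_seed_rows` is a hypothetical helper, the country list simply mirrors the snippet above & the status value 'Y' is an assumed placeholder for `flg`.

```python
import json

COUNTRIES = ["Brazil", "Canada", "Germany", "India",
             "Indonesia", "UnitedKingdom", "UnitedStates"]


def build_seed_rows(periods, countries, status):
    """Return one zero-valued row per period, tagged with the given status flag."""
    rows = []
    for period in periods:
        row = {"Year_Mon": period}
        row.update({country: 0.0 for country in countries})
        row["Status"] = status
        rows.append(row)
    return rows


# 'Y' is an assumed status value used only for this sketch.
seed = build_seed_rows(["201911", "201912"], COUNTRIES, status="Y")
payload = json.dumps(seed)
```

The resulting `payload` has the same shape as the hard-coded `json_data` list once it is serialized.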

3. clsStreamConsume.py ( This script will consume the stream from the Ably queue. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### Modified On 08-Sep-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels published by the ####
#### callPredictCovidAnalysisRealtime.py ####
#### ####
##############################################
import json
from clsConfig import clsConfig as cf
import requests
import logging
import time
import pandas as p
import clsL as cl
from ably import AblyRest
# Initiating Log class
l = cl.clsL()
class clsStreamConsume:
    def __init__(self):
        self.ably_id = str(cf.conf['ABLY_ID'])
        self.fileName = str(cf.conf['FinData'])

    def conStream(self, varVa, debugInd):
        # Read the cache file name first so the fallback below can always use it
        fileName = self.fileName

        try:
            ably_id = self.ably_id
            var = varVa
            debug_ind = debugInd

            # Fetching the data
            client = AblyRest(ably_id)
            channel = client.channels.get('sd_channel')
            message_page = channel.history()

            # Counter Value
            cnt = 0

            # Declaring Global Data-Frame
            df_conv = p.DataFrame()

            for i in message_page.items:
                print('Last Msg: {}'.format(i.data))
                json_data = json.loads(i.data)

                # Converting JSON to Dataframe
                df = p.json_normalize(json_data)
                # Keep the last part of a dotted column name; flat names stay as they are
                df.columns = df.columns.map(lambda x: x.split(".")[-1])

                if cnt == 0:
                    df_conv = df
                else:
                    d_frames = [df_conv, df]
                    df_conv = p.concat(d_frames)

                cnt += 1

            # Resetting the Index Value
            df_conv.reset_index(drop=True, inplace=True)

            # This will check whether the current load is happening
            # or not. Based on that, it will capture the old events
            # from cache.
            if df_conv.empty:
                # read_csv has no 'index' argument
                df_conv = p.read_csv(fileName)
            else:
                l.logr(fileName, debug_ind, df_conv, 'log')

            return df_conv
        except Exception as e:
            x = str(e)
            print(x)
            logging.info(x)

            # This will handle the error scenario as well.
            # Based on that, it will capture the old events
            # from cache.
            try:
                df_conv = p.read_csv(fileName)

                return df_conv
            except Exception:
                df = p.DataFrame()

                return df

We’ve already discussed this script in one of my earlier posts, which you will get here.

So, I’m not going to discuss all the steps in detail.

The only addition is a temporary local cache: when the channel returns no events, the application falls back to the data saved from the last successful load.

if df_conv.empty:
    df_conv = p.read_csv(fileName)
else:
    l.logr(fileName, debug_ind, df_conv, 'log')
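The fallback idea can be tried in isolation. Below is a rough sketch, assuming that the cache is a plain CSV file. The original relies on `l.logr` from clsL, whose body is not shown in this post, so `to_csv` stands in for it here & `load_or_cache` is a hypothetical helper.

```python
import os
import tempfile
import pandas as p

def load_or_cache(df_conv, file_name):
    """Hypothetical helper mirroring the cache fallback described above."""
    if df_conv.empty:
        # Nothing new on the channel: fall back to the cached copy, if any
        if os.path.exists(file_name):
            return p.read_csv(file_name)
        return p.DataFrame()
    # Fresh events arrived: refresh the cache for the next empty poll
    # (assumption: to_csv stands in for l.logr)
    df_conv.to_csv(file_name, index=False)
    return df_conv

cache_file = os.path.join(tempfile.mkdtemp(), 'FinData.csv')
fresh = p.DataFrame({'Year_Mon': [202101],
                     'India': [10.0],
                     'Status': ['NewConfirmed']})

first = load_or_cache(fresh, cache_file)            # writes the cache
second = load_or_cache(p.DataFrame(), cache_file)   # served from the cache
print(second)
```

The second call receives an empty dataframe, so it returns the rows stored by the first call instead of an empty chart.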

4. callPredictCovidAnalysisRealtime.py ( Main calling script to fetch the COVID-19 data from the third-party source & then publish it to the Ably message queue after transforming the data & adding the prediction using Facebook’s prophet API. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### Modified On 26-Jul-2021 ####
#### ####
#### Objective: Calling multiple API's ####
#### that including Prophet-API developed ####
#### by Facebook for future prediction of ####
#### Covid-19 situations in upcoming days ####
#### for world's major hotspots. ####
##############################################
import json
import clsCovidAPI as ca
from clsConfig import clsConfig as cf
import datetime
import logging
import clsL as cl
import math as m
import clsPublishStream as cps
import clsForecast as f
from prophet import Prophet
from prophet.plot import plot_plotly, plot_components_plotly
import matplotlib.pyplot as plt
import pandas as p
import datetime as dt
import time
# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn
# Initiating Log class
l = cl.clsL()
# Helper function that maps a country code to its full name
def countryDet(inputCD):
    try:
        countryCD = inputCD

        if str(countryCD) == 'DE':
            cntCD = 'Germany'
        elif str(countryCD) == 'BR':
            cntCD = 'Brazil'
        elif str(countryCD) == 'GB':
            cntCD = 'UnitedKingdom'
        elif str(countryCD) == 'US':
            cntCD = 'UnitedStates'
        elif str(countryCD) == 'IN':
            cntCD = 'India'
        elif str(countryCD) == 'CA':
            cntCD = 'Canada'
        elif str(countryCD) == 'ID':
            cntCD = 'Indonesia'
        else:
            cntCD = 'N/A'

        return cntCD
    except:
        cntCD = 'N/A'

        return cntCD
def lookupCountry(row):
    try:
        strCD = str(row['CountryCode'])
        retVal = countryDet(strCD)

        return retVal
    except:
        retVal = 'N/A'

        return retVal

def adjustTrend(row):
    try:
        flTrend = float(row['trend'])
        flTrendUpr = float(row['trend_upper'])
        flTrendLwr = float(row['trend_lower'])

        retVal = m.trunc((flTrend + flTrendUpr + flTrendLwr)/3)

        if retVal < 0:
            retVal = 0

        return retVal
    except:
        retVal = 0

        return retVal
def ceilTrend(row, colName):
    try:
        flTrend = str(row[colName])

        # str.find() returns -1 (which is truthy) when nothing is found,
        # so test for the decimal point with 'in' instead
        if '.' in flTrend:
            if float(flTrend) > 0:
                retVal = m.trunc(float(flTrend)) + 1
            else:
                retVal = m.trunc(float(flTrend))
        else:
            retVal = float(flTrend)

        if retVal < 0:
            retVal = 0

        return retVal
    except:
        retVal = 0

        return retVal
def plot_picture(inputDF, debug_ind, var, countryCD, stat):
    try:
        iDF = inputDF

        # Lowercase the column names
        iDF.columns = [c.lower() for c in iDF.columns]
        # Determine which is Y axis
        y_col = [c for c in iDF.columns if c.startswith('y')][0]
        # Determine which is X axis
        x_col = [c for c in iDF.columns if c.startswith('ds')][0]

        # Data Conversion
        iDF['y'] = iDF[y_col].astype('float')
        iDF['ds'] = iDF[x_col].astype('datetime64[ns]')

        # Forecast calculations
        # Decreasing the changepoint_prior_scale to 0.001 to make the trend less flexible
        # (inside this function, m refers to the Prophet model, not the math alias)
        m = Prophet(n_changepoints=20, yearly_seasonality=True, changepoint_prior_scale=0.001)
        #m = Prophet(n_changepoints=20, yearly_seasonality=True, changepoint_prior_scale=0.04525)
        #m = Prophet(n_changepoints=['2021-09-10'])
        m.fit(iDF)

        forecastDF = m.make_future_dataframe(periods=365)
        forecastDF = m.predict(forecastDF)

        l.logr('15.forecastDF_' + var + '_' + countryCD + '.csv', debug_ind, forecastDF, 'log')

        # Copy the slice so that the new columns below are added to an independent frame
        df_M = forecastDF[['ds', 'trend', 'trend_lower', 'trend_upper']].copy()

        l.logr('16.df_M_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')

        # Getting Full Country Name
        cntCD = countryDet(countryCD)

        # Draw forecast results
        df_M['Country'] = cntCD

        l.logr('17.df_M_C_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')

        df_M['AdjustTrend'] = df_M.apply(lambda row: adjustTrend(row), axis=1)

        l.logr('20.df_M_AdjustTrend_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')

        return df_M
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df
def countrySpecificDF(counryDF, val):
    try:
        countryName = val
        df = counryDF

        df_lkpFile = df[(df['CountryCode'] == val)]

        return df_lkpFile
    except:
        df = p.DataFrame()

        return df

def toNum(row, colName):
    try:
        flTrend = str(row[colName])
        flTr, subpart = flTrend.split(' ')
        retVal = int(flTr.replace('-',''))

        return retVal
    except:
        retVal = 0

        return retVal
def extractPredictedDF(OrigDF, MergePredictedDF, colName):
    try:
        iDF_1 = OrigDF
        iDF_2 = MergePredictedDF
        dt_format = '%Y-%m-%d'

        iDF_1_max_group = iDF_1.groupby(["Country"] , as_index=False)["ReportedDate"].max()
        iDF_2['ReportedDate'] = iDF_2.apply(lambda row: toNum(row, 'ds'), axis=1)

        col_one_list = iDF_1_max_group['Country'].tolist()
        col_two_list = iDF_1_max_group['ReportedDate'].tolist()

        print('col_one_list: ', str(col_one_list))
        print('col_two_list: ', str(col_two_list))

        cnt_1_x = 1
        cnt_1_y = 1
        cnt_x = 0

        df_M = p.DataFrame()

        for i in col_one_list:
            str_countryVal = str(i)
            cnt_1_y = 1

            for j in col_two_list:
                intReportDate = int(str(j).strip().replace('-',''))

                if cnt_1_x == cnt_1_y:
                    print('str_countryVal: ', str(str_countryVal))
                    print('intReportDate: ', str(intReportDate))

                    iDF_2_M = iDF_2[(iDF_2['Country'] == str_countryVal) & (iDF_2['ReportedDate'] > intReportDate)]

                    # Merging with the previous Country Code data
                    if cnt_x == 0:
                        df_M = iDF_2_M
                    else:
                        d_frames = [df_M, iDF_2_M]
                        df_M = p.concat(d_frames)

                    cnt_x += 1

                cnt_1_y += 1

            cnt_1_x += 1

        df_M.drop(columns=['ReportedDate'], axis=1, inplace=True)
        df_M.rename(columns={'ds':'ReportedDate'}, inplace=True)
        df_M.rename(columns={'AdjustTrend':colName}, inplace=True)

        return df_M
    except:
        df = p.DataFrame()

        return df
def toPivot(inDF, colName):
    try:
        iDF = inDF

        iDF_Piv = iDF.pivot_table(colName, ['ReportedDate'], 'Country')
        iDF_Piv.reset_index( drop=False, inplace=True )

        list1 = ['ReportedDate']

        iDF_Arr = iDF['Country'].unique()
        list2 = iDF_Arr.tolist()

        listV = list1 + list2

        # reindex returns a new dataframe, so the result must be assigned
        iDF_Piv = iDF_Piv.reindex(listV, axis=1)

        return iDF_Piv
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df
def toAgg(inDF, var, debugInd, flg):
    try:
        iDF = inDF
        colName = "ReportedDate"

        list1 = list(iDF.columns.values)
        list1.remove(colName)

        list1 = ["Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]

        iDF['Year_Mon'] = iDF[colName].apply(lambda x:x.strftime('%Y%m'))
        iDF.drop(columns=[colName], axis=1, inplace=True)

        ColNameGrp = "Year_Mon"
        print('List1 Aggregate:: ', str(list1))
        print('ColNameGrp :: ', str(ColNameGrp))

        iDF_T = iDF[["Year_Mon", "Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]]
        iDF_T.fillna(0, inplace = True)
        print('iDF_T:: ')
        print(iDF_T)

        iDF_1_max_group = iDF_T.groupby(ColNameGrp, as_index=False)[list1].sum()
        iDF_1_max_group['Status'] = flg

        return iDF_1_max_group
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df
def publishEvents(inDF1, inDF2, inDF3, inDF4, var, debugInd):
    try:
        # Original Covid Data from API
        iDF1 = inDF1
        iDF2 = inDF2

        NC = 'NewConfirmed'
        ND = 'NewDeaths'

        iDF1_PV = toPivot(iDF1, NC)
        iDF1_PV['ReportedDate'] = p.to_datetime(iDF1_PV['ReportedDate'])
        l.logr('57.iDF1_PV_' + var + '.csv', debugInd, iDF1_PV, 'log')

        iDF2_PV = toPivot(iDF2, ND)
        iDF2_PV['ReportedDate'] = p.to_datetime(iDF2_PV['ReportedDate'])
        l.logr('58.iDF2_PV_' + var + '.csv', debugInd, iDF2_PV, 'log')

        # Predicted Covid Data from Facebook API
        iDF3 = inDF3
        iDF4 = inDF4

        iDF3_PV = toPivot(iDF3, NC)
        l.logr('59.iDF3_PV_' + var + '.csv', debugInd, iDF3_PV, 'log')

        iDF4_PV = toPivot(iDF4, ND)
        l.logr('60.iDF4_PV_' + var + '.csv', debugInd, iDF4_PV, 'log')

        # Now aggregating data based on year-month only
        iDF1_Agg = toAgg(iDF1_PV, var, debugInd, NC)
        l.logr('61.iDF1_Agg_' + var + '.csv', debugInd, iDF1_Agg, 'log')

        iDF2_Agg = toAgg(iDF2_PV, var, debugInd, ND)
        l.logr('62.iDF2_Agg_' + var + '.csv', debugInd, iDF2_Agg, 'log')

        iDF3_Agg = toAgg(iDF3_PV, var, debugInd, NC)
        l.logr('63.iDF3_Agg_' + var + '.csv', debugInd, iDF3_Agg, 'log')

        iDF4_Agg = toAgg(iDF4_PV, var, debugInd, ND)
        l.logr('64.iDF4_Agg_' + var + '.csv', debugInd, iDF4_Agg, 'log')

        # Initiating Ably class to push events
        x1 = cps.clsPublishStream()

        # Pushing both the Historical Confirmed Cases
        retVal_1 = x1.pushEvents(iDF1_Agg, debugInd, var, NC)

        if retVal_1 == 0:
            print('Successfully historical event pushed!')
        else:
            print('Failed to push historical events!')

        # Pushing both the Historical Death Cases
        retVal_3 = x1.pushEvents(iDF2_Agg, debugInd, var, ND)

        if retVal_3 == 0:
            print('Successfully historical event pushed!')
        else:
            print('Failed to push historical events!')

        time.sleep(5)

        # Pushing both the New Confirmed Cases
        retVal_2 = x1.pushEvents(iDF3_Agg, debugInd, var, NC)

        if retVal_2 == 0:
            print('Successfully predicted event pushed!')
        else:
            print('Failed to push predicted events!')

        # Pushing both the New Death Cases
        retVal_4 = x1.pushEvents(iDF4_Agg, debugInd, var, ND)

        if retVal_4 == 0:
            print('Successfully predicted event pushed!')
        else:
            print('Failed to push predicted events!')

        return 0
    except Exception as e:
        x = str(e)
        print(x)

        return 1
def main():
    try:
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*' *60)
        DInd = 'Y'
        NC = 'New Confirmed'
        ND = 'New Dead'
        SM = 'data process Successful!'
        FM = 'data process Failure!'

        print("Calling the custom Package for large file splitting..")
        print('Start Time: ' + str(var1))

        countryList = str(cf.conf['coList']).split(',')

        # Initiating Log Class
        general_log_path = str(cf.conf['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'CovidAPI.log', level=logging.INFO)

        # Create the instance of the Covid API Class
        x1 = ca.clsCovidAPI()

        # Let's pass this to our map section
        retDF = x1.searchQry(var1, DInd)

        retVal = int(retDF.shape[0])

        if retVal > 0:
            print('Successfully Covid Data Extracted from the API-source.')
        else:
            print('Something wrong with your API-source!')

        # Extracting Skeleton Data
        df = retDF[['data.code', 'date', 'deaths', 'confirmed', 'recovered', 'new_confirmed', 'new_recovered', 'new_deaths', 'active']]
        df.columns = ['CountryCode', 'ReportedDate', 'TotalReportedDead', 'TotalConfirmedCase', 'TotalRecovered', 'NewConfirmed', 'NewRecovered', 'NewDeaths', 'ActiveCaases']
        # dropna returns a new dataframe, so the result must be assigned
        df = df.dropna()

        print('Returned Skeleton Data Frame: ')
        print(df)

        l.logr('5.df_' + var1 + '.csv', DInd, df, 'log')

        # The source can hold multiple entries for the same date,
        # so the application averages the counts per country & date
        # (recent pandas versions need a list, not a tuple, to select several columns)
        g_df = df.groupby(["CountryCode", "ReportedDate"] , as_index=False)[["TotalReportedDead","TotalConfirmedCase","TotalRecovered","NewConfirmed","NewRecovered","NewDeaths","ActiveCaases"]].mean()

        g_df['TotalReportedDead_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalReportedDead'), axis=1)
        g_df['TotalConfirmedCase_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalConfirmedCase'), axis=1)
        g_df['TotalRecovered_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalRecovered'), axis=1)
        g_df['NewConfirmed_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewConfirmed'), axis=1)
        g_df['NewRecovered_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewRecovered'), axis=1)
        g_df['NewDeaths_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewDeaths'), axis=1)
        g_df['ActiveCaases_M'] = g_df.apply(lambda row: ceilTrend(row, 'ActiveCaases'), axis=1)

        # Dropping old columns
        g_df.drop(columns=['TotalReportedDead', 'TotalConfirmedCase', 'TotalRecovered', 'NewConfirmed', 'NewRecovered', 'NewDeaths', 'ActiveCaases'], axis=1, inplace=True)

        # Renaming the new columns to old columns
        g_df.rename(columns={'TotalReportedDead_M':'TotalReportedDead'}, inplace=True)
        g_df.rename(columns={'TotalConfirmedCase_M':'TotalConfirmedCase'}, inplace=True)
        g_df.rename(columns={'TotalRecovered_M':'TotalRecovered'}, inplace=True)
        g_df.rename(columns={'NewConfirmed_M':'NewConfirmed'}, inplace=True)
        g_df.rename(columns={'NewRecovered_M':'NewRecovered'}, inplace=True)
        g_df.rename(columns={'NewDeaths_M':'NewDeaths'}, inplace=True)
        g_df.rename(columns={'ActiveCaases_M':'ActiveCaases'}, inplace=True)

        l.logr('5.g_df_' + var1 + '.csv', DInd, g_df, 'log')

        # Working with forecast
        # Create the instance of the Forecast API Class
        x2 = f.clsForecast()

        # Fetching each country name & then get the details
        cnt = 6
        cnt_x = 0
        cnt_y = 0
        df_M_Confirmed = p.DataFrame()
        df_M_Deaths = p.DataFrame()

        for i in countryList:
            try:
                cntryIndiv = i.strip()
                cntryFullName = countryDet(cntryIndiv)

                print('Country Processing: ' + str(cntryFullName))

                # Creating dataframe for each country
                # Germany Main DataFrame
                dfCountry = countrySpecificDF(g_df, cntryIndiv)
                l.logr(str(cnt) + '.df_' + cntryIndiv + '_' + var1 + '.csv', DInd, dfCountry, 'log')

                # Let's pass this to our map section
                retDFGenNC = x2.forecastNewConfirmed(dfCountry, DInd, var1)

                statVal = str(NC)

                a1 = plot_picture(retDFGenNC, DInd, var1, cntryIndiv, statVal)

                # Merging with the previous Country Code data
                if cnt_x == 0:
                    df_M_Confirmed = a1
                else:
                    d_frames = [df_M_Confirmed, a1]
                    df_M_Confirmed = p.concat(d_frames)

                cnt_x += 1

                retDFGenNC_D = x2.forecastNewDead(dfCountry, DInd, var1)

                statVal = str(ND)

                a2 = plot_picture(retDFGenNC_D, DInd, var1, cntryIndiv, statVal)

                # Merging with the previous Country Code data
                if cnt_y == 0:
                    df_M_Deaths = a2
                else:
                    d_frames = [df_M_Deaths, a2]
                    df_M_Deaths = p.concat(d_frames)

                cnt_y += 1

                # Printing Proper message
                # plot_picture returns an empty dataframe on failure,
                # so both results must be non-empty for a success
                if (not a1.empty) and (not a2.empty):
                    oprMsg = cntryFullName + ' ' + SM
                    print(oprMsg)
                else:
                    oprMsg = cntryFullName + ' ' + FM
                    print(oprMsg)

                # Resetting the dataframe value for the next iteration
                dfCountry = p.DataFrame()
                cntryIndiv = ''
                oprMsg = ''
                cntryFullName = ''
                a1 = 0
                a2 = 0
                statVal = ''

                cnt += 1
            except Exception as e:
                x = str(e)
                print(x)

        l.logr('49.df_M_Confirmed_' + var1 + '.csv', DInd, df_M_Confirmed, 'log')
        l.logr('50.df_M_Deaths_' + var1 + '.csv', DInd, df_M_Deaths, 'log')

        # Removing unwanted columns
        df_M_Confirmed.drop(columns=['trend', 'trend_lower', 'trend_upper'], axis=1, inplace=True)
        df_M_Deaths.drop(columns=['trend', 'trend_lower', 'trend_upper'], axis=1, inplace=True)

        l.logr('51.df_M_Confirmed_' + var1 + '.csv', DInd, df_M_Confirmed, 'log')
        l.logr('52.df_M_Deaths_' + var1 + '.csv', DInd, df_M_Deaths, 'log')

        # Creating original dataframe from the source API
        # (copies, so that the columns added below go to independent frames)
        df_M_Confirmed_Orig = g_df[['CountryCode', 'ReportedDate','NewConfirmed']].copy()
        df_M_Deaths_Orig = g_df[['CountryCode', 'ReportedDate','NewDeaths']].copy()

        # Transforming Country Code
        df_M_Confirmed_Orig['Country'] = df_M_Confirmed_Orig.apply(lambda row: lookupCountry(row), axis=1)
        df_M_Deaths_Orig['Country'] = df_M_Deaths_Orig.apply(lambda row: lookupCountry(row), axis=1)

        # Dropping unwanted column
        df_M_Confirmed_Orig.drop(columns=['CountryCode'], axis=1, inplace=True)
        df_M_Deaths_Orig.drop(columns=['CountryCode'], axis=1, inplace=True)

        # Reordering columns
        df_M_Confirmed_Orig = df_M_Confirmed_Orig.reindex(['ReportedDate','Country','NewConfirmed'], axis=1)
        df_M_Deaths_Orig = df_M_Deaths_Orig.reindex(['ReportedDate','Country','NewDeaths'], axis=1)

        l.logr('53.df_M_Confirmed_Orig_' + var1 + '.csv', DInd, df_M_Confirmed_Orig, 'log')
        l.logr('54.df_M_Deaths_Orig_' + var1 + '.csv', DInd, df_M_Deaths_Orig, 'log')

        # Filter out only the predicted data
        filterDF_1 = extractPredictedDF(df_M_Confirmed_Orig, df_M_Confirmed, 'NewConfirmed')
        l.logr('55.filterDF_1_' + var1 + '.csv', DInd, filterDF_1, 'log')

        # The death forecast must be filtered against the death dataframes
        filterDF_2 = extractPredictedDF(df_M_Deaths_Orig, df_M_Deaths, 'NewDeaths')
        l.logr('56.filterDF_2_' + var1 + '.csv', DInd, filterDF_2, 'log')

        # Calling the final publish events
        retVa = publishEvents(df_M_Confirmed_Orig, df_M_Deaths_Orig, filterDF_1, filterDF_2, var1, DInd)

        if retVa == 0:
            print('Successfully stream processed!')
        else:
            print('Failed to process stream!')

        var2 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var2))
        print('*' *60)
    except Exception as e:
        x = str(e)
        print(x)

if __name__ == "__main__":
    main()

Let us understand the enhancement part of this script –

We’ve taken out the plotly part as we will use a separate dashboard script to visualize the data trend.

However, we need to understand the data initially consumed from the API & how we transform it, as that shapes the visualization.

The initial captured data should look like this after extracting only the relevant elements from the API response.

Initial Data from API

As you can see, for each country & reported date, our application consumes attributes like Total-Reported-Death, Total-Recovered, New-Death, New-Confirmed & so on.

From this list, we’ve taken two attributes for our use cases & they are New-Death & New-Confirmed. Also, we’re predicting the Future-New-Death & Future-New-Confirmed based on the historical data using Facebook’s prophet API.

We then pivot the data so that each country becomes its own column, which is a better shape for plotting.

Transposed Data

Hence, here is the code that we should be exploring –

def toPivot(inDF, colName):
    try:
        iDF = inDF

        iDF_Piv = iDF.pivot_table(colName, ['ReportedDate'], 'Country')
        iDF_Piv.reset_index( drop=False, inplace=True )

        list1 = ['ReportedDate']

        iDF_Arr = iDF['Country'].unique()
        list2 = iDF_Arr.tolist()

        listV = list1 + list2

        iDF_Piv = iDF_Piv.reindex(listV, axis=1)

        return iDF_Piv
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

Using the pivot_table function, we turn the country values held in rows into columns. We then realign the column headings into our desired order.

However, the data is still at the daily level. We want to drop the day part of each date & aggregate the values by month, as shown below:
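To see the pivot on its own, here is a small sketch with made-up numbers for two countries. The column names follow the post, the values are invented purely for illustration.

```python
import pandas as p

# Made-up sample rows in the long (one row per country & date) format
long_df = p.DataFrame({
    'ReportedDate': ['2021-09-01', '2021-09-01', '2021-09-02', '2021-09-02'],
    'Country': ['India', 'Brazil', 'India', 'Brazil'],
    'NewConfirmed': [100, 80, 120, 90],
})

# One row per date, one column per country
wide_df = long_df.pivot_table(values='NewConfirmed',
                              index=['ReportedDate'],
                              columns='Country')
wide_df.reset_index(drop=False, inplace=True)

# reindex returns a new frame, so the result has to be assigned
ordered = ['ReportedDate'] + long_df['Country'].unique().tolist()
wide_df = wide_df.reindex(ordered, axis=1)
print(wide_df)
```

pivot_table sorts the new columns alphabetically, so the reindex step is what restores the order in which the countries first appear in the source data.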

Aggregated Data

And, here is the code for that –

def toAgg(inDF, var, debugInd, flg):
    try:
        iDF = inDF
        colName = "ReportedDate"

        list1 = list(iDF.columns.values)
        list1.remove(colName)

        list1 = ["Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]

        iDF['Year_Mon'] = iDF[colName].apply(lambda x:x.strftime('%Y%m'))
        iDF.drop(columns=[colName], axis=1, inplace=True)

        ColNameGrp = "Year_Mon"
        print('List1 Aggregate:: ', str(list1))
        print('ColNameGrp :: ', str(ColNameGrp))

        iDF_T = iDF[["Year_Mon", "Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]]
        iDF_T.fillna(0, inplace = True)
        print('iDF_T:: ')
        print(iDF_T)

        iDF_1_max_group = iDF_T.groupby(ColNameGrp, as_index=False)[list1].sum()
        iDF_1_max_group['Status'] = flg

        return iDF_1_max_group
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

From the above snippet, we can see that the application strips the day part from each date & then aggregates the values by the Year_Mon attribute.
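The same monthly roll-up can be sketched on a tiny frame. The numbers are made up & only two country columns are used to keep the output short.

```python
import pandas as p

# Made-up daily values; the missing Brazil entry shows why fillna(0) is needed
daily = p.DataFrame({
    'ReportedDate': p.to_datetime(['2021-08-30', '2021-08-31', '2021-09-01']),
    'India': [10.0, 20.0, 5.0],
    'Brazil': [1.0, None, 4.0],
})

# Drop the day part by formatting each date as YYYYMM
daily['Year_Mon'] = daily['ReportedDate'].dt.strftime('%Y%m')

monthly = (daily.drop(columns=['ReportedDate'])
                .fillna(0)
                .groupby('Year_Mon', as_index=False)[['India', 'Brazil']]
                .sum())
monthly['Status'] = 'NewConfirmed'
print(monthly)
```

The Status column is the flag that the dashboard later uses to tell confirmed cases from deaths.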

The following snippet will push the final transformed data to Ably queue –

x1 = cps.clsPublishStream()

# Pushing both the Historical Confirmed Cases
retVal_1 = x1.pushEvents(iDF1_Agg, debugInd, var, NC)

if retVal_1 == 0:
    print('Successfully historical event pushed!')
else:
    print('Failed to push historical events!')
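What actually travels over the channel is a JSON string. The sketch below reproduces only the serialization steps from pushEvents & the matching parse on the consumer side, with made-up values & without any call to Ably.

```python
import json
import pandas as p

# Made-up aggregated rows standing in for iDF1_Agg
agg_df = p.DataFrame({
    'Year_Mon': ['202108', '202109'],
    'India': [30.0, 5.0],
    'Status': ['NewConfirmed', 'NewConfirmed'],
})

# Same two steps pushEvents applies before channel.publish
records = agg_df.to_dict('records')
payload = json.dumps(records)
print(payload)

# The consumer reverses the steps to get a dataframe back
round_trip = p.json_normalize(json.loads(payload))
print(round_trip)
```

Because each record is a flat dictionary, the dataframe rebuilt by the consumer has the same column names as the one that was published.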

5. dashboard_realtime.py ( Main calling script to consume the data from Ably queue & then visualize the trend. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 08-Sep-2021 ####
#### Modified On 08-Sep-2021 ####
#### ####
#### Objective: This is the main script ####
#### to invoke dashboard after consuming ####
#### streaming real-time predicted data ####
#### using Facebook API & Ably message Q. ####
#### ####
#### This script will show the trend ####
#### comparison between major democracies ####
#### of the world. ####
#### ####
##############################################
import datetime
import dash
from dash import dcc
from dash import html
import plotly
from dash.dependencies import Input, Output
from ably import AblyRest
from clsConfig import clsConfig as cf
import pandas as p
# Main Class to consume streaming
import clsStreamConsume as ca
import numpy as np
# Create the instance of the Covid API Class
x1 = ca.clsStreamConsume()
external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']
app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(
    html.Div([
        html.H1("Covid-19 Trend Dashboard",
                        className='text-center text-primary mb-4'),
        html.H5(children='''
            Dash: Covid-19 Trend - (Present Vs Future)
        '''),
        html.P("Covid-19: New Confirmed Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-1'),
        html.P("Covid-19: New Death Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-2'),
        dcc.Interval(
            id='interval-component',
            interval=5*1000, # in milliseconds
            n_intervals=0
        )
    ], className="row", style={'marginBottom': 10, 'marginTop': 10})
)
def to_OptimizeString(row):
    try:
        x_str = str(row['Year_Mon'])

        dt_format = '%Y%m%d'
        finStr = x_str + '01'

        strReportDate = datetime.datetime.strptime(finStr, dt_format)

        return strReportDate
    except Exception as e:
        x = str(e)
        print(x)

        dt_format = '%Y%m%d'
        var = '20990101'

        # datetime is imported as a module here, so strptime lives on datetime.datetime
        strReportDate = datetime.datetime.strptime(var, dt_format)

        return strReportDate

def fetchEvent(var1, DInd):
    try:
        # Let's pass this to our map section
        iDF_M = x1.conStream(var1, DInd)

        # Converting Year_Mon to dates
        iDF_M['Year_Mon_Mod']= iDF_M.apply(lambda row: to_OptimizeString(row), axis=1)

        # Dropping old columns
        iDF_M.drop(columns=['Year_Mon'], axis=1, inplace=True)

        #Renaming new column to old column
        iDF_M.rename(columns={'Year_Mon_Mod':'Year_Mon'}, inplace=True)

        return iDF_M
    except Exception as e:
        x = str(e)
        print(x)

        iDF_M = p.DataFrame()

        return iDF_M
# Multiple components can update everytime interval gets fired.
@app.callback(Output('live-update-graph-1', 'figure'),
              Input('interval-component', 'n_intervals'))
def update_graph_live(n):
    try:
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*' *60)
        DInd = 'Y'

        # Let's pass this to our map section
        retDF = fetchEvent(var1, DInd)

        # Create the graph with subplots
        #fig = plotly.tools.make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.3, horizontal_spacing=0.2)
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.3, horizontal_spacing=0.2)

        # Routing data to dedicated DataFrame
        retDFNC = retDF.loc[(retDF['Status'] == 'NewConfirmed')]

        # Adding different chart into one dashboard
        # First Use Case - New Confirmed
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Brazil,'type':'scatter','name':'Brazil'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Canada,'type':'scatter','name':'Canada'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Germany,'type':'scatter','name':'Germany'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.India,'type':'scatter','name':'India'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Indonesia,'type':'scatter','name':'Indonesia'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedKingdom,'type':'scatter','name':'United Kingdom'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedStates,'type':'scatter','name':'United States'},1,1)

        return fig
    except Exception as e:
        x = str(e)
        print(x)

        # Create the graph with subplots
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
        fig['layout']['margin'] = {
            'l': 30, 'r': 10, 'b': 30, 't': 10
        }
        fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}

        return fig
# Multiple components can update everytime interval gets fired.
@app.callback(Output('live-update-graph-2', 'figure'),
              Input('interval-component', 'n_intervals'))
def update_graph_live(n):
    try:
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*' *60)
        DInd = 'Y'

        # Let's pass this to our map section
        retDF = fetchEvent(var1, DInd)

        # Create the graph with subplots
        #fig = plotly.tools.make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.3, horizontal_spacing=0.2)
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.3, horizontal_spacing=0.2)

        # Routing data to dedicated DataFrame
        retDFND = retDF.loc[(retDF['Status'] == 'NewDeaths')]

        # Adding different chart into one dashboard
        # Second Use Case - New Deaths
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Brazil,'type':'bar','name':'Brazil'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Canada,'type':'bar','name':'Canada'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Germany,'type':'bar','name':'Germany'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.India,'type':'bar','name':'India'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Indonesia,'type':'bar','name':'Indonesia'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.UnitedKingdom,'type':'bar','name':'United Kingdom'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.UnitedStates,'type':'bar','name':'United States'},1,1)

        return fig
    except Exception as e:
        x = str(e)
        print(x)

        # Create the graph with subplots
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
        fig['layout']['margin'] = {
            'l': 30, 'r': 10, 'b': 30, 't': 10
        }
        fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}

        return fig

if __name__ == '__main__':
    app.run_server(debug=True)

Let us explore the critical snippet as this is a brand new script –

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)

app.layout = html.Div(
    html.Div([
        html.H1("Covid-19 Trend Dashboard",
                        className='text-center text-primary mb-4'),
        html.H5(children='''
            Dash: Covid-19 Trend - (Present Vs Future)
        '''),
        html.P("Covid-19: New Confirmed Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-1'),
        html.P("Covid-19: New Death Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-2'),
        dcc.Interval(
            id='interval-component',
            interval=5*1000, # in milliseconds
            n_intervals=0
        )
    ], className="row", style={'marginBottom': 10, 'marginTop': 10})
)

You need to understand the basics of HTML as this framework works seamlessly with it. To know more about the supported HTML, one needs to visit the following link.

def to_OptimizeString(row):
    try:
        x_str = str(row['Year_Mon'])

        dt_format = '%Y%m%d'
        finStr = x_str + '01'

        strReportDate = datetime.datetime.strptime(finStr, dt_format)

        return strReportDate

    except Exception as e:
        x = str(e)
        print(x)

        dt_format = '%Y%m%d'
        var = '20990101'

        strReportDate = datetime.datetime.strptime(var, dt_format)

        return strReportDate

The application is converting Year-Month combinations from string to date for better projection.
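Here is a small standalone version of that conversion, handy for checking the date format outside the dashboard. `year_mon_to_date` is a hypothetical name; the sentinel date 2099-01-01 is the same fallback the original function uses.

```python
import datetime

def year_mon_to_date(year_mon, fallback='20990101'):
    """Hypothetical variant of to_OptimizeString for a single value."""
    dt_format = '%Y%m%d'
    try:
        # Append day '01' so that '202109' parses as 1 September 2021
        return datetime.datetime.strptime(str(year_mon) + '01', dt_format)
    except ValueError:
        # Same sentinel date the original function falls back to
        return datetime.datetime.strptime(fallback, dt_format)

print(year_mon_to_date('202109'))
print(year_mon_to_date('not-a-month'))
```

A value that cannot be parsed ends up far in the future on the x-axis, which makes bad records easy to spot on the chart.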

Also, we’ve implemented a dashboard that refreshes every five seconds (the interval is set to 5*1000 milliseconds).

def fetchEvent(var1, DInd):
    try:
        # Let's pass this to our map section
        iDF_M = x1.conStream(var1, DInd)

        # Converting Year_Mon to dates
        iDF_M['Year_Mon_Mod']= iDF_M.apply(lambda row: to_OptimizeString(row), axis=1)

        # Dropping old columns
        iDF_M.drop(columns=['Year_Mon'], axis=1, inplace=True)

        #Renaming new column to old column
        iDF_M.rename(columns={'Year_Mon_Mod':'Year_Mon'}, inplace=True)

        return iDF_M

    except Exception as e:
        x = str(e)
        print(x)

        iDF_M = p.DataFrame()

        return iDF_M

The application will consume all the events from the Ably Queue using the above snippet.

@app.callback(Output('live-update-graph-1', 'figure'),
              Input('interval-component', 'n_intervals'))
def update_graph_live(n):

We’ve implemented the callback mechanism to get the latest data from the queue. Every time the interval fires, the callback rebuilds the graph & returns the updated figure to Dash, which redraws the chart.

# Routing data to dedicated DataFrame
retDFNC = retDF.loc[(retDF['Status'] == 'NewConfirmed')]

Based on the flag, we’re pushing the data into our target dataframe, from where the application will consume the data into the charts.
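The routing step is a plain filter on the Status column. A short sketch with made-up rows, where `split_by_status` is a hypothetical helper that returns one dataframe per flag:

```python
import pandas as p

# Made-up events carrying both flag values
events = p.DataFrame({
    'Year_Mon': ['202108', '202108', '202109', '202109'],
    'India': [30.0, 2.0, 5.0, 1.0],
    'Status': ['NewConfirmed', 'NewDeaths', 'NewConfirmed', 'NewDeaths'],
})

def split_by_status(df, flags=('NewConfirmed', 'NewDeaths')):
    """Hypothetical helper: one filtered dataframe per Status flag."""
    return {flag: df.loc[df['Status'] == flag].reset_index(drop=True)
            for flag in flags}

frames = split_by_status(events)
retDFNC = frames['NewConfirmed']
retDFND = frames['NewDeaths']
print(retDFNC)
print(retDFND)
```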

fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Brazil,'type':'scatter','name':'Brazil'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Canada,'type':'scatter','name':'Canada'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Germany,'type':'scatter','name':'Germany'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.India,'type':'scatter','name':'India'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Indonesia,'type':'scatter','name':'Indonesia'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedKingdom,'type':'scatter','name':'United Kingdom'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedStates,'type':'scatter','name':'United States'},1,1)

Different country’s KPI elements are fetched & mapped into their corresponding axis to project the graph with visual details.

Same approach goes for the other graph as well.
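Since the seven calls differ only in the column & the legend label, the trace dictionaries could also be built in a loop. This is only a sketch of that idea: `build_traces` is a hypothetical helper, the sample rows are made up & plain dictionaries stand in for the filtered dataframe so that the snippet runs without plotly.

```python
import datetime

# (column name, legend label) pairs taken from the seven calls above
COUNTRY_LABELS = [
    ('Brazil', 'Brazil'), ('Canada', 'Canada'), ('Germany', 'Germany'),
    ('India', 'India'), ('Indonesia', 'Indonesia'),
    ('UnitedKingdom', 'United Kingdom'), ('UnitedStates', 'United States'),
]

def build_traces(rows, chart_type):
    """Hypothetical helper: one plotly-style trace dict per country."""
    x_vals = [r['Year_Mon'] for r in rows]
    return [{'x': x_vals,
             'y': [r[col] for r in rows],
             'type': chart_type,
             'name': label}
            for col, label in COUNTRY_LABELS]

# Made-up sample rows standing in for the filtered dataframe
sample = [
    dict({'Year_Mon': datetime.datetime(2021, 8, 1)},
         **{col: 1.0 for col, _ in COUNTRY_LABELS}),
    dict({'Year_Mon': datetime.datetime(2021, 9, 1)},
         **{col: 2.0 for col, _ in COUNTRY_LABELS}),
]

traces = build_traces(sample, 'scatter')
print(len(traces), traces[5]['name'])
```

In the dashboard, each dictionary would then be passed to fig.append_trace(trace, 1, 1) inside a loop, with 'bar' as the chart type for the second graph.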


Run:

Let us run the application –

Run – Beginning
Run – Finishing Stage

Dashboard:

Dashboard Job Run
Dashboard Visualization

So, we’ve done it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀


Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only.

One more thing you need to understand is that this prediction is based on limited data points. The actual events may unfold differently. Ideally, countries take a cue from this kind of analysis & initiate appropriate measures to avoid the high curve. And that is one of the main objectives of time series analysis.

There is always room for improvement in this kind of model & the solution associated with it. I’ve shown the basic ways to achieve the same for educational purposes only.

How to store data from XML to Tables

In the previous post, we discussed generating an XML file using Oracle SQL XML functions. Today, we will do the reverse: we will load the data generated by that query into database tables.

By the end of this post (which is a continuation of the previous one), you will be able to generate an XML file from Oracle tables & also load the data from that XML back into tables of the same structure. That completes the full life cycle of XML in Oracle (concentrating, of course, on the basics).

Let’s see –

Our main ingredient for this class is the XML file named emp_oracle.xml

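And, it looks like –

<EmployeeList>
  <Emp>
    <Employee_ID>200</Employee_ID>
    <First>Whalen</First>
    <Sal>4400</Sal>
    <HireDate>1987-09-17</HireDate>
  </Emp>
  <Emp>
    <Employee_ID>201</Employee_ID>
    <First>Billy</First>
    <Sal>4500</Sal>
    <HireDate>1985-06-10</HireDate>
  </Emp>
  <Emp>
    <Employee_ID>202</Employee_ID>
    <First>Bireswar</First>
    <Sal>9000</Sal>
    <HireDate>1978-06-10</HireDate>
  </Emp>
</EmployeeList>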

We need to create an Oracle directory object that maps to an operating system directory, in the following manner –

sys@ORCL>
sys@ORCL>select * from v$version;

BANNER
----------------------------------------------------------------------------
Oracle Database 11g Enterprise Edition Release 11.1.0.6.0 - Production
PL/SQL Release 11.1.0.6.0 - Production
CORE 11.1.0.6.0 Production
TNS for 32-bit Windows: Version 11.1.0.6.0 - Production
NLSRTL Version 11.1.0.6.0 - Production

Elapsed: 00:00:00.00
sys@ORCL>
sys@ORCL>
sys@ORCL>CREATE OR REPLACE DIRECTORY SATY_DIR AS 'D:\XML_Output'
2 /

Directory created.

Elapsed: 00:00:00.23
sys@ORCL>
sys@ORCL>GRANT READ, WRITE ON DIRECTORY SATY_DIR TO SCOTT, HR;

Grant succeeded.

Elapsed: 00:00:00.08
sys@ORCL>

Once you have created the directory successfully and granted the proper privileges to users such as SCOTT or HR, you have completed one important component of today’s test. We still have some way to go. Now the second part is –

scott@ORCL>
scott@ORCL>select * from v$version;

BANNER
--------------------------------------------------------------------------------
Oracle Database 11g Enterprise Edition Release 11.1.0.6.0 - Production
PL/SQL Release 11.1.0.6.0 - Production
CORE 11.1.0.6.0 Production
TNS for 32-bit Windows: Version 11.1.0.6.0 - Production
NLSRTL Version 11.1.0.6.0 - Production

Elapsed: 00:00:00.00
scott@ORCL>
scott@ORCL>
scott@ORCL>CREATE TABLE t
2 (
3 serialNo NUMBER(10),
4 fileName VARCHAR2(100),
5 xml XMLTYPE,
6 constraints pk_serialNo primary key(serialNo)
7 );

Table created.

Elapsed: 00:00:04.13
scott@ORCL>
scott@ORCL>
scott@ORCL>CREATE SEQUENCE x_seq
2 START WITH 1
3 INCREMENT BY 1;

Sequence created.

Elapsed: 00:00:00.31
scott@ORCL>
scott@ORCL>CREATE OR REPLACE PROCEDURE load_xml(
2 p_dir IN VARCHAR2,
3 p_filename IN VARCHAR2
4 )
5 IS
6 l_bfile BFILE := BFILENAME(p_dir, p_filename);
7 l_clob CLOB;
8 BEGIN
9 DBMS_LOB.createtemporary (l_clob, TRUE);
10
11 DBMS_LOB.fileopen(l_bfile, DBMS_LOB.file_readonly);
12 DBMS_LOB.loadfromfile(l_clob, l_bfile, DBMS_LOB.getlength(l_bfile));
13 DBMS_LOB.fileclose(l_bfile);
14
15 INSERT INTO t(
16 serialNo,
17 fileName,
18 xml
19 )
20 VALUES (
21 x_seq.NEXTVAL,
22 p_filename,
23 XMLTYPE.createXML(l_clob)
24 );
25
26 COMMIT;
27
28 DBMS_LOB.freetemporary(l_clob);
29 END;
30 /

Procedure created.

Elapsed: 00:00:00.88
scott@ORCL>
scott@ORCL>EXEC load_xml(p_dir => 'SATY_DIR', p_filename => 'emp_oracle.xml');

PL/SQL procedure successfully completed.

Elapsed: 00:00:00.16
scott@ORCL>
scott@ORCL>set long 5000
scott@ORCL>
scott@ORCL>set pagesize 0
scott@ORCL>
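scott@ORCL>select xml from t;

<EmployeeList>
  <Emp>
    <Employee_ID>200</Employee_ID>
    <First>Whalen</First>
    <Sal>4400</Sal>
    <HireDate>1987-09-17</HireDate>
  </Emp>
  <Emp>
    <Employee_ID>201</Employee_ID>
    <First>Billy</First>
    <Sal>4500</Sal>
    <HireDate>1985-06-10</HireDate>
  </Emp>
  <Emp>
    <Employee_ID>202</Employee_ID>
    <First>Bireswar</First>
    <Sal>9000</Sal>
    <HireDate>1978-06-10</HireDate>
  </Emp>
</EmployeeList>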

Elapsed: 00:00:00.10
scott@ORCL>
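
For readers following along from the Python side, the load_xml procedure boils down to three steps: read the file, make sure it is XML, & store it whole with a generated serial number. The sketch below mirrors those steps with SQLite standing in for Oracle, an assumption made purely so that the example runs anywhere. The sample content in the demo is a one-record placeholder, & create_staging is a hypothetical helper name.

```python
import sqlite3
import tempfile
import xml.etree.ElementTree as ET
from pathlib import Path


def create_staging(conn):
    # SQLite stand-in for table t; AUTOINCREMENT plays the role of x_seq.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS t ("
        "serialNo INTEGER PRIMARY KEY AUTOINCREMENT, "
        "fileName TEXT, "
        "xml TEXT)"
    )


def load_xml(conn, directory, filename):
    """Read the file, check that it is well-formed XML, and stage it as one row."""
    text = (Path(directory) / filename).read_text(encoding="utf-8")
    ET.fromstring(text)  # raises ET.ParseError if the content is not well-formed
    cur = conn.execute(
        "INSERT INTO t (fileName, xml) VALUES (?, ?)", (filename, text)
    )
    conn.commit()
    return cur.lastrowid


# Demo with a throwaway directory and a one-record placeholder document.
conn = sqlite3.connect(":memory:")
create_staging(conn)
with tempfile.TemporaryDirectory() as tmp_dir:
    sample = "<EmployeeList><Emp><Employee_ID>200</Employee_ID></Emp></EmployeeList>"
    (Path(tmp_dir) / "emp_oracle.xml").write_text(sample, encoding="utf-8")
    serial_no = load_xml(conn, tmp_dir, "emp_oracle.xml")

staged = conn.execute("SELECT serialNo, fileName FROM t").fetchall()
print(serial_no, staged)
```

As in the PL/SQL version, the document is stored untouched at this stage; splitting it into columns is a separate step.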

Ok. So, we’ve initially loaded the data into the temp table t. But, we need to move the data from this temp table t to our target table revive_xml, which looks like –

scott@ORCL>create table revive_xml
2 (
3 rev_emp_id number(4),
4 rev_f_name varchar2(40),
5 rev_salary number(10,2),
6 rev_jn_dt date,
7 constraints pk_rev_emp_id primary key(rev_emp_id)
8 );

Table created.

Elapsed: 00:00:00.40
scott@ORCL>

Ok. So, we have done another important part of our job. Let’s concentrate on our final mission –

scott@ORCL>insert into revive_xml(
2 rev_emp_id,
3 rev_f_name,
4 rev_salary,
5 rev_jn_dt
6 )
7 select cast(t1.EmployeeId as number(4)) EmployeeId,
8 t2.FirstName,
9 cast(t3.Salary as number(10,2)) Salary,
10 to_date(t4.JoiningDt,'YYYY-MM-DD') JoiningDt
11 from (
12 select rownum rn1,
13 extractValue(value(EmployeeId),'/Emp/Employee_ID') EmployeeId
14 from t,
15 table(xmlsequence(extract(xml, '/EmployeeList/Emp'))) EmployeeId
16 ) t1,
17 (
18 select rownum rn2,
19 extractValue(value(FirstName),'/Emp/First') FirstName
20 from t,
21 table(xmlsequence(extract(xml, '/EmployeeList/Emp'))) FirstName
22 ) t2,
23 (
24 select rownum rn3,
25 extractValue(value(Salary),'/Emp/Sal') Salary
26 from t,
27 table(xmlsequence(extract(xml, '/EmployeeList/Emp'))) Salary
28 ) t3,
29 (
30 select rownum rn4,
31 extractValue(value(HireDate),'/Emp/HireDate') JoiningDt
32 from t,
33 table(xmlsequence(extract(xml, '/EmployeeList/Emp'))) HireDate
34 ) t4
35 where t1.rn1 = t2.rn2
36 and t1.rn1 = t3.rn3
37 and t1.rn1 = t4.rn4;

3 rows created.

Elapsed: 00:00:00.16
scott@ORCL>
scott@ORCL>commit;

Commit complete.

Elapsed: 00:00:00.22
scott@ORCL>
scott@ORCL>
scott@ORCL>select * from revive_xml;

REV_EMP_ID REV_F_NAME                               REV_SALARY REV_JN_DT
---------- ---------------------------------------- ---------- ---------
       200 Whalen                                         4400 17-SEP-87
       201 Billy                                          4500 10-JUN-85
       202 Bireswar                                       9000 10-JUN-78

scott@ORCL>

So, you have finally done it.
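
The INSERT above extracts each column in its own subquery & stitches the four result sets back together on rownum. As a point of comparison, here is a minimal Python sketch that reads all four fields from the same Emp element in one pass, so no re-joining is needed. It assumes the element names used in the XPath expressions above; shred_employees is a hypothetical helper name.

```python
import xml.etree.ElementTree as ET
from datetime import datetime
from decimal import Decimal

EMP_XML = """
<EmployeeList>
  <Emp>
    <Employee_ID>200</Employee_ID>
    <First>Whalen</First>
    <Sal>4400</Sal>
    <HireDate>1987-09-17</HireDate>
  </Emp>
  <Emp>
    <Employee_ID>201</Employee_ID>
    <First>Billy</First>
    <Sal>4500</Sal>
    <HireDate>1985-06-10</HireDate>
  </Emp>
  <Emp>
    <Employee_ID>202</Employee_ID>
    <First>Bireswar</First>
    <Sal>9000</Sal>
    <HireDate>1978-06-10</HireDate>
  </Emp>
</EmployeeList>
"""


def shred_employees(xml_text):
    """Return (emp_id, first_name, salary, joining_date) tuples, one per Emp."""
    root = ET.fromstring(xml_text.strip())
    rows = []
    for emp in root.findall("Emp"):
        rows.append(
            (
                int(emp.findtext("Employee_ID")),      # like cast(... as number(4))
                emp.findtext("First"),
                Decimal(emp.findtext("Sal")),          # like cast(... as number(10,2))
                datetime.strptime(emp.findtext("HireDate"), "%Y-%m-%d").date(),
            )
        )
    return rows


employee_rows = shred_employees(EMP_XML)
for row in employee_rows:
    print(row)
```

Because every value comes from the same Emp node, the fields of one employee cannot get paired with those of another, which is what the rownum join in the SQL has to guarantee by position.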

You can also do it another way, but, as written here, it is limited to parsing a single record –

scott@ORCL>with t
2 as (
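3 select xmlType('
4 <EmployeeList>
5 <Emp>
6 <Employee_ID>200</Employee_ID>
7 <First>Whalen</First>
8 <Sal>4400</Sal>
9 <HireDate>1987-09-17</HireDate>
10 </Emp>
11 </EmployeeList>') xml from dual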
12 )
13 SELECT rownum rn,
14 a.EmployeeId,
15 a.FirstName,
16 a.Salary,
17 a.JoiningDt
18 FROM t,
19 XMLTABLE('/EmployeeList'
20 PASSING t.xml
21 COLUMNS
22 EmployeeId varchar2(10) PATH '/EmployeeList/Emp/Employee_ID',
23 FirstName varchar2(20) PATH '/EmployeeList/Emp/First',
24 Salary number(10) PATH '/EmployeeList/Emp/Sal',
25 JoiningDt date PATH '/EmployeeList/Emp/HireDate'
26 ) a;

        RN EMPLOYEEID FIRSTNAME                SALARY JOININGDT
---------- ---------- -------------------- ---------- ---------
         1 200        Whalen                     4400 17-SEP-87

scott@ORCL>
scott@ORCL>
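
In XMLTABLE, the first XPath is the row pattern: one output row is produced per node it matches, & each COLUMNS path is evaluated against that node. Since the query above uses '/EmployeeList' as the row pattern, it yields one row for the whole document. The Python sketch below imitates that idea with ElementTree to show how the choice of row pattern decides the row count. Here xml_table is a hypothetical helper, & the fact that ElementTree's findtext returns the first match is a Python detail, not a statement about how Oracle treats multiple matches.

```python
import xml.etree.ElementTree as ET

DOC = (
    "<EmployeeList>"
    "<Emp><Employee_ID>200</Employee_ID><First>Whalen</First></Emp>"
    "<Emp><Employee_ID>201</Employee_ID><First>Billy</First></Emp>"
    "</EmployeeList>"
)


def xml_table(xml_text, row_path, columns):
    """Return one dict per node matched by row_path; column paths are relative to it."""
    root = ET.fromstring(xml_text)
    return [
        {name: node.findtext(path) for name, path in columns.items()}
        for node in root.findall(row_path)
    ]


# Row pattern = the document root: a single row, however many Emp elements exist.
per_document = xml_table(
    DOC, ".", {"EmployeeId": "Emp/Employee_ID", "FirstName": "Emp/First"}
)

# Row pattern = each Emp: one row per employee, with columns relative to Emp.
per_employee = xml_table(
    DOC, "Emp", {"EmployeeId": "Employee_ID", "FirstName": "First"}
)

print(len(per_document), len(per_employee))
```

Translated back to the SQL, the second call would correspond to a row pattern of '/EmployeeList/Emp' with column paths relative to Emp. I have not run that variant against the database shown above, so treat it as a direction to try rather than a tested query.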

Hope this serves your purpose.

You can also refer to the following XML In Oracle link.

Regards.