Real-time Matplotlib view of streaming data in a Python & Kivy-based iOS App

Today, I’ll be sharing one of the most exciting posts I’ve ever written. This post is rare, as a relevant working solution for this problem is hard to find on the net.

So, what are we talking about here? We’re going to build a Python-based iOS App using the Kivy framework. You’ll find plenty of videos & documents on that. However, nowhere will you find the capability that I’m about to disclose. We’ll consume live IoT streaming data from a dummy application & then plot it in a Matplotlib dashboard inside the mobile App. And that’s where this post is seriously different from the rest of the available white papers.


But, before we dig into more details, let us see a quick demo of our iOS App.

Demo:

Demo

Isn’t it exciting? Great! Now, let’s dig into the details.


Let’s understand the architecture as to how we want to proceed with the solution here.

Architecture:

Broad-level design

The above diagram shows that the Kivy-based iOS application will consume streaming data from the Ably queue. The dummy IoT application will push the real-time events to the same Ably queue.
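To make the data flow concrete, here is a minimal sketch of the same publish/consume pattern. It uses Python's in-memory queue.Queue as a stand-in for the Ably channel, so it is not the Ably API; publish_events and consume_events are hypothetical names used only for illustration.

```python
import json
import queue

# Stand-in for the Ably channel (assumption: a plain in-memory queue).
channel = queue.Queue()

def publish_events(events):
    """Producer side: serialize each mock IoT event and push it to the channel."""
    for event in events:
        channel.put(json.dumps(event))

def consume_events():
    """Consumer side: drain the channel and return the decoded events."""
    received = []
    while not channel.empty():
        received.append(json.loads(channel.get()))
    return received

publish_events([
    {"x_value": 0, "total_1": 100, "total_2": 100},
    {"x_value": 1, "total_1": 107, "total_2": 105},
])
events = consume_events()
print(events)
```

In the real application, the producer and the consumer run as separate processes & the channel lives in the cloud; the sketch only shows the shape of the exchange.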

So, now we understand the architecture. Fantastic!

Let’s deep dive into the code that we specifically built for this use case.


Code:

  1. IoTDataGen.py (Publishes the IoT events captured from the simulator as streaming data to Ably channels, so that the dashboard can present them through measured KPIs.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 12-Nov-2021 ####
#### ####
#### Objective: Publishing Streaming data ####
#### to Ably channels & captured IoT ####
#### events from the simulator & publish ####
#### them in Dashboard through measured ####
#### KPIs. ####
#### ####
##############################################

import random
import time
import json
import clsPublishStream as cps
import datetime
from clsConfig import clsConfig as cf
import logging

# Invoking the IoT Device Generator.
def main():

    ###############################################
    ### Global Section ###
    ###############################################

    # Initiating Ably class to push events
    x1 = cps.clsPublishStream()

    ###############################################
    ### End of Global Section ###
    ###############################################

    # Initiating Log Class
    general_log_path = str(cf.conf['LOG_PATH'])
    msgSize = int(cf.conf['limRec'])

    # Enabling Logging Info
    logging.basicConfig(filename=general_log_path + 'IoTDevice.log', level=logging.INFO)

    # Other useful variables
    cnt = 1
    idx = 0
    debugInd = 'Y'
    x_value = 0
    total_1 = 100
    total_2 = 100
    var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    # End of useful variables

    while True:
        srcJson = {
            "x_value": x_value,
            "total_1": total_1,
            "total_2": total_2
        }

        x_value += 1
        total_1 = total_1 + random.randint(6, 8)
        total_2 = total_2 + random.randint(5, 6)

        tmpJson = str(srcJson)

        if cnt == 1:
            # First event of a batch: open the payload
            srcJsonMast = '{' + '"' + str(idx) + '":'+ tmpJson
        elif cnt == msgSize:
            # Batch is full: close the payload & publish it
            srcJsonMast = srcJsonMast + '}'
            print('JSON: ')
            print(str(srcJsonMast))

            # Pushing the batched IoT events to the Ably queue
            retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)

            if retVal_1 == 0:
                print('Successfully IoT event pushed!')
            else:
                print('Failed to push IoT events!')

            # Reset the counters; the increments below bring them back to cnt = 1 & idx = 0
            srcJsonMast = ''
            tmpJson = ''
            cnt = 0
            idx = -1
            srcJson = {}
            retVal_1 = 0
        else:
            srcJsonMast = srcJsonMast + ',' + '"' + str(idx) + '":'+ tmpJson

        cnt += 1
        idx += 1

        time.sleep(1)

if __name__ == "__main__":
    main()


Let’s explore the key snippets from the above script.

# Initiating Ably class to push events
x1 = cps.clsPublishStream()

The mock IoT application instantiates the main class that publishes the JSON events to the Ably queue.

if cnt == 1:
    srcJsonMast = '{' + '"' + str(idx) + '":'+ tmpJson
elif cnt == msgSize:
    srcJsonMast = srcJsonMast + '}'
    print('JSON: ')
    print(str(srcJsonMast))

    # Pushing the batched IoT events to the Ably queue
    retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)

    if retVal_1 == 0:
        print('Successfully IoT event pushed!')
    else:
        print('Failed to push IoT events!')

    srcJsonMast = ''
    tmpJson = ''
    cnt = 0
    idx = -1
    srcJson = {}
    retVal_1 = 0
else:
    srcJsonMast = srcJsonMast + ',' + '"' + str(idx) + '":'+ tmpJson

In the above snippet, we’re forming the payload dynamically & then calling “pushEvents” to push all the randomly generated mock IoT events to the Ably queue.
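As an alternative sketch of the same batching idea, the payload can be collected in a dictionary & serialized once with json.dumps. Note that str() on a Python dict produces single-quoted text, which is not strict JSON, whereas json.dumps always is. The helper name build_batch is hypothetical, & whether the consumer class expects exactly this format is an assumption, since that class is not shown in this post.

```python
import json
import random

def build_batch(start_x, total_1, total_2, batch_size):
    """Collect batch_size mock events into one dict keyed by position."""
    batch = {}
    for idx in range(batch_size):
        batch[str(idx)] = {
            "x_value": start_x + idx,
            "total_1": total_1,
            "total_2": total_2,
        }
        # Same random walk as the generator script.
        total_1 += random.randint(6, 8)
        total_2 += random.randint(5, 6)
    return batch, total_1, total_2

batch, next_total_1, next_total_2 = build_batch(0, 100, 100, 10)
payload = json.dumps(batch)  # strict JSON, ready to hand to a publisher
print(payload)
```

The returned totals can be fed into the next call, so that consecutive batches continue the same series.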

2. custom.kv (This Kivy design file contains the entire graphical interface of our iOS App, including the functionality of the buttons.)


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 12-Nov-2021 ####
#### ####
#### Objective: This Kivy design file contains all the ####
#### graphical interface of our I-OS App. This includes ####
#### the functionalities of buttons. ####
#### ####
#### Note: If you think this file is not properly read by ####
#### the program, then remove this entire comment block & ####
#### then run the application. It should work. ####
###############################################################

MainInterface:

<MainInterface>:
    ScreenManager:
        id: sm
        size: root.width, root.height
        Screen:
            name: "background_1"
            Image:
                source: "Background/Background_1.png"
                allow_stretch: True
                keep_ratio: True
                size_hint_y: None
                size_hint_x: None
                width: self.parent.width
                height: self.parent.width/self.image_ratio
                FloatLayout:
                    orientation: 'vertical'
                    Label:
                        text: "This is an application, which will consume the live streaming data inside a Kivy-based IOS-App by using Matplotlib to capture the KPIs."
                        text_size: self.width + 350, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':2.9,'center_y':6.5}
                    Image:
                        id: homesc
                        pos_hint: {'right':6, 'top':5.4}
                        size_hint: None, None
                        size: 560, 485
                        source: "Background/FP.jpeg"
        Screen:
            name: "background_2"
            Image:
                source: "Background/Background_2.png"
                allow_stretch: True
                keep_ratio: True
                size_hint_y: None
                size_hint_x: None
                width: self.parent.width
                height: self.parent.width/self.image_ratio
                FloatLayout:
                    Label:
                        text: "Please find the realtime IoT-device Live Statistics:"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':3.0,'center_y':7.0}
                    Label:
                        text: "DC to Servo Min Ratio:"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':3.0,'center_y':6.2}
                    Label:
                        id: dynMin
                        text: "100"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.2,'center_y':6.2}
                    Label:
                        text: "DC Motor:"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':5.4}
                    Label:
                        text: "(MAX)"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':5.0}
                    Label:
                        id: dynDC
                        text: "100"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':4.6}
                    Label:
                        text: " ------- Vs ------- "
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':4.0}
                    Label:
                        text: "Servo Motor:"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':3.4}
                    Label:
                        text: "(MAX)"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':3.0}
                    Label:
                        id: dynServo
                        text: "100"
                        text_size: self.width + 430, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "top"
                        pos_hint: {'center_x':6.8,'center_y':2.6}
                FloatLayout:
                    id: box
                    size: 400, 550
                    pos: 200, 300
        Screen:
            name: "background_3"
            Image:
                source: "Background/Background_3.png"
                allow_stretch: True
                keep_ratio: True
                size_hint_y: None
                size_hint_x: None
                width: self.parent.width
                height: self.parent.width/self.image_ratio
                FloatLayout:
                    orientation: 'vertical'
                    Label:
                        text: "Please find the live like status."
                        text_size: self.width + 350, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':2.6,'center_y':7.2}
                    Label:
                        id: dynVal
                        text: "100"
                        text_size: self.width + 350, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':4.1,'center_y':6.4}
                    Image:
                        id: lk_img_1
                        pos_hint: {'center_x':3.2, 'center_y':6.4}
                        size_hint: None, None
                        size: 460, 285
                        source: "Background/Likes_Btn_R.png"
                    Label:
                        text: "Want to know more about the Developer? Here is the detail ->"
                        text_size: self.width + 450, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':3.1,'center_y':5.5}
                    Label:
                        text: "I love to find out new technologies that is emerging as a driving force & shape our future!"
                        text_size: self.width + 290, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':2.3,'center_y':3.8}
                    Label:
                        text: "For more information view the website to know more on Python-Kivy along with Matplotlib Live Streaming."
                        text_size: self.width + 450, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':3.1,'center_y':1.9}
                    Image:
                        id: avatar
                        pos_hint: {'right':6.8, 'top':5.4}
                        size_hint: None, None
                        size: 460, 285
                        source: "Background/Me.jpeg"
                    Label:
                        text: "https://www.satyakide.com"
                        text_size: self.width + 350, None
                        height: self.texture_size[1]
                        halign: "left"
                        valign: "bottom"
                        pos_hint: {'center_x':3.4,'center_y':0.9}

    Image:
        source: "Background/Top_Bar.png"
        size: 620, 175
        pos: 0, root.height - 535

    Button:
        #: set val 'Start'
        size: 112.5, 75
        pos: root.width/2-190, root.height-120
        background_color: 1,1,1,0
        on_press: root.pressed(self, val, sm)
        on_release: root.released(self, val)
        Image:
            id: s_img
            text: val
            source: "Background/Start_Btn.png"
            center_x: self.parent.center_x - 260
            center_y: self.parent.center_y - 415

    Button:
        #: set val2 'Stats'
        size: 112.5, 75
        pos: root.width/2-55, root.height-120
        background_color: 1,1,1,0
        on_press: root.pressed(self, val2, sm)
        on_release: root.released(self, val2)
        Image:
            id: st_img
            text: val2
            source: "Background/Stats_Btn.png"
            center_x: self.parent.center_x - 250
            center_y: self.parent.center_y - 415

    Button:
        #: set val3 'Likes'
        size: 112.5, 75
        pos: root.width/2+75, root.height-120
        background_color: 1,1,1,0
        on_press: root.pressed(self, val3, sm)
        on_release: root.released(self, val3)
        Image:
            id: lk_img
            text: val3
            source: "Background/Likes_Btn.png"
            center_x: self.parent.center_x - 240
            center_y: self.parent.center_y - 415


To understand this, one needs to learn how to prepare a Kivy design layout using the KV language. You can develop the same using native Python code as well. However, I wanted to explore this language, & it is also the preferred way of doing front-end GUI design in Kivy.

Like any graphical interface, one needs to understand the layouts & the widgets that one is planning to use or build. For that, please go through the critical documentation on Kivy Layouts, especially if you are doing this for the first time.

To pinpoint the conversation, I would like to present the documentation segment from the official site in the given picture –

Official Kivy reference

Since we’ve used our custom buttons & top bars, the most convenient GUI layout for our use case is FloatLayout. With that layout, we can position our widgets wherever we need them. At the same time, one can nest layouts by combining different types of arrangements under one another.
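As a rough illustration of how pos_hint works inside a FloatLayout, the sketch below reimplements the arithmetic in plain Python: each hint value is a fraction of the parent's width or height, measured from the parent's origin. The helper resolve_pos_hint is hypothetical & not part of Kivy, it covers only six of the hint keys, & it simplifies the no-hint case by falling back to the parent's origin.

```python
def resolve_pos_hint(parent_pos, parent_size, child_size, pos_hint):
    """Return the child's bottom-left (x, y) for a subset of pos_hint keys."""
    px, py = parent_pos
    pw, ph = parent_size
    cw, ch = child_size
    x, y = px, py  # simplification: default to the parent's origin
    for key, value in pos_hint.items():
        if key == "x":
            x = px + value * pw
        elif key == "center_x":
            x = px + value * pw - cw / 2
        elif key == "right":
            x = px + value * pw - cw
        elif key == "y":
            y = py + value * ph
        elif key == "center_y":
            y = py + value * ph - ch / 2
        elif key == "top":
            y = py + value * ph - ch
    return x, y

# A 100x50 child centred in a 300x400 parent.
centered = resolve_pos_hint((0, 0), (300, 400), (100, 50),
                            {"center_x": 0.5, "center_y": 0.5})
# The same child pinned to the parent's top-right corner.
top_right = resolve_pos_hint((0, 0), (300, 400), (100, 50),
                             {"right": 1, "top": 1})
print(centered, top_right)
```

By the same arithmetic, a hint value greater than 1 (such as the ones used in this KV file) places the widget beyond the parent's own width or height.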

Some of the key lines from the above KV file are –

Screen:
  name: "background_1"
  Image:
      source: "Background/Background_1.png"
      allow_stretch: True
      keep_ratio: True
      size_hint_y: None
      size_hint_x: None
      width: self.parent.width
      height: self.parent.width/self.image_ratio
      FloatLayout:
          orientation: 'vertical'
          Label:
              text: "This is an application, which will consume the live streaming data inside a Kivy-based IOS-App by using Matplotlib to capture the KPIs."
              text_size: self.width + 350, None
              height: self.texture_size[1]
              halign: "left"
              valign: "bottom"
              pos_hint: {'center_x':2.9,'center_y':6.5}
          Image:
              id: homesc
              pos_hint: {'right':6, 'top':5.4}
              size_hint: None, None
              size: 560, 485
              source: "Background/FP.jpeg"

Let us understand what we discussed here & try to map that with the image.

Part of GUI defined in KV file

From the above image, you can now understand how we placed the label & image into our custom positions to create a lean & clean interface.

Image:
      source: "Background/Top_Bar.png"
      size: 620, 175
      pos: 0, root.height - 535

  Button:
      #: set val 'Start'
      size: 112.5, 75
      pos: root.width/2-190, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val, sm)
      on_release: root.released(self, val)
      Image:
          id: s_img
          text: val
          source: "Background/Start_Btn.png"
          center_x: self.parent.center_x - 260
          center_y: self.parent.center_y - 415

  Button:
      #: set val2 'Stats'
      size: 112.5, 75
      pos: root.width/2-55, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val2, sm)
      on_release: root.released(self, val2)
      Image:
          id: st_img
          text: val2
          source: "Background/Stats_Btn.png"
          center_x: self.parent.center_x - 250
          center_y: self.parent.center_y - 415

  Button:
      #: set val3 'Likes'
      size: 112.5, 75
      pos: root.width/2+75, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val3, sm)
      on_release: root.released(self, val3)
      Image:
          id: lk_img
          text: val3
          source: "Background/Likes_Btn.png"
          center_x: self.parent.center_x - 240
          center_y: self.parent.center_y - 415

Let us understand the custom buttons mapped in our Apps.

So, these are custom buttons. We gave them specific sizes & positions by setting the appropriate size & position coordinates & then assigned the button methods (on_press & on_release).

However, these button methods live inside the main Python script, which we’ll discuss after this segment.
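As a small sketch of what such handlers do with the button images, the helper below picks the pressed or released image for a button by name. The file names follow the ones used in this app (for example, Start_Btn.png & Pressed_Start_Btn.png); the function button_image & the use of os.path.join are illustrative assumptions, not the code of the main script.

```python
import os

BASE_DIR = "Background"  # assumption: the images live in ./Background

def button_image(name, pressed):
    """Return the image path for a button in its pressed or released state."""
    prefix = "Pressed_" if pressed else ""
    return os.path.join(BASE_DIR, f"{prefix}{name}_Btn.png")

print(button_image("Start", pressed=True))
print(button_image("Stats", pressed=False))
```

Using os.path.join avoids having to pick the path separator by hand for each operating system.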

3. main.py (Consumes the streaming data from Ably channels & publishes the IoT events captured from the simulator in the Kivy-based iOS App through measured KPIs.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 12-Nov-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels & captured IoT ####
#### events from the simulator & publish ####
#### them in Kivy-I/OS App through ####
#### measured KPIs. ####
#### ####
##############################################

from kivy.app import App
from kivy.uix.widget import Widget
from kivy.lang import Builder
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.floatlayout import FloatLayout
from kivy.clock import Clock
from kivy.core.window import Window
from kivymd.app import MDApp
import datetime as dt
import datetime
from kivy.properties import StringProperty
from kivy.vector import Vector
import regex as re
import os
os.environ["KIVY_IMAGE"]="pil"
import platform as pl
import matplotlib.pyplot as plt
import pandas as p
from matplotlib.patches import Rectangle
from matplotlib import use as mpl_use

mpl_use('module://kivy.garden.matplotlib.backend_kivy')
plt.style.use('fivethirtyeight')

# Consuming data from Ably Queue
from ably import AblyRest

# Main Class to consume streaming
import clsStreamConsume as ca

# Create the instance of the streaming consumer class
x1 = ca.clsStreamConsume()

var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' *60)
DInd = 'Y'

Window.size = (310, 460)

Curr_Path = os.path.dirname(os.path.realpath(__file__))

os_det = pl.system()
if os_det == "Windows":
    sep = '\\'
else:
    sep = '/'

def getRealTimeIoT():
    try:
        # Let's pass this to our map section
        df = x1.conStream(var1, DInd)

        print('Data:')
        print(str(df))

        return df
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

class MainInterface(FloatLayout):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.data = getRealTimeIoT()
        self.likes = 0
        self.dcMotor = 0
        self.servoMotor = 0
        self.minRatio = 0
        plt.subplots_adjust(bottom=0.19)

        #self.fig, self.ax = plt.subplots(1,1, figsize=(6.5,10))
        self.fig, self.ax = plt.subplots()
        self.mpl_canvas = self.fig.canvas

    def on_data(self, *args):
        self.ax.clear()
        self.data = getRealTimeIoT()

        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
        self.likes = self.getMaxLike(self.data)
        self.ids.dynVal.text = str(self.likes)
        self.ids.lk_img_1.source = ''
        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn_R.png"

        self.dcMotor = self.getMaxDCMotor(self.data)
        self.ids.dynDC.text = str(self.dcMotor)

        self.servoMotor = self.getMaxServoMotor(self.data)
        self.ids.dynServo.text = str(self.servoMotor)

        self.minRatio = self.getDc2ServoMinRatio(self.data)
        self.ids.dynMin.text = str(self.minRatio)

        x = self.data['x_value']
        y1 = self.data['total_1']
        y2 = self.data['total_2']

        self.ax.plot(x, y1, label='Channel 1', linewidth=5.0)
        self.ax.plot(x, y2, label='Channel 2', linewidth=5.0)

        self.mpl_canvas.draw_idle()

        box = self.ids.box
        box.clear_widgets()
        box.add_widget(self.mpl_canvas)

        return self.data

    def getMaxLike(self, df):
        payload = df['x_value']

        a1 = str(payload.agg(['max']))
        max_val = int(re.search(r'\d+', a1)[0])

        return max_val

    def getMaxDCMotor(self, df):
        payload = df['total_1']

        a1 = str(payload.agg(['max']))
        max_val = int(re.search(r'\d+', a1)[0])

        return max_val

    def getMaxServoMotor(self, df):
        payload = df['total_2']

        a1 = str(payload.agg(['max']))
        max_val = int(re.search(r'\d+', a1)[0])

        return max_val

    def getMinDCMotor(self, df):
        payload = df['total_1']

        a1 = str(payload.agg(['min']))
        min_val = int(re.search(r'\d+', a1)[0])

        return min_val

    def getMinServoMotor(self, df):
        payload = df['total_2']

        a1 = str(payload.agg(['min']))
        min_val = int(re.search(r'\d+', a1)[0])

        return min_val

    def getDc2ServoMinRatio(self, df):
        minDC = self.getMinDCMotor(df)
        minServo = self.getMinServoMotor(df)
        min_ratio = round(float(minDC/minServo), 5)

        return min_ratio

    def update(self, *args):
        self.data = self.on_data(self.data)

    def pressed(self, instance, inText, SM):

        if str(inText).upper() == 'START':
            instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Start_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.s_img.text).upper())
            if ((SM.current == "background_2") or (SM.current == "background_3")):
                SM.transition.direction = "right"
            SM.current= "background_1"
            Clock.unschedule(self.update)
            self.remove_widget(self.mpl_canvas)

        elif str(inText).upper() == 'STATS':

            instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Stats_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.st_img.text).upper())
            if (SM.current == "background_1"):
                SM.transition.direction = "left"
            elif (SM.current == "background_3"):
                SM.transition.direction = "right"
            SM.current= "background_2"
            Clock.schedule_interval(self.update, 0.1)

        else:
            instance.parent.ids.lk_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Likes_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.lk_img.text).upper())
            if ((SM.current == "background_1") or (SM.current == "background_2")):
                SM.transition.direction = "left"
            SM.current= "background_3"
            Clock.schedule_interval(self.update, 0.1)
            instance.parent.ids.dynVal.text = str(self.likes)
            instance.parent.ids.dynDC.text = str(self.dcMotor)
            instance.parent.ids.dynServo.text = str(self.servoMotor)
            instance.parent.ids.dynMin.text = str(self.minRatio)
            self.remove_widget(self.mpl_canvas)

    def released(self, instance, inrText):

        if str(inrText).upper() == 'START':
            instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Start_Btn.png"
            print('Released: ', str(instance.parent.ids.s_img.text).upper())
        elif str(inrText).upper() == 'STATS':
            instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Stats_Btn.png"
            print('Released: ', str(instance.parent.ids.st_img.text).upper())
        else:
            instance.parent.ids.lk_img.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
            print('Released: ', str(instance.parent.ids.lk_img.text).upper())

class CustomApp(MDApp):
    def build(self):
        return MainInterface()

if __name__ == "__main__":
    custApp = CustomApp()
    custApp.run()


Let us explore the main script now.

def getRealTimeIoT():
    try:
        # Let's pass this to our map section
        df = x1.conStream(var1, DInd)

        print('Data:')
        print(str(df))

        return df
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

The above function will invoke the streaming class to consume the mock IoT live events as a pandas dataframe from the Ably queue.

class MainInterface(FloatLayout):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.data = getRealTimeIoT()
        self.likes = 0
        self.dcMotor = 0
        self.servoMotor = 0
        self.minRatio = 0
        plt.subplots_adjust(bottom=0.19)

        #self.fig, self.ax = plt.subplots(1,1, figsize=(6.5,10))
        self.fig, self.ax = plt.subplots()
        self.mpl_canvas = self.fig.canvas

Here, the application instantiates the main class & assigns all the critical variables, including the Matplotlib figure, axes & canvas.

    def pressed(self, instance, inText, SM):

        if str(inText).upper() == 'START':
            instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Start_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.s_img.text).upper())
            if ((SM.current == "background_2") or (SM.current == "background_3")):
                SM.transition.direction = "right"
            SM.current= "background_1"
            Clock.unschedule(self.update)
            self.remove_widget(self.mpl_canvas)

We’ve taken one of the button events & captured how the application behaves once someone clicks the Start button: it brings back the static first screen, stops the scheduled updates & removes the Matplotlib canvas. It also sets the transition direction between screens.
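The direction logic spread across the button branches can be summarized in one small function: slide left when moving to a later screen & right when moving to an earlier one. This is only a sketch of that rule in plain Python; the name transition_direction & the SCREEN_ORDER list are hypothetical, while the screen names come from the KV file.

```python
SCREEN_ORDER = ["background_1", "background_2", "background_3"]

def transition_direction(current, target):
    """Slide left when moving forward in the screen order, right when moving back."""
    current_idx = SCREEN_ORDER.index(current)
    target_idx = SCREEN_ORDER.index(target)
    if target_idx > current_idx:
        return "left"
    if target_idx < current_idx:
        return "right"
    return None  # already on the target screen; leave the direction unchanged

print(transition_direction("background_1", "background_2"))
print(transition_direction("background_3", "background_1"))
```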

        elif str(inText).upper() == 'STATS':

            instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Stats_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.st_img.text).upper())
            if (SM.current == "background_1"):
                SM.transition.direction = "left"
            elif (SM.current == "background_3"):
                SM.transition.direction = "right"
            SM.current= "background_2"
            Clock.schedule_interval(self.update, 0.1)

The next screen invokes the dynamic & real-time content. So, please pay extra attention to the following line –

Clock.schedule_interval(self.update, 0.1)

This line schedules the update function to run every 0.1 seconds. The update function looks like –

    def update(self, *args):
        self.data = self.on_data(self.data)

The update function simply invokes another function named “on_data”.

    def on_data(self, *args):
        self.ax.clear()
        self.data = getRealTimeIoT()

        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
        self.likes = self.getMaxLike(self.data)
        self.ids.dynVal.text = str(self.likes)
        self.ids.lk_img_1.source = ''
        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn_R.png"

        self.dcMotor = self.getMaxDCMotor(self.data)
        self.ids.dynDC.text = str(self.dcMotor)

        self.servoMotor = self.getMaxServoMotor(self.data)
        self.ids.dynServo.text = str(self.servoMotor)

        self.minRatio = self.getDc2ServoMinRatio(self.data)
        self.ids.dynMin.text = str(self.minRatio)

        x = self.data['x_value']
        y1 = self.data['total_1']
        y2 = self.data['total_2']

        self.ax.plot(x, y1, label='Channel 1', linewidth=5.0)
        self.ax.plot(x, y2, label='Channel 2', linewidth=5.0)

        self.mpl_canvas.draw_idle()

        box = self.ids.box
        box.clear_widgets()
        box.add_widget(self.mpl_canvas)

        return self.data

The above crucial method shows how we capture the live calculations & assign them to the Matplotlib plots. Finally, it adds the Matplotlib figure canvas to a box widget of our chosen size, so that the changed content is displayed whenever this method is invoked.
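The KPI helpers in the script turn a pandas aggregate into a string & then pull the digits out with a regular expression. The same four numbers are plain maximum & minimum values, as the sketch below shows. It works on a list of dictionaries so that it runs without pandas; with a DataFrame, calls such as df['total_1'].max() give the same results. The function name compute_kpis & the sample rows are illustrative assumptions.

```python
def compute_kpis(rows):
    """Compute the four dashboard KPIs from a list of event dicts."""
    if not rows:
        # Mirrors the empty-DataFrame fallback: nothing to show yet.
        return None
    total_1 = [r["total_1"] for r in rows]
    total_2 = [r["total_2"] for r in rows]
    return {
        "likes": max(r["x_value"] for r in rows),
        "dc_motor_max": max(total_1),
        "servo_motor_max": max(total_2),
        "dc_to_servo_min_ratio": round(min(total_1) / min(total_2), 5),
    }

rows = [
    {"x_value": 0, "total_1": 100, "total_2": 100},
    {"x_value": 1, "total_1": 107, "total_2": 105},
    {"x_value": 2, "total_1": 113, "total_2": 111},
]
kpis = compute_kpis(rows)
print(kpis)
```

Returning None for an empty batch lets the caller skip the redraw instead of failing on missing columns.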

The rest of the functions are pretty self-explanatory. So, I’m not going to discuss them.


Run:

Let’s run the app & see the output –

STEP – 1

Triggering the mock IoT App

STEP – 2

Triggering the iOS App

STEP – 3


So, we’ve done it.

You will get the complete codebase in the following Github link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging!


Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement, especially in the size & position of the GUI components, which could be made dynamic by defining them through self.width along with some constant values.

Streaming Data from Azure to Oracle Clouds

Hello guys,

Today, I’m going to discuss a new notion: living between multi-cloud environments. As you might be aware, I shared a dynamic API built inside Azure in my last post. Today, I want to take that Covid-19 API, whose data originally comes from another third-party source, & publish it as streams inside the Oracle cloud. This is an ideal case for integration software like Tibco or Mulesoft. However, this approach is more useful for any start-up kind of environment or anyone who wants to build their own API-based eco-system.

First, you need to register in Oracle cloud, as they give a $300 trial to everyone who registers on their platform for the first time.

Step 1:

You will land on the main dashboard after successfully registering in the portal by providing the essential information.

Registration to Oracle Cloud

Step 2:

By default, it will show the following screen –

Display of root compartment

It would be best if you created the compartment as shown below.

Creation of sub-compartment

Step 3:

Now, you can create the stream, as shown in the next pictures. After selecting the desired compartment, click the blue “Create Stream” button on top of your page.

Creation of stream – Initiation
Creation of Stream – Final Steps

Also, you can test the stream by manually uploading a few sample JSON messages, as shown in the picture below.

The lower picture shows the sample JSON used to test the newly created stream; the upper picture shows the final stream with some previously tested JSON
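When messages are read back through the Python SDK, the key & value arrive base64-encoded, which is why the consumer class in this post calls b64decode on them. The sketch below shows that round trip with the standard library only. The helper names are hypothetical, & the sample record is made up: it merely reuses the column names from the colList setting of the configuration file.

```python
import json
from base64 import b64encode, b64decode

def encode_message(key, record):
    """Base64-encode a key and a JSON record, returning two ASCII strings."""
    encoded_key = b64encode(key.encode()).decode()
    encoded_value = b64encode(json.dumps(record).encode()).decode()
    return encoded_key, encoded_value

def decode_message(encoded_key, encoded_value):
    """Reverse encode_message: the same decoding steps the consumer applies."""
    key = b64decode(encoded_key.encode()).decode()
    record = json.loads(b64decode(encoded_value.encode()).decode())
    return key, record

# Made-up sample record; the values are placeholders, not real statistics.
sample = {"date": "20200101", "state": "XX", "positive": 1, "negative": 2}
enc_key, enc_value = encode_message("0", sample)
key, record = decode_message(enc_key, enc_value)
print(enc_key, key, record)
```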

Step 4:

Now, we need to add an API key as follows –

Adding the API-Key

This screen will prompt you to download the private key. We’ll use this in the later configuration of our python environment.

Step 5:

Now, you need to capture the content of the configuration file shown in the figures below –

Copying content of the Configuration file

You’ll get these important details under Identity, marked with a red box.

Step 6:

Now, you’ll place the previously acquired private key & the content from Step-5 under the following location from where you are going to trigger the application –

Configuration File & Private Key addition
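For orientation, an OCI SDK configuration file is an INI-style file with a profile section holding the user OCID, key fingerprint, private key path, tenancy OCID & region. The sketch below parses such a file with Python's configparser & reports any missing entries. All the values are placeholders, & the helper missing_keys is hypothetical; it is not part of the OCI SDK.

```python
import configparser

# Placeholder values only; replace them with the details copied in Step 5.
SAMPLE_CONFIG = """
[DEFAULT]
user=ocid1.user.oc1..<unique_id>
fingerprint=<key_fingerprint>
key_file=~/.oci/<private_key_file>.pem
tenancy=ocid1.tenancy.oc1..<unique_id>
region=<region_identifier>
"""

REQUIRED_KEYS = ["user", "fingerprint", "key_file", "tenancy", "region"]

def missing_keys(config_text, profile="DEFAULT"):
    """Return the required entries that are absent from the given profile."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    section = parser[profile]
    return [k for k in REQUIRED_KEYS if k not in section]

print(missing_keys(SAMPLE_CONFIG))
print(missing_keys("[DEFAULT]\nuser=ocid1.user.oc1..<unique_id>\n"))
```

A quick check like this catches a forgotten entry before the SDK call fails at run time.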

You will get more details on this from these links –

Now, we’ve finished the basic Oracle cloud setup.


Let’s check the Azure API one more time using postman –

Testing Azure-API from Postman

Let us install some of the critical python packages –

Installing Key Python-packages

Now, we’re ready with our environment.


Let us explore the codebase now.

1. clsConfig.py ( This is the configuration file for this demo-application)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 04-Apr-2020 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Azure 2 OCI API. Application will ####
#### process these information & perform ####
#### the call to our newly developed Azure ####
#### API in OCI. ####
###############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        "comp": "ocid1.compartment.oc1..xxxxxxxxxxxxxxxxxxxxxxxxyyyyyyyyyyyyyyyyyyxxxxxx",
        "URL":"https://xxxxxxxxxx.yyyyyyyyyyyyyyyy.net/api/getDynamicCovidStats",
        "appType":"application/json",
        "conType":"keep-alive",
        "limRec":10,
        "CACHE":"no-cache",
        "colList": "date, state, positive, negative",
        "typSel": "Cols",
        "LOG_PATH":Curr_Path + sep + 'log' + sep,
        "STREAM_NAME":"Covid19-Stream",
        "PARTITIONS":1
    }


2. clsOCIConsume.py (This will consume from the designated stream created in Oracle-cloud)


##############################################
#### Enhancement By: SATYAKI DE ####
#### Enhancement On: 07-Mar-2021 ####
#### Modified On 08-Mar-2021 ####
#### ####
#### Objective: Consuming stream from OCI ####
##############################################
import oci
import sys
import time
import os
import logging
from base64 import b64encode, b64decode
import json
from clsConfig import clsConfig as cf
from oci.config import from_file
import pandas as p

class clsOCIConsume:
    def __init__(self):
        self.comp = str(cf.conf['comp'])
        self.STREAM_NAME = str(cf.conf['STREAM_NAME'])
        self.PARTITIONS = int(cf.conf['PARTITIONS'])
        self.limRec = int(cf.conf['limRec'])

    def get_cursor_by_partition(self, client, stream_id, partition):
        print("Creating a cursor for partition {}".format(partition))
        cursor_details = oci.streaming.models.CreateCursorDetails(
            partition=partition,
            type=oci.streaming.models.CreateCursorDetails.TYPE_TRIM_HORIZON)
        response = client.create_cursor(stream_id, cursor_details)
        cursor = response.data.value
        return cursor

    def simple_message_loop(self, client, stream_id, initial_cursor):
        try:
            cursor = initial_cursor
            while True:
                get_response = client.get_messages(stream_id, cursor, limit=10)
                # No messages to process. return.
                if not get_response.data:
                    return

                # Process the messages
                print(" Read {} messages".format(len(get_response.data)))
                for message in get_response.data:
                    print("{}: {}".format(b64decode(message.key.encode()).decode(),
                                          b64decode(message.value.encode()).decode()))

                # get_messages is a throttled method; clients should retrieve sufficiently large message
                # batches, as to avoid too many http requests.
                time.sleep(1)
                # use the next-cursor for iteration
                cursor = get_response.headers["opc-next-cursor"]

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

    def get_stream(self, admin_client, stream_id):
        return admin_client.get_stream(stream_id)

    def get_or_create_stream(self, client, compartment_id, stream_name, partition, sac_composite):
        try:
            list_streams = client.list_streams(compartment_id=compartment_id, name=stream_name,
                                               lifecycle_state=oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE)

            if list_streams.data:
                # If we find an active stream with the correct name, we'll use it.
                print("An active stream {} has been found".format(stream_name))
                sid = list_streams.data[0].id
                return self.get_stream(sac_composite.client, sid)

            print(" No Active stream {} has been found; Creating it now. ".format(stream_name))
            print(" Creating stream {} with {} partitions.".format(stream_name, partition))

            # Create stream_details object that need to be passed while creating stream.
            # Note: the method parameter is compartment_id; the name "compartment" is not defined here.
            stream_details = oci.streaming.models.CreateStreamDetails(name=stream_name, partitions=partition,
                                                                      compartment_id=compartment_id, retention_in_hours=24)

            # Since stream creation is asynchronous; we need to wait for the stream to become active.
            response = sac_composite.create_stream_and_wait_for_state(stream_details, wait_for_states=[oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE])
            return response
        except Exception as e:
            print(str(e))

    def consumeStream(self):
        try:
            STREAM_NAME = self.STREAM_NAME
            PARTITIONS = self.PARTITIONS
            compartment = self.comp

            print('Consuming stream from Oracle Cloud!')

            # Load the default configuration
            config = from_file(file_location="~/.oci/config.poc")

            # Create a StreamAdminClientCompositeOperations for composite operations.
            stream_admin_client = oci.streaming.StreamAdminClient(config)
            stream_admin_client_composite = oci.streaming.StreamAdminClientCompositeOperations(stream_admin_client)

            # We will reuse a stream if it's already created.
            # This will utilize list_streams() to determine if a stream exists and return it, or create a new one.
            stream = self.get_or_create_stream(stream_admin_client, compartment, STREAM_NAME,
                                               PARTITIONS, stream_admin_client_composite).data

            print(" Created Stream {} with id : {}".format(stream.name, stream.id))

            # Streams are assigned a specific endpoint url based on where they are provisioned.
            # Create a stream client using the provided message endpoint.
            stream_client = oci.streaming.StreamClient(config, service_endpoint=stream.messages_endpoint)
            s_id = stream.id

            # Use a cursor for getting messages; each get_messages call will return a next-cursor for iteration.
            # There are a couple kinds of cursors.
            # A cursor can be created at a given partition/offset.
            # This gives explicit offset management control to the consumer.
            print("Starting a simple message loop with a partition cursor")
            partition_cursor = self.get_cursor_by_partition(stream_client, s_id, partition="0")
            self.simple_message_loop(stream_client, s_id, partition_cursor)

            return 0
        except Exception as e:
            x = str(e)
            print(x)
            logging.info(x)

            return 1

Let’s explore the key snippet from the above code –

    def get_or_create_stream(self, client, compartment_id, stream_name, partition, sac_composite):
        try:

            list_streams = client.list_streams(compartment_id=compartment_id, name=stream_name,
                                               lifecycle_state=oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE)

            if list_streams.data:
                # If we find an active stream with the correct name, we'll use it.
                print("An active stream {} has been found".format(stream_name))
                sid = list_streams.data[0].id
                return self.get_stream(sac_composite.client, sid)

            print(" No Active stream  {} has been found; Creating it now. ".format(stream_name))
            print(" Creating stream {} with {} partitions.".format(stream_name, partition))

            # Create stream_details object that need to be passed while creating stream.
            stream_details = oci.streaming.models.CreateStreamDetails(name=stream_name, partitions=partition,
                                                                      compartment_id=compartment_id, retention_in_hours=24)

            # Since stream creation is asynchronous; we need to wait for the stream to become active.
            response = sac_composite.create_stream_and_wait_for_state(stream_details, wait_for_states=[oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE])
            return response
        except Exception as e:
            print(str(e))

The above function checks whether an active stream with the given name already exists in the compartment. If it does, the function reuses it. If not, it creates a new one & waits until the stream becomes active.
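
The same get-or-create idea can be tried without an Oracle account. The following is only a minimal sketch: InMemoryStreams and get_or_create are hypothetical names invented for this illustration, and the in-memory class merely stands in for the OCI admin client rather than mirroring its API.

```python
class InMemoryStreams:
    """Hypothetical stand-in for a stream admin client (illustration only)."""

    def __init__(self):
        self._streams = {}

    def list_active(self, name):
        # Return every active stream that carries the requested name.
        return [s for s in self._streams.values()
                if s["name"] == name and s["state"] == "ACTIVE"]

    def create(self, name, partitions):
        # Register a new stream and mark it active right away.
        stream = {"id": "stream-{}".format(len(self._streams) + 1),
                  "name": name, "partitions": partitions, "state": "ACTIVE"}
        self._streams[stream["id"]] = stream
        return stream


def get_or_create(client, name, partitions):
    """Reuse an active stream with this name, or create one if none exists."""
    found = client.list_active(name)
    if found:
        return found[0], False
    return client.create(name, partitions), True


demo_client = InMemoryStreams()
stream_a, created_a = get_or_create(demo_client, "demo-stream", 1)
stream_b, created_b = get_or_create(demo_client, "demo-stream", 1)
print(created_a, created_b, stream_a["id"] == stream_b["id"])
```

Calling the helper twice with the same name creates the stream only once, which is what lets the publisher & the consumer in this post share one stream.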

    def get_cursor_by_partition(self, client, stream_id, partition):
        print("Creating a cursor for partition {}".format(partition))
        cursor_details = oci.streaming.models.CreateCursorDetails(
            partition=partition,
            type=oci.streaming.models.CreateCursorDetails.TYPE_TRIM_HORIZON)
        response = client.create_cursor(stream_id, cursor_details)
        cursor = response.data.value
        return cursor

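In Oracle Cloud, you need to create a cursor to consume the streaming messages. The TRIM_HORIZON type used here starts reading from the oldest message still retained in the partition. Please refer to the Oracle Cloud Streaming documentation for more details.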

    def simple_message_loop(self, client, stream_id, initial_cursor):
        try:
            cursor = initial_cursor
            while True:
                get_response = client.get_messages(stream_id, cursor, limit=10)
                # No messages to process. return.
                if not get_response.data:
                    return

                # Process the messages
                print(" Read {} messages".format(len(get_response.data)))
                for message in get_response.data:
                    print("{}: {}".format(b64decode(message.key.encode()).decode(),
                                          b64decode(message.value.encode()).decode()))

                # get_messages is a throttled method; clients should retrieve sufficiently large message
                # batches, as to avoid too many http requests.
                time.sleep(1)
                # use the next-cursor for iteration
                cursor = get_response.headers["opc-next-cursor"]

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

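In this case, we’re looping through the stream & consuming up to 10 messages per call, pausing for a second between calls to keep the number of HTTP requests low. Each response carries the cursor for the next call in its opc-next-cursor header, & the loop stops as soon as a call returns no messages.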
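
The loop only prints the decoded strings. If you want to use the payload further downstream, you could decode each message back into Python objects. Here is a minimal sketch, assuming the messages were produced by the publisher class from this post (Base64-encoded, with the literal text "value" placed in front of the JSON). The function name decode_entry & the sample payload are hypothetical.

```python
import json
from base64 import b64decode, b64encode


def decode_entry(key_b64, value_b64, prefix="value"):
    """Decode one Base64 key/value pair and parse the JSON payload."""
    key = b64decode(key_b64.encode()).decode()
    value = b64decode(value_b64.encode()).decode()
    # Assumption: the publisher prepended the literal text "value" to the JSON.
    if value.startswith(prefix):
        value = value[len(prefix):]
    return key, json.loads(value)


# Hypothetical sample, encoded the same way the publisher encodes it.
sample_key = b64encode("key0".encode()).decode()
sample_value = b64encode('value[{"state": "NY", "positive": 10}]'.encode()).decode()

decoded_key, decoded_rows = decode_entry(sample_key, sample_value)
print(decoded_key, decoded_rows)
```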

3. clsOCIPublish.py (This will publish messages from the source Azure-API to the designated stream created in Oracle Cloud)


##############################################
#### Enhancement By: SATYAKI DE ####
#### Enhancement On: 07-Mar-2021 ####
#### Modified On 07-Mar-2021 ####
#### ####
#### Objective: Publishing stream at OCI ####
##############################################
import oci
import sys
import time
import os
import logging
from base64 import b64encode, b64decode
import json
from clsConfig import clsConfig as cf
from oci.config import from_file
import pandas as p

class clsOCIPublish:
    def __init__(self):
        self.comp = str(cf.conf['comp'])
        self.STREAM_NAME = str(cf.conf['STREAM_NAME'])
        self.PARTITIONS = int(cf.conf['PARTITIONS'])
        self.limRec = int(cf.conf['limRec'])

    def get_stream(self, admin_client, stream_id):
        return admin_client.get_stream(stream_id)

    def publish_messages(self, client, stream_id, inputDF):
        try:
            limRec = self.limRec
            # Converting dataframe to json
            df = inputDF

            # Calculating total number of records
            cntRow = df.shape[0]
            print('Actual Record counts: ', str(cntRow))
            print('Supplied Record counts: ', str(limRec))

            # Build up a PutMessagesDetails and publish some messages to the stream
            message_list = []
            start_pos = 0
            end_pos = 0
            interval = 1

            for i in range(limRec):
                split_df = p.DataFrame()
                rJson = ''
                # Preparing Data

                # Getting Individual Element & convert them to Series
                if ((start_pos + interval) <= cntRow):
                    end_pos = start_pos + interval
                else:
                    end_pos = start_pos + (cntRow - start_pos)

                split_df = df.iloc[start_pos:end_pos]
                rJson = split_df.to_json(orient ='records')

                if ((start_pos > cntRow) | (start_pos == cntRow)):
                    pass
                else:
                    start_pos = start_pos + interval

                key = "key" + str(i)
                value = "value" + str(rJson)

                # End of data preparation

                encoded_key = b64encode(key.encode()).decode()
                encoded_value = b64encode(value.encode()).decode()
                message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value))

            print("Publishing {} messages to the stream {} ".format(len(message_list), stream_id))
            messages = oci.streaming.models.PutMessagesDetails(messages=message_list)
            put_message_result = client.put_messages(stream_id, messages)

            # The put_message_result can contain some useful metadata for handling failures
            for entry in put_message_result.data.entries:
                if entry.error:
                    print("Error ({}) : {}".format(entry.error, entry.error_message))
                else:
                    print("Published message to partition {} , offset {}".format(entry.partition, entry.offset))

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)
            return 1

    def get_or_create_stream(self, client, compartment_id, stream_name, partition, sac_composite):
        try:
            list_streams = client.list_streams(compartment_id=compartment_id, name=stream_name,
                                               lifecycle_state=oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE)

            if list_streams.data:
                # If we find an active stream with the correct name, we'll use it.
                print("An active stream {} has been found".format(stream_name))
                sid = list_streams.data[0].id
                return self.get_stream(sac_composite.client, sid)

            print(" No Active stream {} has been found; Creating it now. ".format(stream_name))
            print(" Creating stream {} with {} partitions.".format(stream_name, partition))

            # Create stream_details object that need to be passed while creating stream.
            # Note: the method parameter is compartment_id; the name "compartment" is not defined here.
            stream_details = oci.streaming.models.CreateStreamDetails(name=stream_name, partitions=partition,
                                                                      compartment_id=compartment_id, retention_in_hours=24)

            # Since stream creation is asynchronous; we need to wait for the stream to become active.
            response = sac_composite.create_stream_and_wait_for_state(stream_details, wait_for_states=[oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE])
            return response
        except Exception as e:
            print(str(e))

    def publishStream(self, inputDf):
        try:
            STREAM_NAME = self.STREAM_NAME
            PARTITIONS = self.PARTITIONS
            compartment = self.comp

            print('Publishing stream to Oracle Cloud!')

            # Load the default configuration
            config = from_file(file_location="~/.oci/config.poc")

            # Create a StreamAdminClientCompositeOperations for composite operations.
            stream_admin_client = oci.streaming.StreamAdminClient(config)
            stream_admin_client_composite = oci.streaming.StreamAdminClientCompositeOperations(stream_admin_client)

            # We will reuse a stream if it's already created.
            # This will utilize list_streams() to determine if a stream exists and return it, or create a new one.
            stream = self.get_or_create_stream(stream_admin_client, compartment, STREAM_NAME,
                                               PARTITIONS, stream_admin_client_composite).data

            print(" Created Stream {} with id : {}".format(stream.name, stream.id))

            # Streams are assigned a specific endpoint url based on where they are provisioned.
            # Create a stream client using the provided message endpoint.
            stream_client = oci.streaming.StreamClient(config, service_endpoint=stream.messages_endpoint)
            s_id = stream.id

            # Publish some messages to the stream
            self.publish_messages(stream_client, s_id, inputDf)

            return 0
        except Exception as e:
            x = str(e)
            print(x)
            logging.info(x)

            return 1

Let’s explore the key snippet from the above script –

    def publish_messages(self, client, stream_id, inputDF):
        try:
            limRec = self.limRec
            # Converting dataframe to json
            df = inputDF

            # Calculating total number of records
            cntRow = df.shape[0]
            print('Actual Record counts: ', str(cntRow))
            print('Supplied Record counts: ', str(limRec))

            # Build up a PutMessagesDetails and publish some messages to the stream
            message_list = []
            start_pos = 0
            end_pos = 0
            interval = 1

            for i in range(limRec):
                split_df = p.DataFrame()
                rJson = ''
                # Preparing Data

                # Getting Individual Element & convert them to Series
                if ((start_pos + interval) <= cntRow):
                    end_pos = start_pos + interval
                else:
                    end_pos = start_pos + (cntRow - start_pos)

                split_df = df.iloc[start_pos:end_pos]
                rJson = split_df.to_json(orient ='records')

                if ((start_pos > cntRow) | (start_pos == cntRow)):
                    pass
                else:
                    start_pos = start_pos + interval

                key = "key" + str(i)
                value = "value" + str(rJson)

                # End of data preparation

                encoded_key = b64encode(key.encode()).decode()
                encoded_value = b64encode(value.encode()).decode()
                message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value))

            print("Publishing {} messages to the stream {} ".format(len(message_list), stream_id))
            messages = oci.streaming.models.PutMessagesDetails(messages=message_list)
            put_message_result = client.put_messages(stream_id, messages)

            # The put_message_result can contain some useful metadata for handling failures
            for entry in put_message_result.data.entries:
                if entry.error:
                    print("Error ({}) : {}".format(entry.error, entry.error_message))
                else:
                    print("Published message to partition {} , offset {}".format(entry.partition, entry.offset))

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)
            return 1

In the above snippet, we take the data captured from our Azure-API call, split it into single-row chunks, Base64-encode each chunk as a key/value message & then publish the whole batch to the Oracle stream with a single put_messages call.
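
The chunk-and-encode step can be tried on its own, without pandas or an Oracle connection. The following is only a sketch under stated assumptions: build_messages is a hypothetical helper, a plain list of dictionaries stands in for the DataFrame, & json.dumps stands in for to_json. One deliberate difference: as far as I can tell, the loop above always runs limRec times, so it adds empty "value[]" messages once the rows run out, whereas this sketch stops at the end of the data.

```python
import json
from base64 import b64decode, b64encode


def build_messages(records, limit, interval=1):
    """Split records into chunks of `interval` rows and Base64-encode each chunk."""
    messages = []
    for i, start in enumerate(range(0, len(records), interval)):
        if i >= limit:
            # Never publish more than `limit` messages.
            break
        chunk = records[start:start + interval]
        key = "key" + str(i)
        # Same convention as the post: the literal text "value" precedes the JSON.
        value = "value" + json.dumps(chunk)
        messages.append((b64encode(key.encode()).decode(),
                         b64encode(value.encode()).decode()))
    return messages


# Hypothetical sample rows, shaped like the selected columns in this post.
sample_rows = [
    {"date": "2021-03-06", "state": "NY", "positive": 10, "negative": 90},
    {"date": "2021-03-06", "state": "CA", "positive": 20, "negative": 80},
]

for enc_key, enc_value in build_messages(sample_rows, limit=5):
    print(b64decode(enc_key).decode(), b64decode(enc_value).decode())
```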

4. clsAzureAPI.py (This will fetch messages from the source Azure-API)


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 07-Mar-2021 ####
#### Modified On 07-Mar-2021 ####
#### ####
#### Objective: Calling Azure dynamic API ####
##############################################
import json
from clsConfig import clsConfig as cf
import requests
import logging

class clsAzureAPI:
    def __init__(self):
        self.url = cf.conf['URL']
        self.azure_cache = cf.conf['CACHE']
        self.azure_con = cf.conf['conType']
        self.type = cf.conf['appType']
        self.typSel = cf.conf['typSel']
        self.typVal = cf.conf['colList']

    def searchQry(self):
        try:
            url = self.url
            api_cache = self.azure_cache
            api_con = self.azure_con
            type = self.type
            typSel = self.typSel
            typVal = self.typVal

            querystring = {"typeSel": typSel, "typeVal": typVal}

            print('Input JSON: ', str(querystring))

            headers = {
                'content-type': type,
                'Cache-Control': api_cache,
                'Connection': api_con
            }

            response = requests.request("GET", url, headers=headers, params=querystring)

            ResJson = response.text

            # dumps followed by loads hands back the same text,
            # so this method returns a JSON string, not a parsed object.
            jdata = json.dumps(ResJson)
            ResJson = json.loads(jdata)

            return ResJson
        except Exception as e:
            ResJson = ''
            x = str(e)
            print(x)

            logging.info(x)
            ResJson = {'errorDetails': x}

            return ResJson

I think this one is pretty straightforward: we’re invoking the Azure-API with a GET request & returning the response text.
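
One detail worth spelling out: searchQry() passes response.text through json.dumps() & then json.loads(), which hands back the same string rather than a parsed object. That is why the main script calls json.loads() once more before building the DataFrame. Here is a minimal sketch with a made-up payload (the response_text value is hypothetical):

```python
import json

# Hypothetical response body, standing in for response.text
response_text = '[{"state": "NY", "positive": 10}]'

# Serializing a string and parsing it again returns the original string.
round_tripped = json.loads(json.dumps(response_text))
print(type(round_tripped).__name__)

# A second json.loads() is what actually turns the text into Python objects.
parsed = json.loads(round_tripped)
print(type(parsed).__name__)
```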

5. callAzure2OracleStreaming.py (Main calling script to invoke all the classes for an end-to-end test)


#########################################################
#### Written By: SATYAKI DE ####
#### Written On: 06-Mar-2021 ####
#### Modified On 07-Mar-2021 ####
#### ####
#### Objective: Main calling scripts – ####
#### This Python script will consume an ####
#### source API data from Azure-Cloud & publish the ####
#### data into an Oracle Streaming platform, ####
#### which is compatible with Kafka. Later, another ####
#### consumer app will read the data from the stream.####
#########################################################
from clsConfig import clsConfig as cf
import clsL as cl
import logging
import datetime
import clsAzureAPI as ca
import clsOCIPublish as co
import clsOCIConsume as cc
import pandas as p
import json

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        # Declared Variable
        ret_1 = 0
        debug_ind = 'Y'
        res_2 = ''

        # Defining Generic Log File
        general_log_path = str(cf.conf['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'Azure2OCIStream.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        log_dir = cf.conf['LOG_PATH']

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print()
        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        print('Welcome to Azure to Oracle Cloud Streaming(OCI) Calling Program: ')
        print('*' * 160)
        print('Reading dynamic Covid data from Azure API: ')
        print('https://xxxxxx.yyyyyyyyyy.net/api/getDynamicCovidStats')
        print()
        print('Selected Columns for this -> date, state, positive, negative')
        print()
        print('This will take few seconds depending upon the volume & network!')
        print('-' * 160)
        print()

        # Create the instance of the Azure API class
        x1 = ca.clsAzureAPI()

        # Let's pass this to our map section
        retJson = x1.searchQry()

        # Converting JSon to Pandas Dataframe for better readability
        # Capturing the JSON Payload
        #res_1 = json.dumps(retJson)
        #res = json.loads(res_1)
        res = json.loads(retJson)

        # Converting dictionary to Pandas Dataframe
        # df_ret = p.read_json(ret_2, orient='records')

        # pandas 1.0+ exposes json_normalize at the top level;
        # the older p.io.json.json_normalize path is deprecated.
        df_ret = p.json_normalize(res)
        df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[1])

        # Removing any duplicate columns
        df_ret = df_ret.loc[:, ~df_ret.columns.duplicated()]

        print()
        print()
        print("-" * 160)

        print('Publishing Azure sample result: ')
        print(df_ret.head())

        # Logging Final Output
        l.logr('1.df_ret' + var + '.csv', debug_ind, df_ret, 'log')

        print("-" * 160)
        print()

        print('*' * 160)
        print('Calling Oracle Cloud Infrastructure Publisher Program!')
        print('Pushing Azure API to Oracle Kafka-Streaming using OCI!')
        print('-' * 160)

        # Create the instance of the OCI publisher class
        x2 = co.clsOCIPublish()

        retVal = x2.publishStream(df_ret)

        if retVal == 0:
            print('Successfully streamed to Oracle Cloud!')
        else:
            print('Failed to stream!')

        print()
        print('*' * 160)
        print('Calling Oracle Cloud Infrastructure Consumer Program!')
        print('Getting Oracle Streaming captured in OCI!')
        print('-' * 160)

        # Create the instance of the OCI consumer class
        x3 = cc.clsOCIConsume()

        retVal2 = x3.consumeStream()

        if retVal2 == 0:
            print('Successfully streamed captured from Oracle Cloud!')
        else:
            print('Failed to retrieve stream from OCI!')

        print('Finished Analysis points..')
        print("*" * 160)
        logging.info('Finished Analysis points..')
        logging.info(tmpR0)

        # Capture a fresh timestamp; var still holds the start time
        tmpR10 = 'End Time: ' + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError as e:
        print(str(e))
        print("Invalid option!")
        logging.info("Invalid option!")

    except Exception as e:
        # Python 3 exceptions have no .message attribute, so use str(e)
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

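This one is the primary calling script. It fetches the data from the Azure API, converts it into a Pandas dataframe, publishes it to the Oracle stream & finally consumes it back from the same stream, invoking all the Python classes one-by-one to run our test cases together.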


Let us run our application –

Running end to end application

And, you can see the streaming data inside Oracle cloud as shown below –

Streaming data

You can explore my Git associated with this project & download the code from here.

So, finally, we’ve done it. 😀


I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational data & scenarios that are available over the internet & for educational purposes only. Also, I’ve used the template SDK provided by Oracle & customized it to satisfy our business cases.