The LLM Security Chronicles – Part 5

Before we proceed with the last installment, I want you to recap our previous post, which is as follows –

Current research shows that most AI defenses fail against adaptive attacks, and no single method can reliably stop prompt injection. Adequate protection requires a layered “Swiss cheese” approach, where multiple imperfect defenses work together to reduce risk. This architecture includes input validation, semantic checks, behavioral monitoring, output sanitization, and human review. Each layer filters out increasingly dangerous content, ensuring only safe interactions pass through. Additional safeguards—such as secure prompt construction, anomaly detection, and human oversight for high-risk cases—create a more resilient system. While attackers evolve quickly, multilayered defenses offer a practical path toward stronger AI security.

Now, let us discuss some of the defensive technologies –

class AdversarialTraining:
    def __init__(self, base_model):
        self.model = base_model
        self.adversarial_generator = self.initialize_adversary()
        
    def generateAdversarialExamples(self, clean_data):
        """
        Generates adversarial training examples
        """
        adversarial_examples = []
        
        techniques = [
            self.flipAttack,
            self.poetryAttack,
            self.encodingAttack,
            self.semanticAttack,
        ]
        
        for data_point in clean_data:
            for technique in techniques:
                adversarial = technique(data_point)
                adversarial_examples.append({
                    'input': adversarial,
                    'label': 'ADVERSARIAL',
                    'technique': technique.__name__
                })
        
        return adversarial_examples
    
    def trainWithAdversarial(self, clean_data, epochs=10):
        """
        Trains model with adversarial examples
        """
        for epoch in range(epochs):
            # Generate fresh adversarial examples each epoch
            adversarial_data = self.generateAdversarialExamples(clean_data)
            
            # Combine clean and adversarial data
            combined_data = clean_data + adversarial_data
            
            # Train model to recognize and reject adversarial inputs
            self.model.train(combined_data)
            
            # Evaluate robustness
            robustness_score = self.evaluateRobustness()
            print(f"Epoch {epoch}: Robustness = {robustness_score}")

This code strengthens an AI model by training it with adversarial examples—inputs intentionally designed to confuse or mislead the system. It generates multiple types of adversarial attacks, including flipped text, encoded text, poetic prompts, and meaning-based manipulations. These examples are added to the clean training data so the model learns to detect and reject harmful inputs. During training, each epoch creates new adversarial samples, mixes them with normal data, and retrains the model. After each cycle, the system measures the improvement in the model’s robustness, helping build stronger defenses against real-world attacks.

class FormalVerification:
    def __init__(self, model):
        self.model = model
        self.properties = []
        
    def addSafetyProperty(self, property_fn):
        """
        Adds a formal safety property to verify
        """
        self.properties.append(property_fn)
    
    def verifyProperties(self, input_space):
        """
        Formally verifies safety properties
        """
        violations = []
        
        for input_sample in input_space:
            output = self.model(input_sample)
            
            for prop in self.properties:
                if not prop(input_sample, output):
                    violations.append({
                        'input': input_sample,
                        'output': output,
                        'violated_property': prop.__name__
                    })
        
        return violations
    
    def proveRobustness(self, epsilon=0.01):
        """
        Proves model robustness within epsilon-ball
        """
        # This would use formal methods like interval arithmetic
        # or abstract interpretation in production
        pass

This code provides a way to formally verify whether an AI model consistently adheres to defined safety rules. Users can add safety properties—functions that specify what “safe behavior” means. The system then tests these properties across many input samples and records any violations, showing where the model fails to behave safely. It also includes a placeholder for proving the model’s robustness within a small range of variation (an epsilon-ball), which in full implementations would rely on mathematical verification methods. Overall, it helps ensure the model meets reliability and safety standards before deployment.


timeline
title LLM Security Regulation Timeline

2024 : EU AI Act
     : California AI Safety Bill

2025 : OWASP LLM Top 10
     : NIST AI Risk Management Framework 2.0
     : UK AI Security Standards

2026 : Expected US Federal AI Security Act
     : International AI Safety Standards (ISO)

2027 : Global AI Security Accord (Proposed)
class ComplianceFramework:
    def __init__(self):
        self.regulations = {
            'EU_AI_ACT': self.loadEuRequirements(),
            'NIST_AI_RMF': self.loadNistRequirements(),
            'OWASP_LLM': self.loadOwaspRequirements(),
        }
    
    def auditCompliance(self, system):
        """
        Comprehensive compliance audit
        """
        audit_results = {}
        
        for regulation, requirements in self.regulations.items():
            results = []
            
            for requirement in requirements:
                compliant = self.checkRequirement(system, requirement)
                results.append({
                    'requirement': requirement['id'],
                    'description': requirement['description'],
                    'compliant': compliant,
                    'evidence': self.collectEvidence(system, requirement)
                })
            
            compliance_rate = sum(r['compliant'] for r in results) / len(results)
            audit_results[regulation] = {
                'compliance_rate': compliance_rate,
                'details': results
            }
        
        return audit_results

This code performs a full compliance audit to check whether an AI system meets major regulatory and security standards, including the EU AI Act, NIST’s AI Risk Management Framework, and OWASP LLM guidelines. Each regulation contains specific requirements. The framework evaluates the system against each requirement, determines whether it is compliant, and gathers evidence to support the assessment. It then calculates a compliance rate for each regulatory standard and summarizes the detailed findings. This process helps organizations verify that their AI systems follow legal, ethical, and security expectations.


class SecurityChecklist:
    def __init__(self):
        self.checklist = {
            'pre_deployment': [
                'Adversarial testing completed',
                'Security audit performed',
                'Incident response plan ready',
                'Monitoring systems active',
                'Human review process established',
            ],
            'deployment': [
                'Rate limiting enabled',
                'Input validation active',
                'Output filtering enabled',
                'Logging configured',
                'Alerting systems online',
            ],
            'post_deployment': [
                'Regular security updates',
                'Continuous monitoring',
                'Incident analysis',
                'Model retraining with adversarial examples',
                'Compliance audits',
            ]
        }
    
    def validateDeployment(self, system):
        """
        Validates system is ready for deployment
        """
        ready = True
        issues = []
        
        for phase, checks in self.checklist.items():
            for check in checks:
                if not self.verifyCheck(system, check):
                    ready = False
                    issues.append(f"{phase}: {check} - FAILED")
        
        return ready, issues

This code provides a security checklist to ensure an AI system is safe and ready at every stage of deployment. It defines required security tasks for three phases: before deployment (e.g., audits, adversarial testing, monitoring setup), during deployment (e.g., input validation, output filtering, logging, alerts), and after deployment (e.g., ongoing monitoring, updates, retraining, compliance reviews). The framework checks whether each requirement is implemented correctly. If any item fails, it reports the issue and marks the system as not ready. This ensures a thorough, structured evaluation of AI security practices.


Predicted Evolution (2026-2028):

  1. Autonomous Attack Agents: AI systems designed to find and exploit LLM vulnerabilities
  2. Supply Chain Poisoning: Targeting popular training datasets and model repositories
  3. Cross-Model Attacks: Exploits that work across multiple LLM architectures
  4. Quantum-Enhanced Attacks: Using quantum computing to break LLM defenses

The Arms Race:


For Organizations Deploying LLMs, you need to perform the following actions implemented as soon as you can –

  1. Implement basic input validation
  2. Enable comprehensive logging
  3. Set up rate limiting
  4. Create an incident response plan
  5. Train staff on AI security risks
  1. Deploy behavioral monitoring
  2. Implement output filtering
  3. Conduct security audit
  4. Establish human review process
  5. Test against known attacks
  1. Implement formal verification
  2. Deploy adversarial training
  3. Build a security operations center for AI
  4. Achieve regulatory compliance
  5. Contribute to security research
# Essential Security Metrics to Track
security_metrics = {
    'attack_detection_rate': 'Percentage of attacks detected',
    'false_positive_rate': 'Percentage of benign inputs flagged',
    'mean_time_to_detect': 'Average time to detect an attack',
    'mean_time_to_respond': 'Average time to respond to incident',
    'bypass_rate': 'Percentage of attacks that succeed',
    'coverage': 'Percentage of attack vectors covered by defenses',
}

# Key Performance Indicators (KPIs)
target_kpis = {
    'attack_detection_rate': '>95%',
    'false_positive_rate': '<5%',
    'mean_time_to_detect': '<1 second',
    'mean_time_to_respond': '<5 minutes',
    'bypass_rate': '<10%',
    'coverage': '>90%',
}

Despite the dire statistics, there are reasons to be hopeful –

  1. Increased Awareness: The security community is taking LLM threats seriously
  2. Research Investment: Major tech companies are funding defensive research
  3. Regulatory Pressure: Governments are mandating security standards
  4. Community Collaboration: Unprecedented cooperation between competitors on security
  5. Technical Progress: New defensive techniques show promise

But, challenges remain –

  1. Asymmetric Advantage: Attackers need one success; defenders need perfect protection
  2. Rapid Evolution: Attack techniques evolving faster than defenses
  3. Democratization of Attacks: Tools like WormGPT make attacks accessible
  4. Limited Understanding: We still don’t fully understand how LLMs work
  5. Resource Constraints: Security often remains underfunded

As we conclude this three-part journey through the wilderness of LLM security, remember that this isn’t an ending—it’s barely the beginning. We’re in the “Netscape Navigator” era of AI security, where everything is held together with digital duct tape and good intentions.

The battle between LLM attackers and defenders is like an infinite game of whack-a-mole, except the moles are getting PhDs and the hammer is made of hopes and prayers. But here’s the thing: every great technology goes through this phase. The internet was a security disaster until it wasn’t (okay, it still is, but it’s a manageable disaster).

I think – LLM security in 2025 is where cybersecurity was in 1995—critical, underdeveloped, and about to become everyone’s problem. The difference is we have 30 years of security lessons to apply, if we’re smart enough to use them.

Remember: In the grand chess game of AI security, we’re currently playing checkers while attackers are playing 4D chess. But every grandmaster started as a beginner, and every secure system started as a vulnerable one.

Stay vigilant, stay updated, and maybe keep a backup plan that doesn’t involve AI. Just in case the machines decide to take a sick day… or take over the world.

So, with this I conclude this series, where I discuss the types of attacks, vulnerabilities & the defensive mechanism of LLM-driven solutions in the field of Enterprise-level architecture.

I hope you all like this effort & let me know your feedback. I’ll be back with another topic. Until then, Happy Avenging! 🙂

Building a real-time streamlit app by consuming events from Ably channels

I’ll bring an exciting streamlit app that will reflect the real-time dashboard by consuming all the events from the Ably channel.

One more time, I’ll be utilizing my IoT emulator that will feed the real-time events based on the user inputs to the Ably channel, which will be subscribed to by the Streamlit-based app.

However, I would like to share the run before we dig deep into this.


Demo

Isn’t this exciting? How we can use our custom-built IoT emulator & capture real-time events to Ably Queue, then transform those raw events into more meaningful KPIs? Let’s deep dive then.

Let’s explore the broad-level architecture/flow –

As you can see, the green box is a demo IoT application that generates events & pushes them into the Ably Queue. At the same time, the streamlit-based Dashboard app consumes the events & transforms them into more meaningful metrics.

Let us understand the sample packages that are required for this task.

pip install ably==2.0.3
pip install numpy==1.26.3
pip install pandas==2.2.0
pip install plotly==5.19.0
pip install requests==2.31.0
pip install streamlit==1.30.0
pip install streamlit-autorefresh==1.0.1
pip install streamlit-echarts==0.4.0

Since this is an extension to our previous post, we’re not going to discuss other scripts, which we’ve already discussed over there. Instead, we will talk about the enhanced scripts & the new scripts that are required for this use case.

1. app.py (This script will consume real-time streaming data coming out from a hosted API source using another popular third-party service named Ably. Ably mimics the pub sub-streaming concept, which might be extremely useful for any start-up. This will then translate into many meaningful KPIs in a streamlit-based dashboard app.)

Note that, we’re not going to discuss the entire script here. Only those parts are relevant. However, you can get the complete scripts in the GitHub repository.

def createHumidityGauge(humidity_value):
    fig = go.Figure(go.Indicator(
        mode = "gauge+number",
        value = humidity_value,
        domain = {'x': [0, 1], 'y': [0, 1]},
        title = {'text': "Humidity", 'font': {'size': 24}},
        gauge = {
            'axis': {'range': [None, 100], 'tickwidth': 1, 'tickcolor': "darkblue"},
            'bar': {'color': "darkblue"},
            'bgcolor': "white",
            'borderwidth': 2,
            'bordercolor': "gray",
            'steps': [
                {'range': [0, 50], 'color': 'cyan'},
                {'range': [50, 100], 'color': 'royalblue'}],
            'threshold': {
                'line': {'color': "red", 'width': 4},
                'thickness': 0.75,
                'value': humidity_value}
        }
    ))

    fig.update_layout(height=220, paper_bgcolor = "white", font = {'color': "darkblue", 'family': "Arial"}, margin=dict(t=0, l=5, r=5, b=0))

    return fig

The above function creates a customized humidity gauge that visually represents a given humidity value, making it easy to read and understand at a glance.

This code defines a function createHumidityGauge that creates a visual gauge (like a meter) to display a humidity value. Here’s a simple breakdown of what it does:

  1. Function Definition: It starts by defining a function named createHumidityGauge that takes one parameter, humidity_value, which is the humidity level you want to display on the gauge.
  2. Creating the Gauge: Inside the function, it creates a figure using Plotly (a plotting library) with a specific type of chart called an Indicator. This Indicator is set to display in “gauge+number” mode, meaning it shows both a gauge visual and the numeric value of the humidity.
  3. Setting Gauge Properties:
    • The value is set to the humidity_value parameter, so the gauge shows this humidity level.
    • The domain sets the position of the gauge on the plot, which is set to fill the available space ([0, 1] for both x and y axes).
    • The title is set to “Humidity” with a font size of 24, labeling the gauge.
    • The gauge section defines the appearance and behavior of the gauge, including:
      • An axis that goes from 0 to 100 (assuming humidity is measured as a percentage from 0% to 100%).
      • The color and style of the gauge’s bar and background.
      • Colored steps indicating different ranges of humidity (cyan for 0-50% and royal blue for 50-100%).
      • A threshold line that appears at the value of the humidity, marked in red to stand out.
  4. Finalizing the Gauge Appearance: The function then updates the layout of the figure to set its height, background color, font style, and margins to make sure the gauge looks nice and is visible.
  5. Returning the Figure: Finally, the function returns the fig object, which is the fully configured gauge, ready to be displayed.

Other similar functions will repeat the same steps.

def createTemperatureLineChart(data):
    # Assuming 'data' is a DataFrame with a 'Timestamp' index and a 'Temperature' column
    fig = px.line(data, x=data.index, y='Temperature', title='Temperature Vs Time')
    fig.update_layout(height=270)  # Specify the desired height here
    return fig

The above function takes a set of temperature data indexed by timestamp and creates a line chart that visually represents how the temperature changes over time.

This code defines a function “createTemperatureLineChart” that creates a line chart to display temperature data over time. Here’s a simple summary of what it does:

  1. Function Definition: It starts with defining a function named createTemperatureLineChart that takes one parameter, data, which is expected to be a DataFrame (a type of data structure used in pandas, a Python data analysis library). This data frame should have a ‘Timestamp’ as its index (meaning each row represents a different point in time) and a ‘Temperature’ column containing temperature values.
  2. Creating the Line Chart: The function uses Plotly Express (a plotting library) to create a line chart with the following characteristics:
    • The x-axis represents time, taken from the DataFrame’s index (‘Timestamp’).
    • The y-axis represents temperature, taken from the ‘Temperature’ column in the DataFrame.
    • The chart is titled ‘Temperature Vs Time’, clearly indicating what the chart represents.
  3. Customizing the Chart: It then updates the layout of the chart to set a specific height (270 pixels) for the chart, making it easier to view.
  4. Returning the Chart: Finally, the function returns the fig object, which is the fully prepared line chart, ready to be displayed.

Similar functions will repeat for other KPIs.

    st.sidebar.header("KPIs")
    selected_kpis = st.sidebar.multiselect(
        "Select KPIs", options=["Temperature", "Humidity", "Pressure"], default=["Temperature"]
    )

The above code will create a sidebar with drop-down lists, which will show the KPIs (“Temperature”, “Humidity”, “Pressure”).

# Split the layout into columns for KPIs and graphs
    gauge_col, kpi_col, graph_col = st.columns(3)

    # Auto-refresh setup
    st_autorefresh(interval=7000, key='data_refresh')

    # Fetching real-time data
    data = getData(var1, DInd)

    st.markdown(
        """
        <style>
        .stEcharts { margin-bottom: -50px; }  /* Class might differ, inspect the HTML to find the correct class name */
        </style>
        """,
        unsafe_allow_html=True
    )

    # Display gauges at the top of the page
    gauges = st.container()

    with gauges:
        col1, col2, col3 = st.columns(3)
        with col1:
            humidity_value = round(data['Humidity'].iloc[-1], 2)
            humidity_gauge_fig = createHumidityGauge(humidity_value)
            st.plotly_chart(humidity_gauge_fig, use_container_width=True)

        with col2:
            temp_value = round(data['Temperature'].iloc[-1], 2)
            temp_gauge_fig = createTempGauge(temp_value)
            st.plotly_chart(temp_gauge_fig, use_container_width=True)

        with col3:
            pressure_value = round(data['Pressure'].iloc[-1], 2)
            pressure_gauge_fig = createPressureGauge(pressure_value)
            st.plotly_chart(pressure_gauge_fig, use_container_width=True)


    # Next row for actual readings and charts side-by-side
    readings_charts = st.container()


    # Display KPIs and their trends
    with readings_charts:
        readings_col, graph_col = st.columns([1, 2])

        with readings_col:
            st.subheader("Latest Readings")
            if "Temperature" in selected_kpis:
                st.metric("Temperature", f"{temp_value:.2f}%")

            if "Humidity" in selected_kpis:
                st.metric("Humidity", f"{humidity_value:.2f}%")

            if "Pressure" in selected_kpis:
                st.metric("Pressure", f"{pressure_value:.2f}%")


        # Graph placeholders for each KPI
        with graph_col:
            if "Temperature" in selected_kpis:
                temperature_fig = createTemperatureLineChart(data.set_index("Timestamp"))

                # Display the Plotly chart in Streamlit with specified dimensions
                st.plotly_chart(temperature_fig, use_container_width=True)

            if "Humidity" in selected_kpis:
                humidity_fig = createHumidityLineChart(data.set_index("Timestamp"))

                # Display the Plotly chart in Streamlit with specified dimensions
                st.plotly_chart(humidity_fig, use_container_width=True)

            if "Pressure" in selected_kpis:
                pressure_fig = createPressureLineChart(data.set_index("Timestamp"))

                # Display the Plotly chart in Streamlit with specified dimensions
                st.plotly_chart(pressure_fig, use_container_width=True)
  1. The code begins by splitting the Streamlit web page layout into three columns to separately display Key Performance Indicators (KPIs), gauges, and graphs.
  2. It sets up an auto-refresh feature with a 7-second interval, ensuring the data displayed is regularly updated without manual refreshes.
  3. Real-time data is fetched using a function called getData, which takes unspecified parameters var1 and DInd.
  4. A CSS style is injected into the Streamlit page to adjust the margin of Echarts elements, which may be used to improve the visual layout of the page.
  5. A container for gauges is created at the top of the page, with three columns inside it dedicated to displaying humidity, temperature, and pressure gauges.
  6. Each gauge (humidity, temperature, and pressure) is created by rounding the last value from the fetched data to two decimal places and then visualized using respective functions that create Plotly gauge charts.
  7. Below the gauges, another container is set up for displaying the latest readings and their corresponding graphs in a side-by-side layout, using two columns.
  8. The left column under “Latest Readings” displays the latest values for selected KPIs (temperature, humidity, pressure) as metrics.
  9. In the right column, for each selected KPI, a line chart is created using data with timestamps as indices and displayed using Plotly charts, allowing for a visual trend analysis.
  10. This structured approach enables a dynamic and interactive dashboard within Streamlit, offering real-time insights into temperature, humidity, and pressure with both numeric metrics and graphical trends, optimized for regular data refreshes and user interactivity.

Let us understand some of the important screenshots of this application –


So, we’ve done it.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 🙂

Neural prophet – The enhanced version of Facebook’s forecasting API

Hi Team,

Today, I’ll be explaining the enhancement of one of the previous posts. I know that I’ve shared the fascinating API named prophet-API, which Facebook developed. One can quickly get more accurate predictions with significantly fewer data points. (If you want to know more about that post, please click on the following link.)

However, there is another enhancement on top of that API, which is more accurate. However, one needs to know – when they should consider using it. So, today, we’ll be talking about the neural prophet API.

But, before we start digging deep, why don’t we view the demo first?

Demo

Let’s visit a diagram. That way, you can understand where you can use it. Also, I’ll be sharing some of the links from the original site for better information mining.

Source: Neural Prophet (Official Site)

As one can see, this API is trying to bridge between the different groups & it enables the time-series computation efficiently.

WHERE TO USE:

Let’s visit another diagram from the same source.

Source: Neural Prophet (Official Site)

So, I hope these two pictures give you a clear picture & relatively set your expectations to more ground reality.


ARCHITECTURE:

Let us explore the architecture –

Architecture Diagram

As one can see, the application is processing IoT data & creating a historical data volume, out of which the model is gradually predicting correct outcomes with higher confidence.

For more information on this API, please visit the following link.


CODE:

Let’s explore the essential scripts here.

  1. clsConfig.py (Configuration file for the entire application.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 28-Dec-2021 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning & streaming dashboard.####
#### ####
################################################
import os
import platform as pl
import pandas as p
class clsConfig(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'REPORT_PATH': Curr_Path + sep + 'report',
'FILE_NAME': Curr_Path + sep + 'Data' + sep + 'thermostatIoT.csv',
'SRC_PATH': Curr_Path + sep + 'data' + sep,
'APP_DESC_1': 'Old Video Enhancement!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'SUBDIR': 'data',
'SEP': sep,
'testRatio':0.2,
'valRatio':0.2,
'epochsVal':8,
'sleepTime':3,
'sleepTime1':6,
'factorVal':0.2,
'learningRateVal':0.001,
'event1': {
'event': 'SummerEnd',
'ds': p.to_datetime([
'2010-04-01', '2011-04-01', '2012-04-01',
'2013-04-01', '2014-04-01', '2015-04-01',
'2016-04-01', '2017-04-01', '2018-04-01',
'2019-04-01', '2020-04-01', '2021-04-01',
]),},
'event2': {
'event': 'LongWeekend',
'ds': p.to_datetime([
'2010-12-01', '2011-12-01', '2012-12-01',
'2013-12-01', '2014-12-01', '2015-12-01',
'2016-12-01', '2017-12-01', '2018-12-01',
'2019-12-01', '2020-12-01', '2021-12-01',
]),}
}

view raw

clsConfig.py

hosted with ❤ by GitHub

The only key snippet would be passing a nested json element with pandas dataframe in the following lines –

'event1': {
    'event': 'SummerEnd',
    'ds': p.to_datetime([
        '2010-04-01', '2011-04-01', '2012-04-01',
        '2013-04-01', '2014-04-01', '2015-04-01',
        '2016-04-01', '2017-04-01', '2018-04-01',
        '2019-04-01', '2020-04-01', '2021-04-01',
    ]),},
'event2': {
    'event': 'LongWeekend',
    'ds': p.to_datetime([
        '2010-12-01', '2011-12-01', '2012-12-01',
        '2013-12-01', '2014-12-01', '2015-12-01',
        '2016-12-01', '2017-12-01', '2018-12-01',
        '2019-12-01', '2020-12-01', '2021-12-01',
    ]),}

As one can see, our application is equipped with the events to predict our use case better.

2. clsPredictIonIoT.py (Main class file, which will invoke neural-prophet forecast for the entire application.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 19-Feb-2022 ####
#### Modified On 21-Feb-2022 ####
#### ####
#### Objective: This python script will ####
#### perform the neural-prophet forecast ####
#### based on the historical input received ####
#### from IoT device. ####
################################################
# We keep the setup code in a different class as shown below.
from clsConfig import clsConfig as cf
import psutil
import os
import pandas as p
import json
import datetime
from neuralprophet import NeuralProphet, set_log_level
from neuralprophet import set_random_seed
from neuralprophet.benchmark import Dataset, NeuralProphetModel, SimpleExperiment, CrossValidationExperiment
import time
import clsL as cl
import matplotlib.pyplot as plt
###############################################
### Global Section ###
###############################################
# Initiating Log class
l = cl.clsL()
set_random_seed(10)
set_log_level("ERROR", "INFO")
###############################################
### End of Global Section ###
###############################################
class clsPredictIonIoT:
def __init__(self):
self.sleepTime = int(cf.conf['sleepTime'])
self.event1 = cf.conf['event1']
self.event2 = cf.conf['event2']
def forecastSeries(self, inputDf):
try:
sleepTime = self.sleepTime
event1 = self.event1
event2 = self.event2
df = inputDf
print('IoTData: ')
print(df)
## user specified events
# history events
SummerEnd = p.DataFrame(event1)
LongWeekend = p.DataFrame(event2)
dfEvents = p.concat((SummerEnd, LongWeekend))
# NeuralProphet Object
# Adding events
m = NeuralProphet(loss_func="MSE")
# set the model to expect these events
m = m.add_events(["SummerEnd", "LongWeekend"])
# create the data df with events
historyDf = m.create_df_with_events(df, dfEvents)
# fit the model
metrics = m.fit(historyDf, freq="D")
# forecast with events known ahead
futureDf = m.make_future_dataframe(df=historyDf, events_df=dfEvents, periods=365, n_historic_predictions=len(df))
forecastDf = m.predict(df=futureDf)
events = forecastDf[(forecastDf['event_SummerEnd'].abs() + forecastDf['event_LongWeekend'].abs()) > 0]
events.tail()
## plotting forecasts
fig = m.plot(forecastDf)
## plotting components
figComp = m.plot_components(forecastDf)
## plotting parameters
figParam = m.plot_parameters()
#################################
#### Train & Test Evaluation ####
#################################
m = NeuralProphet(seasonality_mode= "multiplicative", learning_rate = 0.1)
dfTrain, dfTest = m.split_df(df=df, freq="MS", valid_p=0.2)
metricsTrain = m.fit(df=dfTrain, freq="MS")
metricsTest = m.test(df=dfTest)
print('metricsTest:: ')
print(metricsTest)
# Predict Into Future
metricsTrain2 = m.fit(df=df, freq="MS")
futureDf = m.make_future_dataframe(df, periods=24, n_historic_predictions=48)
forecastDf = m.predict(futureDf)
fig = m.plot(forecastDf)
# Visualize training
m = NeuralProphet(seasonality_mode="multiplicative", learning_rate=0.1)
dfTrain, dfTest = m.split_df(df=df, freq="MS", valid_p=0.2)
metrics = m.fit(df=dfTrain, freq="MS", validation_df=dfTest, plot_live_loss=True)
print('Tail of Metrics: ')
print(metrics.tail(1))
######################################
#### Time-series Cross-Validation ####
######################################
METRICS = ['SmoothL1Loss', 'MAE', 'RMSE']
params = {"seasonality_mode": "multiplicative", "learning_rate": 0.1}
folds = NeuralProphet(**params).crossvalidation_split_df(df, freq="MS", k=5, fold_pct=0.20, fold_overlap_pct=0.5)
metricsTrain = p.DataFrame(columns=METRICS)
metricsTest = p.DataFrame(columns=METRICS)
for dfTrain, dfTest in folds:
m = NeuralProphet(**params)
train = m.fit(df=dfTrain, freq="MS")
test = m.test(df=dfTest)
metricsTrain = metricsTrain.append(train[METRICS].iloc[-1])
metricsTest = metricsTest.append(test[METRICS].iloc[-1])
print('Stats: ')
dfStats = metricsTest.describe().loc[["mean", "std", "min", "max"]]
print(dfStats)
####################################
#### Using Benchmark Framework ####
####################################
print('Starting extracting result set for Benchmark:')
ts = Dataset(df = df, name = "thermoStatsCPUUsage", freq = "MS")
params = {"seasonality_mode": "multiplicative"}
exp = SimpleExperiment(
model_class=NeuralProphetModel,
params=params,
data=ts,
metrics=["MASE", "RMSE"],
test_percentage=25,
)
resultTrain, resultTest = exp.run()
print('Test result for Benchmark:: ')
print(resultTest)
print('Finished extracting result test for Benchmark!')
####################################
#### Cross Validate Experiment ####
####################################
print('Starting extracting result set for Corss-Validation:')
ts = Dataset(df = df, name = "thermoStatsCPUUsage", freq = "MS")
params = {"seasonality_mode": "multiplicative"}
exp_cv = CrossValidationExperiment(
model_class=NeuralProphetModel,
params=params,
data=ts,
metrics=["MASE", "RMSE"],
test_percentage=10,
num_folds=3,
fold_overlap_pct=0,
)
resultTrain, resultTest = exp_cv.run()
print('resultTest for Cross Validation:: ')
print(resultTest)
print('Finished extracting result test for Corss-Validation!')
######################################################
#### 3-Phase Train, Test & Validation Experiment ####
######################################################
print('Starting 3-phase Train, Test & Validation Experiment!')
m = NeuralProphet(seasonality_mode= "multiplicative", learning_rate = 0.1)
# create a test holdout set:
dfTrainVal, dfTest = m.split_df(df=df, freq="MS", valid_p=0.2)
# create a validation holdout set:
dfTrain, dfVal = m.split_df(df=dfTrainVal, freq="MS", valid_p=0.2)
# fit a model on training data and evaluate on validation set.
metricsTrain1 = m.fit(df=dfTrain, freq="MS")
metrics_val = m.test(df=dfVal)
# refit model on training and validation data and evaluate on test set.
metricsTrain2 = m.fit(df=dfTrainVal, freq="MS")
metricsTest = m.test(df=dfTest)
metricsTrain1["split"] = "train1"
metricsTrain2["split"] = "train2"
metrics_val["split"] = "validate"
metricsTest["split"] = "test"
metrics_stat = metricsTrain1.tail(1).append([metricsTrain2.tail(1), metrics_val, metricsTest]).drop(columns=['RegLoss'])
print('Metrics Stat:: ')
print(metrics_stat)
# Train, Cross-Validate and Cross-Test evaluation
METRICS = ['SmoothL1Loss', 'MAE', 'RMSE']
params = {"seasonality_mode": "multiplicative", "learning_rate": 0.1}
crossVal, crossTest = NeuralProphet(**params).double_crossvalidation_split_df(df, freq="MS", k=5, valid_pct=0.10, test_pct=0.10)
metricsTrain1 = p.DataFrame(columns=METRICS)
metrics_val = p.DataFrame(columns=METRICS)
for dfTrain1, dfVal in crossVal:
m = NeuralProphet(**params)
train1 = m.fit(df=dfTrain, freq="MS")
val = m.test(df=dfVal)
metricsTrain1 = metricsTrain1.append(train1[METRICS].iloc[-1])
metrics_val = metrics_val.append(val[METRICS].iloc[-1])
metricsTrain2 = p.DataFrame(columns=METRICS)
metricsTest = p.DataFrame(columns=METRICS)
for dfTrain2, dfTest in crossTest:
m = NeuralProphet(**params)
train2 = m.fit(df=dfTrain2, freq="MS")
test = m.test(df=dfTest)
metricsTrain2 = metricsTrain2.append(train2[METRICS].iloc[-1])
metricsTest = metricsTest.append(test[METRICS].iloc[-1])
mtrain2 = metricsTrain2.describe().loc[["mean", "std"]]
print('Train 2 Stats:: ')
print(mtrain2)
mval = metrics_val.describe().loc[["mean", "std"]]
print('Validation Stats:: ')
print(mval)
mtest = metricsTest.describe().loc[["mean", "std"]]
print('Test Stats:: ')
print(mtest)
return 0
except Exception as e:
x = str(e)
print('Error: ', x)
return 1

Some of the key snippets that I will discuss here are as follows –

## user specified events
# history events
SummerEnd = p.DataFrame(event1)
LongWeekend = p.DataFrame(event2)

dfEvents = p.concat((SummerEnd, LongWeekend))

# NeuralProphet Object
# Adding events
m = NeuralProphet(loss_func="MSE")

# set the model to expect these events
m = m.add_events(["SummerEnd", "LongWeekend"])

# create the data df with events
historyDf = m.create_df_with_events(df, dfEvents)

Creating & adding events into your model will allow it to predict based on the milestones.

# fit the model
metrics = m.fit(historyDf, freq="D")

# forecast with events known ahead
futureDf = m.make_future_dataframe(df=historyDf, events_df=dfEvents, periods=365, n_historic_predictions=len(df))
forecastDf = m.predict(df=futureDf)

events = forecastDf[(forecastDf['event_SummerEnd'].abs() + forecastDf['event_LongWeekend'].abs()) > 0]
events.tail()

## plotting forecasts
fig = m.plot(forecastDf)

## plotting components
figComp = m.plot_components(forecastDf)

## plotting parameters
figParam = m.plot_parameters()

Based on the daily/monthly collected data, our algorithm tries to plot the data points & predict a future trend, which will look like this –

Future Data Points

From the above diagram, we can conclude that the CPU’s trend has been growing day by day since the beginning. However, there are some events when we can see a momentary drop in requirements due to the climate & holidays. During those times, either people are not using them or are not at home.

Apart from that, I’ve demonstrated the use of a benchwork framework, & splitting the data into Train, Test & Validation & captured the RMSE values. I would request you to go through that & post any questions if you have any.

You can witness the train & validation datasets & visualize them in the standard manner, which will look something like –

Demo

3. readingIoT.py (Main invoking script.)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 21-Feb-2022 ####
#### Modified On 21-Feb-2022 ####
#### ####
#### Objective: This python script will ####
#### invoke the main class to use the ####
#### stored historical IoT data stored & ####
#### then transform, cleanse, predict & ####
#### analyze the data points into more ####
#### meaningful decision-making insights. ####
###############################################
# We keep the setup code in a different class as shown below.
from clsConfig import clsConfig as cf
import datetime
import logging
import pandas as p
import clsPredictIonIoT as cpt
###############################################
### Global Section ###
###############################################
sep = str(cf.conf['SEP'])
Curr_Path = str(cf.conf['INIT_PATH'])
fileName = str(cf.conf['FILE_NAME'])
###############################################
### End of Global Section ###
###############################################
def main():
try:
# Other useful variables
debugInd = 'Y'
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
var1 = datetime.datetime.now()
# Initiating Prediction class
x1 = cpt.clsPredictIonIoT()
print('Start Time: ', str(var))
# End of useful variables
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'IoT_NeuralProphet.log', level=logging.INFO)
# Reading the source IoT data
iotData = p.read_csv(fileName)
df = iotData.rename(columns={'MonthlyDate': 'ds', 'AvgIoTCPUUsage': 'y'})[['ds', 'y']]
r1 = x1.forecastSeries(df)
if (r1 == 0):
print('Successfully IoT forecast predicted!')
else:
print('Failed to predict IoT forecast!')
var2 = datetime.datetime.now()
c = var2 – var1
minutes = c.total_seconds() / 60
print('Total Run Time in minutes: ', str(minutes))
print('End Time: ', str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

view raw

readingIoT.py

hosted with ❤ by GitHub

Here are some of the key snippets –

# Reading the source IoT data
iotData = p.read_csv(fileName)
df = iotData.rename(columns={'MonthlyDate': 'ds', 'AvgIoTCPUUsage': 'y'})[['ds', 'y']]

r1 = x1.forecastSeries(df)

if (r1 == 0):
    print('Successfully IoT forecast predicted!')
else:
    print('Failed to predict IoT forecast!')

var2 = datetime.datetime.now()

In those above lines, the main calling application is invoking the neural-forecasting class & passing the pandas dataframe containing IoT’s historical data to train its model.

For your information, here is the outcome of the run, when you invoke the main calling script –

Demo – Continue

FOLDER STRUCTURE:

Please find the folder structure as shown –

Directory Structure

So, we’ve done it.

You will get the complete codebase in the following Github link.

I’ll bring some more exciting topic in the coming days from the Python verse. Please share & subscribe my post & let me know your feedback.

Till then, Happy Avenging! 😀

Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim the ownership of these images. There is an always room for improvement & especially the prediction quality.

Real-Time Matplotlib view from a streaming data built using Python & Kivy-based iOS App

Today, I’ll be sharing one of the most exciting posts I’ve ever shared. This post is rare as you cannot find the most relevant working solution easily over the net.

So, what are we talking about here? We’re going to build a Python-based iOS App using the Kivy framework. You get plenty of videos & documents on this as well. However, nowhere you’ll find the capability that I’m about to disclose. We’ll consume live IoT streaming data from a dummy application & then plot them in a MatplotLib dashboard inside the mobile App. And that’s where this post is seriously different from the rest of the available white papers.


But, before we dig into more details, let us see a quick demo of our iOS App.

Demo:

Demo

Isn’t it exciting? Great! Now, let’s dig into the details.


Let’s understand the architecture as to how we want to proceed with the solution here.

Architecture:

Broad-level design

The above diagram shows that the Kive-based iOS application that will consume streaming data from the Ably queue. The initial dummy IoT application will push the real-time events to the same Ably queue.

So, now we understand the architecture. Fantastic!

Let’s deep dive into the code that we specifically built for this use case.


Code:

  1. IoTDataGen.py (Publishing Streaming data to Ably channels & captured IoT events from the simulator & publish them in Dashboard through measured KPIs.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 12-Nov-2021 ####
#### ####
#### Objective: Publishing Streaming data ####
#### to Ably channels & captured IoT ####
#### events from the simulator & publish ####
#### them in Dashboard through measured ####
#### KPIs. ####
#### ####
##############################################
import random
import time
import json
import clsPublishStream as cps
import datetime
from clsConfig import clsConfig as cf
import logging
# Invoking the IoT Device Generator.
def main():
###############################################
### Global Section ###
###############################################
# Initiating Ably class to push events
x1 = cps.clsPublishStream()
###############################################
### End of Global Section ###
###############################################
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
msgSize = int(cf.conf['limRec'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'IoTDevice.log', level=logging.INFO)
# Other useful variables
cnt = 1
idx = 0
debugInd = 'Y'
x_value = 0
total_1 = 100
total_2 = 100
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# End of usefull variables
while True:
srcJson = {
"x_value": x_value,
"total_1": total_1,
"total_2": total_2
}
x_value += 1
total_1 = total_1 + random.randint(-6, 8)
total_2 = total_2 + random.randint(-5, 6)
tmpJson = str(srcJson)
if cnt == 1:
srcJsonMast = '{' + '"' + str(idx) + '":'+ tmpJson
elif cnt == msgSize:
srcJsonMast = srcJsonMast + '}'
print('JSON: ')
print(str(srcJsonMast))
# Pushing both the Historical Confirmed Cases
retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)
if retVal_1 == 0:
print('Successfully IoT event pushed!')
else:
print('Failed to push IoT events!')
srcJsonMast = ''
tmpJson = ''
cnt = 0
idx = -1
srcJson = {}
retVal_1 = 0
else:
srcJsonMast = srcJsonMast + ',' + '"' + str(idx) + '":'+ tmpJson
cnt += 1
idx += 1
time.sleep(1)
if __name__ == "__main__":
main()

view raw

IoTDataGen.py

hosted with ❤ by GitHub

Let’s explore the key snippets from the above script.

# Initiating Ably class to push events
x1 = cps.clsPublishStream()

The I-OS App is calling the main class to publish the JSON events to Ably Queue.

if cnt == 1:
    srcJsonMast = '{' + '"' + str(idx) + '":'+ tmpJson
elif cnt == msgSize:
    srcJsonMast = srcJsonMast + '}'
    print('JSON: ')
    print(str(srcJsonMast))

    # Pushing both the Historical Confirmed Cases
    retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)

    if retVal_1 == 0:
        print('Successfully IoT event pushed!')
    else:
        print('Failed to push IoT events!')

    srcJsonMast = ''
    tmpJson = ''
    cnt = 0
    idx = -1
    srcJson = {}
    retVal_1 = 0
else:
    srcJsonMast = srcJsonMast + ',' + '"' + str(idx) + '":'+ tmpJson

In the above snippet, we’re forming the payload dynamically & then calling the “pushEvents” to push all the random generated IoT mock-events to the Ably queue.

2. custom.kv (Publishing Streaming data to Ably channels & captured IoT events from the simulator & publish them in Dashboard through measured KPIs.)


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 12-Nov-2021 ####
#### ####
#### Objective: This Kivy design file contains all the ####
#### graphical interface of our I-OS App. This including ####
#### the functionalities of buttons. ####
#### ####
#### Note: If you think this file is not proeprly read by ####
#### the program, then remove this entire comment block & ####
#### then run the application. It should work. ####
###############################################################
MainInterface:
<MainInterface>:
ScreenManager:
id: sm
size: root.width, root.height
Screen:
name: "background_1"
Image:
source: "Background/Background_1.png"
allow_stretch: True
keep_ratio: True
size_hint_y: None
size_hint_x: None
width: self.parent.width
height: self.parent.width/self.image_ratio
FloatLayout:
orientation: 'vertical'
Label:
text: "This is an application, which will consume the live streaming data inside a Kivy-based IOS-App by using Matplotlib to capture the KPIs."
text_size: self.width + 350, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':2.9,'center_y':6.5}
Image:
id: homesc
pos_hint: {'right':6, 'top':5.4}
size_hint: None, None
size: 560, 485
source: "Background/FP.jpeg"
Screen:
name: "background_2"
Image:
source: "Background/Background_2.png"
allow_stretch: True
keep_ratio: True
size_hint_y: None
size_hint_x: None
width: self.parent.width
height: self.parent.width/self.image_ratio
FloatLayout:
Label:
text: "Please find the realtime IoT-device Live Statistics:"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':3.0,'center_y':7.0}
Label:
text: "DC to Servo Min Ratio:"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':3.0,'center_y':6.2}
Label:
id: dynMin
text: "100"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.2,'center_y':6.2}
Label:
text: "DC Motor:"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':5.4}
Label:
text: "(MAX)"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':5.0}
Label:
id: dynDC
text: "100"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':4.6}
Label:
text: " ——- Vs ——- "
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':4.0}
Label:
text: "Servo Motor:"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':3.4}
Label:
text: "(MAX)"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':3.0}
Label:
id: dynServo
text: "100"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':2.6}
FloatLayout:
id: box
size: 400, 550
pos: 200, 300
Screen:
name: "background_3"
Image:
source: "Background/Background_3.png"
allow_stretch: True
keep_ratio: True
size_hint_y: None
size_hint_x: None
width: self.parent.width
height: self.parent.width/self.image_ratio
FloatLayout:
orientation: 'vertical'
Label:
text: "Please find the live like status."
text_size: self.width + 350, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':2.6,'center_y':7.2}
Label:
id: dynVal
text: "100"
text_size: self.width + 350, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':4.1,'center_y':6.4}
Image:
id: lk_img_1
pos_hint: {'center_x':3.2, 'center_y':6.4}
size_hint: None, None
size: 460, 285
source: "Background/Likes_Btn_R.png"
Label:
text: "Want to know more about the Developer? Here is the detail ->"
text_size: self.width + 450, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':3.1,'center_y':5.5}
Label:
text: "I love to find out new technologies that is emerging as a driving force & shape our future!"
text_size: self.width + 290, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':2.3,'center_y':3.8}
Label:
text: "For more information view the website to know more on Python-Kivy along with Matplotlib Live Streaming."
text_size: self.width + 450, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':3.1,'center_y':1.9}
Image:
id: avatar
pos_hint: {'right':6.8, 'top':5.4}
size_hint: None, None
size: 460, 285
source: "Background/Me.jpeg"
Label:
text: "https://www.satyakide.com&quot;
text_size: self.width + 350, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':3.4,'center_y':0.9}
Image:
source: "Background/Top_Bar.png"
size: 620, 175
pos: 0, root.height – 535
Button:
#: set val 'Start'
size: 112.5, 75
pos: root.width/2-190, root.height-120
background_color: 1,1,1,0
on_press: root.pressed(self, val, sm)
on_release: root.released(self, val)
Image:
id: s_img
text: val
source: "Background/Start_Btn.png"
center_x: self.parent.center_x – 260
center_y: self.parent.center_y – 415
Button:
#: set val2 'Stats'
size: 112.5, 75
pos: root.width/2-55, root.height-120
background_color: 1,1,1,0
on_press: root.pressed(self, val2, sm)
on_release: root.released(self, val2)
Image:
id: st_img
text: val2
source: "Background/Stats_Btn.png"
center_x: self.parent.center_x – 250
center_y: self.parent.center_y – 415
Button:
#: set val3 'Likes'
size: 112.5, 75
pos: root.width/2+75, root.height-120
background_color: 1,1,1,0
on_press: root.pressed(self, val3, sm)
on_release: root.released(self, val3)
Image:
id: lk_img
text: val3
source: "Background/Likes_Btn.png"
center_x: self.parent.center_x – 240
center_y: self.parent.center_y – 415

view raw

custom.kv

hosted with ❤ by GitHub

To understand this, one needs to learn how to prepare a Kivy design layout using the KV-language. You can develop the same using native-python code as well. However, I wanted to explore this language & not to mention that this is the preferred way of doing a front-end GUI design in Kivy.

Like any graphical interface, one needs to understand the layouts & the widgets that you are planning to use or build. For that, please go through the following critical documentation link on Kivy Layouts. Please go through this if you are doing this for the first time.

To pinpoint the conversation, I would like to present the documentation segment from the official site in the given picture –

Official Kivy-refernce

Since we’ve used our custom buttons & top bars, the most convenient GUI layouts will be FloatLayout for our use case. By using that layout, we can conveniently position our widgets at any random place as per our needs. At the same time, one can use nested layouts by combining different types of arrangements under another.

Some of the key lines from the above scripting files will be –

Screen:
  name: "background_1"
  Image:
      source: "Background/Background_1.png"
      allow_stretch: True
      keep_ratio: True
      size_hint_y: None
      size_hint_x: None
      width: self.parent.width
      height: self.parent.width/self.image_ratio
      FloatLayout:
          orientation: 'vertical'
          Label:
              text: "This is an application, which will consume the live streaming data inside a Kivy-based IOS-App by using Matplotlib to capture the KPIs."
              text_size: self.width + 350, None
              height: self.texture_size[1]
              halign: "left"
              valign: "bottom"
              pos_hint: {'center_x':2.9,'center_y':6.5}
          Image:
              id: homesc
              pos_hint: {'right':6, 'top':5.4}
              size_hint: None, None
              size: 560, 485
              source: "Background/FP.jpeg"

Let us understand what we discussed here & try to map that with the image.

Part of GUI defined in KV file

From the above image now, you can understand how we placed the label & image into our custom positions to create a lean & clean interface.

Image:
      source: "Background/Top_Bar.png"
      size: 620, 175
      pos: 0, root.height - 535

  Button:
      #: set val 'Start'
      size: 112.5, 75
      pos: root.width/2-190, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val, sm)
      on_release: root.released(self, val)
      Image:
          id: s_img
          text: val
          source: "Background/Start_Btn.png"
          center_x: self.parent.center_x - 260
          center_y: self.parent.center_y - 415

  Button:
      #: set val2 'Stats'
      size: 112.5, 75
      pos: root.width/2-55, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val2, sm)
      on_release: root.released(self, val2)
      Image:
          id: st_img
          text: val2
          source: "Background/Stats_Btn.png"
          center_x: self.parent.center_x - 250
          center_y: self.parent.center_y - 415

  Button:
      #: set val3 'Likes'
      size: 112.5, 75
      pos: root.width/2+75, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val3, sm)
      on_release: root.released(self, val3)
      Image:
          id: lk_img
          text: val3
          source: "Background/Likes_Btn.png"
          center_x: self.parent.center_x - 240
          center_y: self.parent.center_y - 415

Let us understand the custom buttons mapped in our Apps.

So, these are custom buttons. We placed them into specific positions & sizes by mentioning the appropriate size & position coordinates & then assigned the button methods (on_press & on_release).

However, these button methods will be present inside the main python script, which we’ll discuss after this segment.

3. main.py (Consuming Streaming data from Ably channels & captured IoT events from the simulator & publish them in Kivy-based iOS App through measured KPIs.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 12-Nov-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels & captured IoT ####
#### events from the simulator & publish ####
#### them in Kivy-I/OS App through ####
#### measured KPIs. ####
#### ####
##############################################
from kivy.app import App
from kivy.uix.widget import Widget
from kivy.lang import Builder
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.floatlayout import FloatLayout
from kivy.clock import Clock
from kivy.core.window import Window
from kivymd.app import MDApp
import datetime as dt
import datetime
from kivy.properties import StringProperty
from kivy.vector import Vector
import regex as re
import os
os.environ["KIVY_IMAGE"]="pil"
import platform as pl
import matplotlib.pyplot as plt
import pandas as p
from matplotlib.patches import Rectangle
from matplotlib import use as mpl_use
mpl_use('module://kivy.garden.matplotlib.backend_kivy')
plt.style.use('fivethirtyeight')
# Consuming data from Ably Queue
from ably import AblyRest
# Main Class to consume streaming
import clsStreamConsume as ca
# Create the instance of the Covid API Class
x1 = ca.clsStreamConsume()
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' *60)
DInd = 'Y'
Window.size = (310, 460)
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
def getRealTimeIoT():
try:
# Let's pass this to our map section
df = x1.conStream(var1, DInd)
print('Data:')
print(str(df))
return df
except Exception as e:
x = str(e)
print(x)
df = p.DataFrame()
return df
class MainInterface(FloatLayout):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.data = getRealTimeIoT()
self.likes = 0
self.dcMotor = 0
self.servoMotor = 0
self.minRatio = 0
plt.subplots_adjust(bottom=0.19)
#self.fig, self.ax = plt.subplots(1,1, figsize=(6.5,10))
self.fig, self.ax = plt.subplots()
self.mpl_canvas = self.fig.canvas
def on_data(self, *args):
self.ax.clear()
self.data = getRealTimeIoT()
self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
self.likes = self.getMaxLike(self.data)
self.ids.dynVal.text = str(self.likes)
self.ids.lk_img_1.source = ''
self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn_R.png"
self.dcMotor = self.getMaxDCMotor(self.data)
self.ids.dynDC.text = str(self.dcMotor)
self.servoMotor = self.getMaxServoMotor(self.data)
self.ids.dynServo.text = str(self.servoMotor)
self.minRatio = self.getDc2ServoMinRatio(self.data)
self.ids.dynMin.text = str(self.minRatio)
x = self.data['x_value']
y1 = self.data['total_1']
y2 = self.data['total_2']
self.ax.plot(x, y1, label='Channel 1', linewidth=5.0)
self.ax.plot(x, y2, label='Channel 2', linewidth=5.0)
self.mpl_canvas.draw_idle()
box = self.ids.box
box.clear_widgets()
box.add_widget(self.mpl_canvas)
return self.data
def getMaxLike(self, df):
payload = df['x_value']
a1 = str(payload.agg(['max']))
max_val = int(re.search(r'\d+', a1)[0])
return max_val
def getMaxDCMotor(self, df):
payload = df['total_1']
a1 = str(payload.agg(['max']))
max_val = int(re.search(r'\d+', a1)[0])
return max_val
def getMaxServoMotor(self, df):
payload = df['total_2']
a1 = str(payload.agg(['max']))
max_val = int(re.search(r'\d+', a1)[0])
return max_val
def getMinDCMotor(self, df):
payload = df['total_1']
a1 = str(payload.agg(['min']))
min_val = int(re.search(r'\d+', a1)[0])
return min_val
def getMinServoMotor(self, df):
payload = df['total_2']
a1 = str(payload.agg(['min']))
min_val = int(re.search(r'\d+', a1)[0])
return min_val
def getDc2ServoMinRatio(self, df):
minDC = self.getMinDCMotor(df)
minServo = self.getMinServoMotor(df)
min_ratio = round(float(minDC/minServo), 5)
return min_ratio
def update(self, *args):
self.data = self.on_data(self.data)
def pressed(self, instance, inText, SM):
if str(inText).upper() == 'START':
instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Start_Btn.png"
print('In Pressed: ', str(instance.parent.ids.s_img.text).upper())
if ((SM.current == "background_2") or (SM.current == "background_3")):
SM.transition.direction = "right"
SM.current= "background_1"
Clock.unschedule(self.update)
self.remove_widget(self.mpl_canvas)
elif str(inText).upper() == 'STATS':
instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Stats_Btn.png"
print('In Pressed: ', str(instance.parent.ids.st_img.text).upper())
if (SM.current == "background_1"):
SM.transition.direction = "left"
elif (SM.current == "background_3"):
SM.transition.direction = "right"
SM.current= "background_2"
Clock.schedule_interval(self.update, 0.1)
else:
instance.parent.ids.lk_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Likes_Btn.png"
print('In Pressed: ', str(instance.parent.ids.lk_img.text).upper())
if ((SM.current == "background_1") or (SM.current == "background_2")):
SM.transition.direction = "left"
SM.current= "background_3"
Clock.schedule_interval(self.update, 0.1)
instance.parent.ids.dynVal.text = str(self.likes)
instance.parent.ids.dynDC.text = str(self.dcMotor)
instance.parent.ids.dynServo.text = str(self.servoMotor)
instance.parent.ids.dynMin.text = str(self.minRatio)
self.remove_widget(self.mpl_canvas)
def released(self, instance, inrText):
if str(inrText).upper() == 'START':
instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Start_Btn.png"
print('Released: ', str(instance.parent.ids.s_img.text).upper())
elif str(inrText).upper() == 'STATS':
instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Stats_Btn.png"
print('Released: ', str(instance.parent.ids.st_img.text).upper())
else:
instance.parent.ids.lk_img.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
print('Released: ', str(instance.parent.ids.lk_img.text).upper())
class CustomApp(MDApp):
def build(self):
return MainInterface()
if __name__ == "__main__":
custApp = CustomApp()
custApp.run()

view raw

main.py

hosted with ❤ by GitHub

Let us explore the main script now.

def getRealTimeIoT():
    try:
        # Let's pass this to our map section
        df = x1.conStream(var1, DInd)

        print('Data:')
        print(str(df))

        return df
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

The above function will invoke the streaming class to consume the mock IoT live events as a pandas dataframe from the Ably queue.

class MainInterface(FloatLayout):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.data = getRealTimeIoT()
        self.likes = 0
        self.dcMotor = 0
        self.servoMotor = 0
        self.minRatio = 0
        plt.subplots_adjust(bottom=0.19)

        #self.fig, self.ax = plt.subplots(1,1, figsize=(6.5,10))
        self.fig, self.ax = plt.subplots()
        self.mpl_canvas = self.fig.canvas

Application is instantiating the main class & assignments of all the critical variables, including the matplotlib class.

    def pressed(self, instance, inText, SM):

        if str(inText).upper() == 'START':
            instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Start_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.s_img.text).upper())
            if ((SM.current == "background_2") or (SM.current == "background_3")):
                SM.transition.direction = "right"
            SM.current= "background_1"
            Clock.unschedule(self.update)
            self.remove_widget(self.mpl_canvas)

We’ve taken one of the button events & captured how the application will behave once someone clicks the Start button & how it will bring all the corresponding elements of a static page. It also explained the transition type between screens.

        elif str(inText).upper() == 'STATS':

            instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Stats_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.st_img.text).upper())
            if (SM.current == "background_1"):
                SM.transition.direction = "left"
            elif (SM.current == "background_3"):
                SM.transition.direction = "right"
            SM.current= "background_2"
            Clock.schedule_interval(self.update, 0.1)

The next screen invokes the dynamic & real-time content. So, please pay extra attention to the following line –

Clock.schedule_interval(self.update, 0.1)

This line will invoke the update function, which looks like –

    def update(self, *args):
        self.data = self.on_data(self.data)

Here is the logic for the update function, which will invoke another function named – “on_data“.

    def on_data(self, *args):
        self.ax.clear()
        self.data = getRealTimeIoT()

        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
        self.likes = self.getMaxLike(self.data)
        self.ids.dynVal.text = str(self.likes)
        self.ids.lk_img_1.source = ''
        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn_R.png"

        self.dcMotor = self.getMaxDCMotor(self.data)
        self.ids.dynDC.text = str(self.dcMotor)

        self.servoMotor = self.getMaxServoMotor(self.data)
        self.ids.dynServo.text = str(self.servoMotor)

        self.minRatio = self.getDc2ServoMinRatio(self.data)
        self.ids.dynMin.text = str(self.minRatio)

        x = self.data['x_value']
        y1 = self.data['total_1']
        y2 = self.data['total_2']

        self.ax.plot(x, y1, label='Channel 1', linewidth=5.0)
        self.ax.plot(x, y2, label='Channel 2', linewidth=5.0)

        self.mpl_canvas.draw_idle()

        box = self.ids.box
        box.clear_widgets()
        box.add_widget(self.mpl_canvas)

        return self.data

The above crucial line shows how we capture the live calculation & assign them into matplotlib plots & finally assign that figure canvas of matplotlib to a box widget as per our size & display the change content whenever it invokes this method.

Rests of the functions are pretty self-explanatory. So, I’m not going to discuss them.


Run:

Let’s run the app & see the output –

STEP – 1

Triggering the mock IoT App

STEP – 2

Triggering the iOS App

STEP – 3


So, we’ve done it.

You will get the complete codebase in the following Github link.

I’ll bring some more exciting topic in the coming days from the Python verse. Please share & subscribe my post & let me know your feedback.

Till then, Happy Avenging!


Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim the ownership of these images. There is an always room for improvement & especially all the GUI components size & position that will be dynamic in nature by defining self.width along with some constant values.

Projecting real-time KPIs by ingesting streaming events from emulated IoT-device

Today, I am planning to demonstrate an IoT use case implemented in Python. I was waiting for my Raspberry Pi to arrive. However, the product that I received was not working as expected. Perhaps, some hardware malfunction. Hence, I was looking for a way to continue with my installment even without the hardware.

I was looking for an alternative way to use an online Raspberry Pi emulator. Recently, Microsoft has introduced integrated Raspberry Pi, which you can directly integrate with Azure IoT. However, I couldn’t find any API, which I could leverage on my Python application.

So, I explored all the possible options & finally come-up with the idea of creating my own IoT-Emulator, which can integrate with any application. With the help from the online materials, I have customized & enhanced them as per my use case & finally come up with this clean application that will demonstrate this use case with clarity.

We’ll showcase this real-time use case, where we would try to capture the events generated by IoT in a real-time dashboard, where the values in the visual display points will be affected as soon as the source data changes.


However, I would like to share the run before we dig deep into this.

Demo

Isn’t this exciting? How we can use our custom-built IoT emulator & captures real-time events to Ably Queue, then transform those raw events into more meaningful KPIs. Let’s deep dive then.


Architecture:

Let’s explore the architecture –

Fig – 1

As you can see, the green box is a demo IoT application that generates events & pushes them into the Ably Queue. At the same time, Dashboard consumes the events & transforms them into more meaningful metrics.


Package Installation:

Let us understand the sample packages that require for this task.

Step – 1:

Installation

Step – 2:

Installation – Continue

And, here is the command to install those packages –

pip install dash==1.0.0
pip install numpy==1.16.4
pip install pandas==0.24.2
pip install scipy==1.3.0
pip install gunicorn==19.9.0
pip install ably==1.1.1
pip install tkgpio==0.1

Code:

Since this is an extension to our previous post, we’re not going to discuss other scripts, which we’ve already discussed over there. Instead, we will talk about the enhanced scripts & the new scripts that require for this use case.

1. clsConfig.py (This native Python script contains the configuration entries.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 25-Sep-2021 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning & streaming dashboard.####
#### ####
################################################
import os
import platform as pl
class clsConfig(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'REPORT_PATH': Curr_Path + sep + 'report',
'FILE_NAME': Curr_Path + sep + 'data' + sep + 'TradeIn.csv',
'SRC_PATH': Curr_Path + sep + 'data' + sep,
'JSONFileNameWithPath': Curr_Path + sep + 'GUI_Config' + sep + 'CircuitConfiguration.json',
'APP_DESC_1': 'Dash Integration with Ably!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'SUBDIR' : 'data',
'ABLY_ID': 'WWP309489.93jfkT:32kkdhdJjdued79e',
"URL":"https://corona-api.com/countries/&quot;,
"appType":"application/json",
"conType":"keep-alive",
"limRec": 50,
"CACHE":"no-cache",
"MAX_RETRY": 3,
"coList": "DE, IN, US, CA, GB, ID, BR",
"FNC": "NewConfirmed",
"TMS": "ReportedDate",
"FND": "NewDeaths",
"FinData": "Cache.csv"
}

view raw

clsConfig.py

hosted with ❤ by GitHub

A few of the new entries, which are essential to this task are -> ABLY_ID, FinData & JSONFileNameWithPath.

2. clsPublishStream.py (This script will publish real-time streaming data coming out from a hosted API sources using another popular third-party service named Ably. Ably mimics pubsub Streaming concept, which might be extremely useful for any start-ups.)


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 26-Jul-2021 ####
#### Modified Date: 08-Sep-2021 ####
#### ####
#### Objective: This script will publish real-time ####
#### streaming data coming out from a hosted API ####
#### sources using another popular third-party service ####
#### named Ably. Ably mimics pubsub Streaming concept, ####
#### which might be extremely useful for any start-ups. ####
#### ####
###############################################################
from ably import AblyRest
import logging
import json
from random import seed
from random import random
import json
import math
import random
from clsConfig import clsConfig as cf
seed(1)
# Global Section
logger = logging.getLogger('ably')
logger.addHandler(logging.StreamHandler())
ably_id = str(cf.conf['ABLY_ID'])
ably = AblyRest(ably_id)
channel = ably.channels.get('sd_channel')
# End Of Global Section
class clsPublishStream:
def __init__(self):
self.msgSize = cf.conf['limRec']
def pushEvents(self, srcJSON, debugInd, varVa):
try:
msgSize = self.msgSize
# Capturing the inbound dataframe
jdata_fin = json.dumps(srcJSON)
print('IOT Events: ')
print(str(jdata_fin))
# Publish rest of the messages to the sd_channel channel
channel.publish('event', jdata_fin)
jdata_fin = ''
return 0
except Exception as e:
x = str(e)
print(x)
logging.info(x)
return 1

We’re not going to discuss this as we’ve already discussed in my previous post.

3. clsStreamConsume.py (Consuming Streaming data from Ably channels.)


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### Modified On 08-Sep-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels published by the ####
#### playIOTDevice.py ####
#### ####
##############################################
import json
from clsConfig import clsConfig as cf
import requests
import logging
import time
import pandas as p
import clsL as cl
from ably import AblyRest
# Initiating Log class
l = cl.clsL()
class clsStreamConsume:
def __init__(self):
self.ably_id = str(cf.conf['ABLY_ID'])
self.fileName = str(cf.conf['FinData'])
def conStream(self, varVa, debugInd):
try:
ably_id = self.ably_id
fileName = self.fileName
var = varVa
debug_ind = debugInd
# Fetching the data
client = AblyRest(ably_id)
channel = client.channels.get('sd_channel')
message_page = channel.history()
# Counter Value
cnt = 0
# Declaring Global Data-Frame
df_conv = p.DataFrame()
for i in message_page.items:
print('Last Msg: {}'.format(i.data))
json_data = json.loads(i.data)
#jdata = json.dumps(json_data)
# Converting String to Dictionary
dict_json = eval(json_data)
# Converting JSON to Dataframe
#df = p.json_normalize(json_data)
#df.columns = df.columns.map(lambda x: x.split(".")[-1])
df = p.DataFrame.from_dict(dict_json, orient='index')
#print('DF Inside:')
#print(df)
if cnt == 0:
df_conv = df
else:
d_frames = [df_conv, df]
df_conv = p.concat(d_frames)
cnt += 1
# Resetting the Index Value
df_conv.reset_index(drop=True, inplace=True)
# This will check whether the current load is happening
# or not. Based on that, it will capture the old events
# from cache.
if df_conv.empty:
df_conv = p.read_csv(fileName, index = True)
else:
l.logr(fileName, debug_ind, df_conv, 'log')
return df_conv
except Exception as e:
x = str(e)
print('Error: ', x)
logging.info(x)
# This will handle the error scenaio as well.
# Based on that, it will capture the old events
# from cache.
try:
df_conv = p.read_csv(fileName, index = True)
except:
df = p.DataFrame()
return df

We’re not going to discuss this as we’ve already discussed in my previous post.

4. CircuitConfiguration.json (Configuration file for GUI Interface for IoT Simulator.)


{
"name":"Analog Device",
"width":700,
"height":350,
"leds":[
{
"x":105,
"y":80,
"name":"LED",
"pin":21
}
],
"motors":[
{
"x":316,
"y":80,
"name":"DC Motor",
"forward_pin":22,
"backward_pin":23
}
],
"servos":[
{
"x":537,
"y":80,
"name":"Servo Motor",
"pin":24,
"min_angle":-180,
"max_angle":180,
"initial_angle":20
}
],
"adc":{
"mcp_chip":3008,
"potenciometers":[
{
"x":40,
"y":200,
"name":"Brightness Potentiometer",
"channel":0
},
{
"x":270,
"y":200,
"name":"Speed Potentiometer",
"channel":2
},
{
"x":500,
"y":200,
"name":"Angle Potentiometer",
"channel":6
}
]
},
"toggles":[
{
"x":270,
"y":270,
"name":"Direction Toggle Switch",
"pin":15,
"off_label":"backward",
"on_label":"forward",
"is_on":false
}
],
"labels":[
{
"x":15,
"y":35,
"width":25,
"height":18,
"borderwidth":2,
"relief":"solid"
},
{
"x":56,
"y":26,
"text":"Brightness Control"
},
{
"x":245,
"y":35,
"width":25,
"height":18,
"borderwidth":2,
"relief":"solid"
},
{
"x":298,
"y":26,
"text":"Speed Control"
},
{
"x":475,
"y":35,
"width":25,
"height":18,
"borderwidth":2,
"relief":"solid"
},
{
"x":531,
"y":26,
"text":"Angle Control"
}
]
}

This json configuration will be used by the next python class.

5. clsBuildCircuit.py (Calling Tk Circuit API.)


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 25-Sep-2021 ####
#### Modified On 25-Sep-2021 ####
#### ####
#### Objective: Calling Tk Circuit API ####
##############################################
from tkgpio import TkCircuit
from json import load
from clsConfig import clsConfig as cf
fileName = str(cf.conf['JSONFileNameWithPath'])
print('File Name: ', str(fileName))
# initialize the circuit inside the GUI
with open(fileName, "r") as file:
config = load(file)
class clsBuildCircuit:
def __init__(self):
self.config = config
def genCir(self, main_function):
try:
config = self.config
circuit = TkCircuit(config)
circuit.run(main_function)
return circuit
except Exception as e:
x = str(e)
print(x)
return ''

Key snippets from the above script –

config = self.config
circuit = TkCircuit(config)
circuit.run(main_function)

The above lines will create an instance of simulated IoT circuits & then it will use the json file to start the GUI class.

6. playIOTDevice.py (Main Circuit GUI script to create an IoT Device to generate the events, which will consumed.)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 25-Sep-2021 ####
#### Modified On 25-Sep-2021 ####
#### ####
#### Objective: Main Tk Circuit GUI script ####
#### to create an IOT Device to generate ####
#### the events, which will consumed. ####
###############################################
# We keep the setup code in a different class as shown below.
import clsBuildCircuit as csb
import json
import clsPublishStream as cps
import datetime
from clsConfig import clsConfig as cf
import logging
###############################################
### Global Section ###
###############################################
# Initiating Ably class to push events
x1 = cps.clsPublishStream()
# Create the instance of the Tk Circuit API Class.
circuit = csb.clsBuildCircuit()
###############################################
### End of Global Section ###
###############################################
# Invoking the IOT Device Generator.
@circuit.genCir
def main():
from gpiozero import PWMLED, Motor, Servo, MCP3008, Button
from time import sleep
# Circuit Components
ledAlert = PWMLED(21)
dcMotor = Motor(22, 23)
servoMotor = Servo(24)
ioMeter1 = MCP3008(0)
ioMeter2 = MCP3008(2)
ioMeter3 = MCP3008(6)
switch = Button(15)
# End of circuit components
# Other useful variables
cnt = 1
idx = 0
debugInd = 'Y'
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# End of useful variables
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
msgSize = int(cf.conf['limRec'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'IOTDevice.log', level=logging.INFO)
while True:
ledAlert.value = ioMeter1.value
if switch.is_pressed:
dcMotor.forward(ioMeter2.value)
xVal = 'Motor Forward'
else:
dcMotor.backward(ioMeter2.value)
xVal = 'Motor Backward'
servoMotor.value = 1 – 2 * ioMeter3.value
srcJson = {
"LedMeter": ledAlert.value,
"DCMeter": ioMeter2.value,
"ServoMeter": ioMeter3.value,
"SwitchStatus": switch.is_pressed,
"DCMotorPos": xVal,
"ServoMotor": servoMotor.value
}
tmpJson = str(srcJson)
if cnt == 1:
srcJsonMast = '{' + '"' + str(idx) + '":'+ tmpJson
elif cnt == msgSize:
srcJsonMast = srcJsonMast + '}'
print('JSON: ')
print(str(srcJsonMast))
# Pushing both the Historical Confirmed Cases
retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)
if retVal_1 == 0:
print('Successfully IOT event pushed!')
else:
print('Failed to push IOT events!')
srcJsonMast = ''
tmpJson = ''
cnt = 0
idx = -1
srcJson = {}
retVal_1 = 0
else:
srcJsonMast = srcJsonMast + ',' + '"' + str(idx) + '":'+ tmpJson
cnt += 1
idx += 1
sleep(0.05)

Lets’ explore the key snippets –

ledAlert = PWMLED(21)
dcMotor = Motor(22, 23)
servoMotor = Servo(24)

It defines three motors that include Servo, DC & LED.

Now, we can see the following sets of the critical snippet –

ledAlert.value = ioMeter1.value

if switch.is_pressed:
    dcMotor.forward(ioMeter2.value)
    xVal = 'Motor Forward'
else:
    dcMotor.backward(ioMeter2.value)
    xVal = 'Motor Backward'

servoMotor.value = 1 - 2 * ioMeter3.value

srcJson = {
"LedMeter": ledAlert.value,
"DCMeter": ioMeter2.value,
"ServoMeter": ioMeter3.value,
"SwitchStatus": switch.is_pressed,
"DCMotorPos": xVal,
"ServoMotor": servoMotor.value
}

Following lines will dynamically generates JSON that will be passed into the Ably queue –

tmpJson = str(srcJson)

if cnt == 1:
    srcJsonMast = '{' + '"' + str(idx) + '":'+ tmpJson
elif cnt == msgSize:
    srcJsonMast = srcJsonMast + '}'
    print('JSON: ')
    print(str(srcJsonMast))

Final line from the above script –

# Pushing both the Historical Confirmed Cases
retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)

This code will now push the events into the Ably Queue.

7. app.py (Consuming Streaming data from Ably channels & captured IOT events from the simulator & publish them in Dashboard through measured KPIs.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 02-Oct-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels & captured IOT ####
#### events from the simulator & publish ####
#### them in Dashboard through measured ####
#### KPIs. ####
#### ####
##############################################
import os
import pathlib
import numpy as np
import datetime as dt
import dash
from dash import dcc
from dash import html
import datetime
import dash_daq as daq
from dash.exceptions import PreventUpdate
from dash.dependencies import Input, Output, State
from scipy.stats import rayleigh
# Consuming data from Ably Queue
from ably import AblyRest
# Main Class to consume streaming
import clsStreamConsume as ca
# Create the instance of the Covid API Class
x1 = ca.clsStreamConsume()
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' *60)
DInd = 'Y'
GRAPH_INTERVAL = os.environ.get("GRAPH_INTERVAL", 5000)
app = dash.Dash(
__name__,
meta_tags=[{"name": "viewport", "content": "width=device-width, initial-scale=1"}],
)
app.title = "IOT Device Dashboard"
server = app.server
app_color = {"graph_bg": "#082255", "graph_line": "#007ACE"}
app.layout = html.Div(
[
# header
html.Div(
[
html.Div(
[
html.H4("IOT DEVICE STREAMING", className="app__header__title"),
html.P(
"This app continually consumes streaming data from IOT-Device and displays live charts of various metrics & KPI associated with it.",
className="app__header__title–grey",
),
],
className="app__header__desc",
),
html.Div(
[
html.A(
html.Button("SOURCE CODE", className="link-button"),
href="https://github.com/SatyakiDe2019/IOTStream&quot;,
),
html.A(
html.Button("VIEW DEMO", className="link-button"),
href="https://github.com/SatyakiDe2019/IOTStream/blob/main/demo.gif&quot;,
),
html.A(
html.Img(
src=app.get_asset_url("dash-new-logo.png"),
className="app__menu__img",
),
href="https://plotly.com/dash/&quot;,
),
],
className="app__header__logo",
),
],
className="app__header",
),
html.Div(
[
# Motor Speed
html.Div(
[
html.Div(
[html.H6("SERVO METER (IOT)", className="graph__title")]
),
dcc.Graph(
id="iot-measure",
figure=dict(
layout=dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
)
),
),
dcc.Interval(
id="iot-measure-update",
interval=int(GRAPH_INTERVAL),
n_intervals=0,
),
# Second Panel
html.Div(
[html.H6("DC-MOTOR (IOT)", className="graph__title")]
),
dcc.Graph(
id="iot-measure-1",
figure=dict(
layout=dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
)
),
),
dcc.Interval(
id="iot-measure-update-1",
interval=int(GRAPH_INTERVAL),
n_intervals=0,
)
],
className="two-thirds column motor__speed__container",
),
html.Div(
[
# histogram
html.Div(
[
html.Div(
[
html.H6(
"MOTOR POWER HISTOGRAM",
className="graph__title",
)
]
),
html.Div(
[
dcc.Slider(
id="bin-slider",
min=1,
max=60,
step=1,
value=20,
updatemode="drag",
marks={
20: {"label": "20"},
40: {"label": "40"},
60: {"label": "60"},
},
)
],
className="slider",
),
html.Div(
[
dcc.Checklist(
id="bin-auto",
options=[
{"label": "Auto", "value": "Auto"}
],
value=["Auto"],
inputClassName="auto__checkbox",
labelClassName="auto__label",
),
html.P(
"# of Bins: Auto",
id="bin-size",
className="auto__p",
),
],
className="auto__container",
),
dcc.Graph(
id="motor-histogram",
figure=dict(
layout=dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
)
),
),
],
className="graph__container first",
),
# motor direction
html.Div(
[
html.Div(
[
html.H6(
"SERVO MOTOR DIRECTION", className="graph__title"
)
]
),
dcc.Graph(
id="servo-motor-direction",
figure=dict(
layout=dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
)
),
),
],
className="graph__container second",
),
],
className="one-third column histogram__direction",
),
],
className="app__content",
),
],
className="app__container",
)
def toPositive(row, flag):
try:
if flag == 'ServoMeter':
x_val = abs(float(row['ServoMotor']))
elif flag == 'DCMotor':
x_val = abs(float(row['DCMotor'])) * 0.001
return x_val
except Exception as e:
x = str(e)
print(x)
val = 0
return val
def toPositiveInflated(row, flag):
try:
if flag == 'ServoMeter':
x_val = abs(float(row['ServoMeter'])) * 100
elif flag == 'DCMotor':
x_val = abs(float(row['DCMeter'])) * 100
return x_val
except Exception as e:
x = str(e)
print(x)
val = 0
return val
def getData(var, Ind):
try:
# Let's pass this to our map section
df = x1.conStream(var, Ind)
df['ServoMeterNew'] = df.apply(lambda row: toPositiveInflated(row, 'ServoMeter'), axis=1)
df['ServoMotorNew'] = df.apply(lambda row: toPositive(row, 'ServoMeter'), axis=1)
df['DCMotor'] = df.apply(lambda row: toPositiveInflated(row, 'DCMotor'), axis=1)
df['DCMeterNew'] = df.apply(lambda row: toPositive(row, 'DCMotor'), axis=1)
# Dropping old columns
df.drop(columns=['ServoMeter','ServoMotor','DCMeter'], axis=1, inplace=True)
#Rename New Columns to Old Columns
df.rename(columns={'ServoMeterNew':'ServoMeter'}, inplace=True)
df.rename(columns={'ServoMotorNew':'ServoMotor'}, inplace=True)
df.rename(columns={'DCMeterNew':'DCMeter'}, inplace=True)
return df
except Exception as e:
x = str(e)
print(x)
df = p.DataFrame()
return df
@app.callback(
Output("iot-measure-1", "figure"), [Input("iot-measure-update", "n_intervals")]
)
def gen_iot_speed(interval):
"""
Generate the DC Meter graph.
:params interval: update the graph based on an interval
"""
# Let's pass this to our map section
df = getData(var1, DInd)
trace = dict(
type="scatter",
y=df["DCMotor"],
line={"color": "#42C4F7"},
hoverinfo="skip",
error_y={
"type": "data",
"array": df["DCMeter"],
"thickness": 1.5,
"width": 2,
"color": "#B4E8FC",
},
mode="lines",
)
layout = dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
font={"color": "#fff"},
height=400,
xaxis={
"range": [0, 200],
"showline": True,
"zeroline": False,
"fixedrange": True,
"tickvals": [0, 50, 100, 150, 200],
"ticktext": ["200", "150", "100", "50", "0"],
"title": "Time Elapsed (sec)",
},
yaxis={
"range": [
min(0, min(df["DCMotor"])),
max(100, max(df["DCMotor"]) + max(df["DCMeter"])),
],
"showgrid": True,
"showline": True,
"fixedrange": True,
"zeroline": False,
"gridcolor": app_color["graph_line"],
"nticks": max(6, round(df["DCMotor"].iloc[-1] / 10)),
},
)
return dict(data=[trace], layout=layout)
@app.callback(
Output("iot-measure", "figure"), [Input("iot-measure-update", "n_intervals")]
)
def gen_iot_speed(interval):
"""
Generate the Motor Speed graph.
:params interval: update the graph based on an interval
"""
# Let's pass this to our map section
df = getData(var1, DInd)
trace = dict(
type="scatter",
y=df["ServoMeter"],
line={"color": "#42C4F7"},
hoverinfo="skip",
error_y={
"type": "data",
"array": df["ServoMotor"],
"thickness": 1.5,
"width": 2,
"color": "#B4E8FC",
},
mode="lines",
)
layout = dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
font={"color": "#fff"},
height=400,
xaxis={
"range": [0, 200],
"showline": True,
"zeroline": False,
"fixedrange": True,
"tickvals": [0, 50, 100, 150, 200],
"ticktext": ["200", "150", "100", "50", "0"],
"title": "Time Elapsed (sec)",
},
yaxis={
"range": [
min(0, min(df["ServoMeter"])),
max(100, max(df["ServoMeter"]) + max(df["ServoMotor"])),
],
"showgrid": True,
"showline": True,
"fixedrange": True,
"zeroline": False,
"gridcolor": app_color["graph_line"],
"nticks": max(6, round(df["ServoMeter"].iloc[-1] / 10)),
},
)
return dict(data=[trace], layout=layout)
@app.callback(
Output("servo-motor-direction", "figure"), [Input("iot-measure-update", "n_intervals")]
)
def gen_motor_direction(interval):
"""
Generate the Servo direction graph.
:params interval: update the graph based on an interval
"""
df = getData(var1, DInd)
val = df["ServoMeter"].iloc[-1]
direction = [0, (df["ServoMeter"][0]*100 – 20), (df["ServoMeter"][0]*100 + 20), 0]
traces_scatterpolar = [
{"r": [0, val, val, 0], "fillcolor": "#084E8A"},
{"r": [0, val * 0.65, val * 0.65, 0], "fillcolor": "#B4E1FA"},
{"r": [0, val * 0.3, val * 0.3, 0], "fillcolor": "#EBF5FA"},
]
data = [
dict(
type="scatterpolar",
r=traces["r"],
theta=direction,
mode="lines",
fill="toself",
fillcolor=traces["fillcolor"],
line={"color": "rgba(32, 32, 32, .6)", "width": 1},
)
for traces in traces_scatterpolar
]
layout = dict(
height=350,
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
font={"color": "#fff"},
autosize=False,
polar={
"bgcolor": app_color["graph_line"],
"radialaxis": {"range": [0, 45], "angle": 45, "dtick": 10},
"angularaxis": {"showline": False, "tickcolor": "white"},
},
showlegend=False,
)
return dict(data=data, layout=layout)
@app.callback(
Output("motor-histogram", "figure"),
[Input("iot-measure-update", "n_intervals")],
[
State("iot-measure", "figure"),
State("bin-slider", "value"),
State("bin-auto", "value"),
],
)
def gen_motor_histogram(interval, iot_speed_figure, slider_value, auto_state):
"""
Genererate iot histogram graph.
:params interval: upadte the graph based on an interval
:params iot_speed_figure: current Motor Speed graph
:params slider_value: current slider value
:params auto_state: current auto state
"""
motor_val = []
try:
print('Inside gen_motor_histogram:')
print('iot_speed_figure::')
print(iot_speed_figure)
# Check to see whether iot-measure has been plotted yet
if iot_speed_figure is not None:
motor_val = iot_speed_figure["data"][0]["y"]
if "Auto" in auto_state:
bin_val = np.histogram(
motor_val,
bins=range(int(round(min(motor_val))), int(round(max(motor_val)))),
)
else:
bin_val = np.histogram(motor_val, bins=slider_value)
except Exception as error:
raise PreventUpdate
avg_val = float(sum(motor_val)) / len(motor_val)
median_val = np.median(motor_val)
pdf_fitted = rayleigh.pdf(
bin_val[1], loc=(avg_val) * 0.55, scale=(bin_val[1][-1] – bin_val[1][0]) / 3
)
y_val = (pdf_fitted * max(bin_val[0]) * 20,)
y_val_max = max(y_val[0])
bin_val_max = max(bin_val[0])
trace = dict(
type="bar",
x=bin_val[1],
y=bin_val[0],
marker={"color": app_color["graph_line"]},
showlegend=False,
hoverinfo="x+y",
)
traces_scatter = [
{"line_dash": "dash", "line_color": "#2E5266", "name": "Average"},
{"line_dash": "dot", "line_color": "#BD9391", "name": "Median"},
]
scatter_data = [
dict(
type="scatter",
x=[bin_val[int(len(bin_val) / 2)]],
y=[0],
mode="lines",
line={"dash": traces["line_dash"], "color": traces["line_color"]},
marker={"opacity": 0},
visible=True,
name=traces["name"],
)
for traces in traces_scatter
]
trace3 = dict(
type="scatter",
mode="lines",
line={"color": "#42C4F7"},
y=y_val[0],
x=bin_val[1][: len(bin_val[1])],
name="Rayleigh Fit",
)
layout = dict(
height=350,
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
font={"color": "#fff"},
xaxis={
"title": "Motor Power",
"showgrid": False,
"showline": False,
"fixedrange": True,
},
yaxis={
"showgrid": False,
"showline": False,
"zeroline": False,
"title": "Number of Samples",
"fixedrange": True,
},
autosize=True,
bargap=0.01,
bargroupgap=0,
hovermode="closest",
legend={
"orientation": "h",
"yanchor": "bottom",
"xanchor": "center",
"y": 1,
"x": 0.5,
},
shapes=[
{
"xref": "x",
"yref": "y",
"y1": int(max(bin_val_max, y_val_max)) + 0.5,
"y0": 0,
"x0": avg_val,
"x1": avg_val,
"type": "line",
"line": {"dash": "dash", "color": "#2E5266", "width": 5},
},
{
"xref": "x",
"yref": "y",
"y1": int(max(bin_val_max, y_val_max)) + 0.5,
"y0": 0,
"x0": median_val,
"x1": median_val,
"type": "line",
"line": {"dash": "dot", "color": "#BD9391", "width": 5},
},
],
)
return dict(data=[trace, scatter_data[0], scatter_data[1], trace3], layout=layout)
@app.callback(
Output("bin-auto", "value"),
[Input("bin-slider", "value")],
[State("iot-measure", "figure")],
)
def deselect_auto(slider_value, iot_speed_figure):
""" Toggle the auto checkbox. """
# prevent update if graph has no data
if "data" not in iot_speed_figure:
raise PreventUpdate
if not len(iot_speed_figure["data"]):
raise PreventUpdate
if iot_speed_figure is not None and len(iot_speed_figure["data"][0]["y"]) > 5:
return [""]
return ["Auto"]
@app.callback(
Output("bin-size", "children"),
[Input("bin-auto", "value")],
[State("bin-slider", "value")],
)
def show_num_bins(autoValue, slider_value):
""" Display the number of bins. """
if "Auto" in autoValue:
return "# of Bins: Auto"
return "# of Bins: " + str(int(slider_value))
if __name__ == "__main__":
app.run_server(debug=True)

view raw

app.py

hosted with ❤ by GitHub

Here are the key snippets –

html.Div(
        [
            html.Div(
                [html.H6("SERVO METER (IOT)", className="graph__title")]
            ),
            dcc.Graph(
                id="iot-measure",
                figure=dict(
                    layout=dict(
                        plot_bgcolor=app_color["graph_bg"],
                        paper_bgcolor=app_color["graph_bg"],
                    )
                ),
            ),
            dcc.Interval(
                id="iot-measure-update",
                interval=int(GRAPH_INTERVAL),
                n_intervals=0,
            ),
            # Second Panel
            html.Div(
                [html.H6("DC-MOTOR (IOT)", className="graph__title")]
            ),
            dcc.Graph(
                id="iot-measure-1",
                figure=dict(
                    layout=dict(
                        plot_bgcolor=app_color["graph_bg"],
                        paper_bgcolor=app_color["graph_bg"],
                    )
                ),
            ),
            dcc.Interval(
                id="iot-measure-update-1",
                interval=int(GRAPH_INTERVAL),
                n_intervals=0,
            )
        ],
        className="two-thirds column motor__speed__container",

The following line creates two panels, where the application will consume the streaming data by the app’s call-back feature & refresh the data & graphs as & when the application receives the streaming data.

A similar approach was adopted for other vital aspects/components inside the dashboard.

def getData(var, Ind):
    try:
        # Let's pass this to our map section
        df = x1.conStream(var, Ind)

        df['ServoMeterNew'] = df.apply(lambda row: toPositiveInflated(row, 'ServoMeter'), axis=1)
        df['ServoMotorNew'] = df.apply(lambda row: toPositive(row, 'ServoMeter'), axis=1)
        df['DCMotor'] = df.apply(lambda row: toPositiveInflated(row, 'DCMotor'), axis=1)
        df['DCMeterNew'] = df.apply(lambda row: toPositive(row, 'DCMotor'), axis=1)

        # Dropping old columns
        df.drop(columns=['ServoMeter','ServoMotor','DCMeter'], axis=1, inplace=True)

        #Rename New Columns to Old Columns
        df.rename(columns={'ServoMeterNew':'ServoMeter'}, inplace=True)
        df.rename(columns={'ServoMotorNew':'ServoMotor'}, inplace=True)
        df.rename(columns={'DCMeterNew':'DCMeter'}, inplace=True)

        return df
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

The application is extracting streaming data & consuming it from the Ably queue.

@app.callback(
    Output("iot-measure", "figure"), [Input("iot-measure-update", "n_intervals")]
)
def gen_iot_speed(interval):
    """
    Generate the Motor Speed graph.

    :params interval: update the graph based on an interval
    """

    # Let's pass this to our map section
    df = getData(var1, DInd)

    trace = dict(
        type="scatter",
        y=df["ServoMeter"],
        line={"color": "#42C4F7"},
        hoverinfo="skip",
        error_y={
            "type": "data",
            "array": df["ServoMotor"],
            "thickness": 1.5,
            "width": 2,
            "color": "#B4E8FC",
        },
        mode="lines",
    )

    layout = dict(
        plot_bgcolor=app_color["graph_bg"],
        paper_bgcolor=app_color["graph_bg"],
        font={"color": "#fff"},
        height=400,
        xaxis={
            "range": [0, 200],
            "showline": True,
            "zeroline": False,
            "fixedrange": True,
            "tickvals": [0, 50, 100, 150, 200],
            "ticktext": ["200", "150", "100", "50", "0"],
            "title": "Time Elapsed (sec)",
        },
        yaxis={
            "range": [
                min(0, min(df["ServoMeter"])),
                max(100, max(df["ServoMeter"]) + max(df["ServoMotor"])),
            ],
            "showgrid": True,
            "showline": True,
            "fixedrange": True,
            "zeroline": False,
            "gridcolor": app_color["graph_line"],
            "nticks": max(6, round(df["ServoMeter"].iloc[-1] / 10)),
        },
    )

    return dict(data=[trace], layout=layout)

Capturing all the relevant columns & transform them into a graph, where the application will consume data into both the axis (x-axis & y-axis).

There are many other useful snippets, which creates separate useful widgets inside the dashboard.


Run:

Let us run the application –

Dashboard-View

So, we’ve done it.

You will get the complete codebase in the following Github link.

There is an excellent resource from the dash framework, which you should explore. The following link would be handy for developers who want to get some meaningful pre-built dashboard template, which you can customize as per your need through Python or R. Please find the link here.


I’ll bring some more exciting topic in the coming days from the Python verse.

Till then, Happy Avenging! 😀


Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only.

One more thing you need to understand is that this prediction based on limited data points. The actual event may happen differently. Ideally, countries are taking a cue from this kind of analysis & are initiating appropriate measures to avoid the high-curve. And, that is one of the main objective of time series analysis.

There is always a room for improvement of this kind of models & the solution associated with it. I’ve shown the basic ways to achieve the same for the education purpose only.

Python-based dash framework visualizing real-time covid-19 trend.

Hi Team,

We’ll enhance our last post on Covid-19 prediction & try to capture them in a real-time dashboard, where the values in the visual display points will be affected as soon as the source data changes. In short, this is genuinely a real-time visual dashboard displaying all the graphs, trends depending upon the third-party API source data change.

However, I would like to share the run before we dig deep into this.

Demo Run

Architecture:

Let us understand the architecture for this solution –

Streaming Architecture

From the above diagram, one can see that we’re maintaining a similar approach compared to our last initiative. However, we’ve used a different framework to display the data live.

To achieve this, we’ve used a compelling python-based framework called Dash. Other than that, we’ve used Ably, Plotly & Prophet API.

If you need to know more about our last post, please visit this link.


Package Installation:

Let us understand the sample packages that require for this task.

Step – 1:

Installing Packages

Step – 2:

Installing Packages – Continue

Step – 3:

Installing Packages – Continue

Step – 4:

Installing Packages – Final

And, here is the command to install those packages –

pip install pandas
pip install plotly
pip install prophet
pip install dash
pip install pandas
pip install ably

Code:

Since this is an extension to our previous post, we’re not going to discuss other scripts, which we’ve already discussed over there. Instead, we will talk about the enhanced scripts & the new scripts that require for this use case.

1. clsConfig.py ( This native Python script contains the configuration entries. )


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 09-Sep-2021 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning & streaming dashboard.####
#### ####
################################################
import os
import platform as pl
class clsConfig(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'REPORT_PATH': Curr_Path + sep + 'report',
'FILE_NAME': Curr_Path + sep + 'data' + sep + 'TradeIn.csv',
'SRC_PATH': Curr_Path + sep + 'data' + sep,
'APP_DESC_1': 'Dash Integration with Ably!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'SUBDIR' : 'data',
'ABLY_ID': 'XXX2LL.93kdkiU2:Kdsldoeie737484E',
"URL":"https://corona-api.com/countries/&quot;,
"appType":"application/json",
"conType":"keep-alive",
"limRec": 10,
"CACHE":"no-cache",
"MAX_RETRY": 3,
"coList": "DE, IN, US, CA, GB, ID, BR",
"FNC": "NewConfirmed",
"TMS": "ReportedDate",
"FND": "NewDeaths",
"FinData": "Cache.csv"
}

view raw

clsConfig.py

hosted with ❤ by GitHub

A few of the new entries, which are essential to this task are -> ABLY_ID & FinData.

2. clsPublishStream.py ( This script will publish the data transformed for Covid-19 predictions from the third-party sources. )


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 26-Jul-2021 ####
#### Modified Date: 08-Sep-2021 ####
#### ####
#### Objective: This script will publish real-time ####
#### streaming data coming out from a hosted API ####
#### sources using another popular third-party service ####
#### named Ably. Ably mimics pubsub Streaming concept, ####
#### which might be extremely useful for any start-ups. ####
#### ####
###############################################################
from ably import AblyRest
import logging
import json
from random import seed
from random import random
import json
import math
import random
from clsConfig import clsConfig as cf
# Global Section
logger = logging.getLogger('ably')
logger.addHandler(logging.StreamHandler())
ably_id = str(cf.conf['ABLY_ID'])
ably = AblyRest(ably_id)
channel = ably.channels.get('sd_channel')
# End Of Global Section
class clsPublishStream:
def __init__(self):
self.fnc = cf.conf['FNC']
def pushEvents(self, srcDF, debugInd, varVa, flg):
try:
# JSON data
# This is the default data for all the identified category
# we've prepared. You can extract this dynamically. Or, By
# default you can set their base trade details.
json_data = [{'Year_Mon': '201911', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg},
{'Year_Mon': '201912', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg}]
jdata = json.dumps(json_data)
# Publish a message to the sd_channel channel
channel.publish('event', jdata)
# Capturing the inbound dataframe
iDF = srcDF
# Adding new selected points
covid_dict = iDF.to_dict('records')
jdata_fin = json.dumps(covid_dict)
# Publish rest of the messages to the sd_channel channel
channel.publish('event', jdata_fin)
jdata_fin = ''
return 0
except Exception as e:
x = str(e)
print(x)
logging.info(x)
return 1

We’ve already discussed this script. The only new line that appears here is –

json_data = [{'Year_Mon': '201911', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg},
            {'Year_Mon': '201912', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg}]

This statement is more like a dummy feed, which creates the basic structure of your graph.

3. clsStreamConsume.py ( This script will consume the stream from Ably Queue configuration entries. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### Modified On 08-Sep-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels published by the ####
#### callPredictCovidAnalysisRealtime.py ####
#### ####
##############################################
import json
from clsConfig import clsConfig as cf
import requests
import logging
import time
import pandas as p
import clsL as cl
from ably import AblyRest
# Initiating Log class
l = cl.clsL()
class clsStreamConsume:
def __init__(self):
self.ably_id = str(cf.conf['ABLY_ID'])
self.fileName = str(cf.conf['FinData'])
def conStream(self, varVa, debugInd):
try:
ably_id = self.ably_id
fileName = self.fileName
var = varVa
debug_ind = debugInd
# Fetching the data
client = AblyRest(ably_id)
channel = client.channels.get('sd_channel')
message_page = channel.history()
# Counter Value
cnt = 0
# Declaring Global Data-Frame
df_conv = p.DataFrame()
for i in message_page.items:
print('Last Msg: {}'.format(i.data))
json_data = json.loads(i.data)
# Converting JSON to Dataframe
df = p.json_normalize(json_data)
df.columns = df.columns.map(lambda x: x.split(".")[-1])
if cnt == 0:
df_conv = df
else:
d_frames = [df_conv, df]
df_conv = p.concat(d_frames)
cnt += 1
# Resetting the Index Value
df_conv.reset_index(drop=True, inplace=True)
# This will check whether the current load is happening
# or not. Based on that, it will capture the old events
# from cache.
if df_conv.empty:
df_conv = p.read_csv(fileName, index = True)
else:
l.logr(fileName, debug_ind, df_conv, 'log')
return df_conv
except Exception as e:
x = str(e)
print(x)
logging.info(x)
# This will handle the error scenaio as well.
# Based on that, it will capture the old events
# from cache.
try:
df_conv = p.read_csv(fileName, index = True)
except:
df = p.DataFrame()
return df

We’ve already discussed this script in one of my earlier posts, which you will get here.

So, I’m not going to discuss all the steps in detail.

The only added part was to introduce some temporary local caching mechanism.

if df_conv.empty:
    df_conv = p.read_csv(fileName, index = True)
else:
    l.logr(fileName, debug_ind, df_conv, 'log')

4. callPredictCovidAnalysisRealtime.py ( Main calling script to fetch the COVID-19 data from the third-party source & then publish it to the Ably message queue after transforming the data & adding the prediction using Facebook’s prophet API. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### Modified On 26-Jul-2021 ####
#### ####
#### Objective: Calling multiple API's ####
#### that including Prophet-API developed ####
#### by Facebook for future prediction of ####
#### Covid-19 situations in upcoming days ####
#### for world's major hotspots. ####
##############################################
import json
import clsCovidAPI as ca
from clsConfig import clsConfig as cf
import datetime
import logging
import clsL as cl
import math as m
import clsPublishStream as cps
import clsForecast as f
from prophet import Prophet
from prophet.plot import plot_plotly, plot_components_plotly
import matplotlib.pyplot as plt
import pandas as p
import datetime as dt
import time
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
# Initiating Log class
l = cl.clsL()
# Helper Function that removes underscores
def countryDet(inputCD):
try:
countryCD = inputCD
if str(countryCD) == 'DE':
cntCD = 'Germany'
elif str(countryCD) == 'BR':
cntCD = 'Brazil'
elif str(countryCD) == 'GB':
cntCD = 'UnitedKingdom'
elif str(countryCD) == 'US':
cntCD = 'UnitedStates'
elif str(countryCD) == 'IN':
cntCD = 'India'
elif str(countryCD) == 'CA':
cntCD = 'Canada'
elif str(countryCD) == 'ID':
cntCD = 'Indonesia'
else:
cntCD = 'N/A'
return cntCD
except:
cntCD = 'N/A'
return cntCD
def lookupCountry(row):
try:
strCD = str(row['CountryCode'])
retVal = countryDet(strCD)
return retVal
except:
retVal = 'N/A'
return retVal
def adjustTrend(row):
try:
flTrend = float(row['trend'])
flTrendUpr = float(row['trend_upper'])
flTrendLwr = float(row['trend_lower'])
retVal = m.trunc((flTrend + flTrendUpr + flTrendLwr)/3)
if retVal < 0:
retVal = 0
return retVal
except:
retVal = 0
return retVal
def ceilTrend(row, colName):
try:
flTrend = str(row[colName])
if flTrend.find('.'):
if float(flTrend) > 0:
retVal = m.trunc(float(flTrend)) + 1
else:
retVal = m.trunc(float(flTrend))
else:
retVal = float(flTrend)
if retVal < 0:
retVal = 0
return retVal
except:
retVal = 0
return retVal
def plot_picture(inputDF, debug_ind, var, countryCD, stat):
try:
iDF = inputDF
# Lowercase the column names
iDF.columns = [c.lower() for c in iDF.columns]
# Determine which is Y axis
y_col = [c for c in iDF.columns if c.startswith('y')][0]
# Determine which is X axis
x_col = [c for c in iDF.columns if c.startswith('ds')][0]
# Data Conversion
iDF['y'] = iDF[y_col].astype('float')
iDF['ds'] = iDF[x_col].astype('datetime64[ns]')
# Forecast calculations
# Decreasing the changepoint_prior_scale to 0.001 to make the trend less flexible
m = Prophet(n_changepoints=20, yearly_seasonality=True, changepoint_prior_scale=0.001)
#m = Prophet(n_changepoints=20, yearly_seasonality=True, changepoint_prior_scale=0.04525)
#m = Prophet(n_changepoints=['2021-09-10'])
m.fit(iDF)
forecastDF = m.make_future_dataframe(periods=365)
forecastDF = m.predict(forecastDF)
l.logr('15.forecastDF_' + var + '_' + countryCD + '.csv', debug_ind, forecastDF, 'log')
df_M = forecastDF[['ds', 'trend', 'trend_lower', 'trend_upper']]
l.logr('16.df_M_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')
# Getting Full Country Name
cntCD = countryDet(countryCD)
# Draw forecast results
df_M['Country'] = cntCD
l.logr('17.df_M_C_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')
df_M['AdjustTrend'] = df_M.apply(lambda row: adjustTrend(row), axis=1)
l.logr('20.df_M_AdjustTrend_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')
return df_M
except Exception as e:
x = str(e)
print(x)
df = p.DataFrame()
return df
def countrySpecificDF(counryDF, val):
try:
countryName = val
df = counryDF
df_lkpFile = df[(df['CountryCode'] == val)]
return df_lkpFile
except:
df = p.DataFrame()
return df
def toNum(row, colName):
try:
flTrend = str(row[colName])
flTr, subpart = flTrend.split(' ')
retVal = int(flTr.replace('-',''))
return retVal
except:
retVal = 0
return retVal
def extractPredictedDF(OrigDF, MergePredictedDF, colName):
try:
iDF_1 = OrigDF
iDF_2 = MergePredictedDF
dt_format = '%Y-%m-%d'
iDF_1_max_group = iDF_1.groupby(["Country"] , as_index=False)["ReportedDate"].max()
iDF_2['ReportedDate'] = iDF_2.apply(lambda row: toNum(row, 'ds'), axis=1)
col_one_list = iDF_1_max_group['Country'].tolist()
col_two_list = iDF_1_max_group['ReportedDate'].tolist()
print('col_one_list: ', str(col_one_list))
print('col_two_list: ', str(col_two_list))
cnt_1_x = 1
cnt_1_y = 1
cnt_x = 0
df_M = p.DataFrame()
for i in col_one_list:
str_countryVal = str(i)
cnt_1_y = 1
for j in col_two_list:
intReportDate = int(str(j).strip().replace('-',''))
if cnt_1_x == cnt_1_y:
print('str_countryVal: ', str(str_countryVal))
print('intReportDate: ', str(intReportDate))
iDF_2_M = iDF_2[(iDF_2['Country'] == str_countryVal) & (iDF_2['ReportedDate'] > intReportDate)]
# Merging with the previous Country Code data
if cnt_x == 0:
df_M = iDF_2_M
else:
d_frames = [df_M, iDF_2_M]
df_M = p.concat(d_frames)
cnt_x += 1
cnt_1_y += 1
cnt_1_x += 1
df_M.drop(columns=['ReportedDate'], axis=1, inplace=True)
df_M.rename(columns={'ds':'ReportedDate'}, inplace=True)
df_M.rename(columns={'AdjustTrend':colName}, inplace=True)
return df_M
except:
df = p.DataFrame()
return df
def toPivot(inDF, colName):
try:
iDF = inDF
iDF_Piv = iDF.pivot_table(colName, ['ReportedDate'], 'Country')
iDF_Piv.reset_index( drop=False, inplace=True )
list1 = ['ReportedDate']
iDF_Arr = iDF['Country'].unique()
list2 = iDF_Arr.tolist()
listV = list1 + list2
iDF_Piv.reindex([listV], axis=1)
return iDF_Piv
except Exception as e:
x = str(e)
print(x)
df = p.DataFrame()
return df
def toAgg(inDF, var, debugInd, flg):
try:
iDF = inDF
colName = "ReportedDate"
list1 = list(iDF.columns.values)
list1.remove(colName)
list1 = ["Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]
iDF['Year_Mon'] = iDF[colName].apply(lambda x:x.strftime('%Y%m'))
iDF.drop(columns=[colName], axis=1, inplace=True)
ColNameGrp = "Year_Mon"
print('List1 Aggregate:: ', str(list1))
print('ColNameGrp :: ', str(ColNameGrp))
iDF_T = iDF[["Year_Mon", "Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]]
iDF_T.fillna(0, inplace = True)
print('iDF_T:: ')
print(iDF_T)
iDF_1_max_group = iDF_T.groupby(ColNameGrp, as_index=False)[list1].sum()
iDF_1_max_group['Status'] = flg
return iDF_1_max_group
except Exception as e:
x = str(e)
print(x)
df = p.DataFrame()
return df
def publishEvents(inDF1, inDF2, inDF3, inDF4, var, debugInd):
try:
# Original Covid Data from API
iDF1 = inDF1
iDF2 = inDF2
NC = 'NewConfirmed'
ND = 'NewDeaths'
iDF1_PV = toPivot(iDF1, NC)
iDF1_PV['ReportedDate'] = p.to_datetime(iDF1_PV['ReportedDate'])
l.logr('57.iDF1_PV_' + var + '.csv', debugInd, iDF1_PV, 'log')
iDF2_PV = toPivot(iDF2, ND)
iDF2_PV['ReportedDate'] = p.to_datetime(iDF2_PV['ReportedDate'])
l.logr('58.iDF2_PV_' + var + '.csv', debugInd, iDF2_PV, 'log')
# Predicted Covid Data from Facebook API
iDF3 = inDF3
iDF4 = inDF4
iDF3_PV = toPivot(iDF3, NC)
l.logr('59.iDF3_PV_' + var + '.csv', debugInd, iDF3_PV, 'log')
iDF4_PV = toPivot(iDF4, ND)
l.logr('60.iDF4_PV_' + var + '.csv', debugInd, iDF4_PV, 'log')
# Now aggregating data based on year-month only
iDF1_Agg = toAgg(iDF1_PV, var, debugInd, NC)
l.logr('61.iDF1_Agg_' + var + '.csv', debugInd, iDF1_Agg, 'log')
iDF2_Agg = toAgg(iDF2_PV, var, debugInd, ND)
l.logr('62.iDF2_Agg_' + var + '.csv', debugInd, iDF2_Agg, 'log')
iDF3_Agg = toAgg(iDF3_PV, var, debugInd, NC)
l.logr('63.iDF3_Agg_' + var + '.csv', debugInd, iDF3_Agg, 'log')
iDF4_Agg = toAgg(iDF4_PV, var, debugInd, ND)
l.logr('64.iDF4_Agg_' + var + '.csv', debugInd, iDF4_Agg, 'log')
# Initiating Ably class to push events
x1 = cps.clsPublishStream()
# Pushing both the Historical Confirmed Cases
retVal_1 = x1.pushEvents(iDF1_Agg, debugInd, var, NC)
if retVal_1 == 0:
print('Successfully historical event pushed!')
else:
print('Failed to push historical events!')
# Pushing both the Historical Death Cases
retVal_3 = x1.pushEvents(iDF2_Agg, debugInd, var, ND)
if retVal_3 == 0:
print('Successfully historical event pushed!')
else:
print('Failed to push historical events!')
time.sleep(5)
# Pushing both the New Confirmed Cases
retVal_2 = x1.pushEvents(iDF3_Agg, debugInd, var, NC)
if retVal_2 == 0:
print('Successfully predicted event pushed!')
else:
print('Failed to push predicted events!')
# Pushing both the New Death Cases
retVal_4 = x1.pushEvents(iDF4_Agg, debugInd, var, ND)
if retVal_4 == 0:
print('Successfully predicted event pushed!')
else:
print('Failed to push predicted events!')
return 0
except Exception as e:
x = str(e)
print(x)
return 1
def main():
try:
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' *60)
DInd = 'Y'
NC = 'New Confirmed'
ND = 'New Dead'
SM = 'data process Successful!'
FM = 'data process Failure!'
print("Calling the custom Package for large file splitting..")
print('Start Time: ' + str(var1))
countryList = str(cf.conf['coList']).split(',')
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'CovidAPI.log', level=logging.INFO)
# Create the instance of the Covid API Class
x1 = ca.clsCovidAPI()
# Let's pass this to our map section
retDF = x1.searchQry(var1, DInd)
retVal = int(retDF.shape[0])
if retVal > 0:
print('Successfully Covid Data Extracted from the API-source.')
else:
print('Something wrong with your API-source!')
# Extracting Skeleton Data
df = retDF[['data.code', 'date', 'deaths', 'confirmed', 'recovered', 'new_confirmed', 'new_recovered', 'new_deaths', 'active']]
df.columns = ['CountryCode', 'ReportedDate', 'TotalReportedDead', 'TotalConfirmedCase', 'TotalRecovered', 'NewConfirmed', 'NewRecovered', 'NewDeaths', 'ActiveCaases']
df.dropna()
print('Returned Skeleton Data Frame: ')
print(df)
l.logr('5.df_' + var1 + '.csv', DInd, df, 'log')
# Due to source data issue, application will perform of
# avg of counts based on dates due to multiple entries
g_df = df.groupby(["CountryCode", "ReportedDate"] , as_index=False)["TotalReportedDead","TotalConfirmedCase","TotalRecovered","NewConfirmed","NewRecovered","NewDeaths","ActiveCaases"].mean()
g_df['TotalReportedDead_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalReportedDead'), axis=1)
g_df['TotalConfirmedCase_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalConfirmedCase'), axis=1)
g_df['TotalRecovered_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalRecovered'), axis=1)
g_df['NewConfirmed_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewConfirmed'), axis=1)
g_df['NewRecovered_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewRecovered'), axis=1)
g_df['NewDeaths_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewDeaths'), axis=1)
g_df['ActiveCaases_M'] = g_df.apply(lambda row: ceilTrend(row, 'ActiveCaases'), axis=1)
# Dropping old columns
g_df.drop(columns=['TotalReportedDead', 'TotalConfirmedCase', 'TotalRecovered', 'NewConfirmed', 'NewRecovered', 'NewDeaths', 'ActiveCaases'], axis=1, inplace=True)
# Renaming the new columns to old columns
g_df.rename(columns={'TotalReportedDead_M':'TotalReportedDead'}, inplace=True)
g_df.rename(columns={'TotalConfirmedCase_M':'TotalConfirmedCase'}, inplace=True)
g_df.rename(columns={'TotalRecovered_M':'TotalRecovered'}, inplace=True)
g_df.rename(columns={'NewConfirmed_M':'NewConfirmed'}, inplace=True)
g_df.rename(columns={'NewRecovered_M':'NewRecovered'}, inplace=True)
g_df.rename(columns={'NewDeaths_M':'NewDeaths'}, inplace=True)
g_df.rename(columns={'ActiveCaases_M':'ActiveCaases'}, inplace=True)
l.logr('5.g_df_' + var1 + '.csv', DInd, g_df, 'log')
# Working with forecast
# Create the instance of the Forecast API Class
x2 = f.clsForecast()
# Fetching each country name & then get the details
cnt = 6
cnt_x = 0
cnt_y = 0
df_M_Confirmed = p.DataFrame()
df_M_Deaths = p.DataFrame()
for i in countryList:
try:
cntryIndiv = i.strip()
cntryFullName = countryDet(cntryIndiv)
print('Country Porcessing: ' + str(cntryFullName))
# Creating dataframe for each country
# Germany Main DataFrame
dfCountry = countrySpecificDF(g_df, cntryIndiv)
l.logr(str(cnt) + '.df_' + cntryIndiv + '_' + var1 + '.csv', DInd, dfCountry, 'log')
# Let's pass this to our map section
retDFGenNC = x2.forecastNewConfirmed(dfCountry, DInd, var1)
statVal = str(NC)
a1 = plot_picture(retDFGenNC, DInd, var1, cntryIndiv, statVal)
# Merging with the previous Country Code data
if cnt_x == 0:
df_M_Confirmed = a1
else:
d_frames = [df_M_Confirmed, a1]
df_M_Confirmed = p.concat(d_frames)
cnt_x += 1
retDFGenNC_D = x2.forecastNewDead(dfCountry, DInd, var1)
statVal = str(ND)
a2 = plot_picture(retDFGenNC_D, DInd, var1, cntryIndiv, statVal)
# Merging with the previous Country Code data
if cnt_y == 0:
df_M_Deaths = a2
else:
d_frames = [df_M_Deaths, a2]
df_M_Deaths = p.concat(d_frames)
cnt_y += 1
# Printing Proper message
if (a1 + a2) == 0:
oprMsg = cntryFullName + ' ' + SM
print(oprMsg)
else:
oprMsg = cntryFullName + ' ' + FM
print(oprMsg)
# Resetting the dataframe value for the next iteration
dfCountry = p.DataFrame()
cntryIndiv = ''
oprMsg = ''
cntryFullName = ''
a1 = 0
a2 = 0
statVal = ''
cnt += 1
except Exception as e:
x = str(e)
print(x)
l.logr('49.df_M_Confirmed_' + var1 + '.csv', DInd, df_M_Confirmed, 'log')
l.logr('50.df_M_Deaths_' + var1 + '.csv', DInd, df_M_Deaths, 'log')
# Removing unwanted columns
df_M_Confirmed.drop(columns=['trend', 'trend_lower', 'trend_upper'], axis=1, inplace=True)
df_M_Deaths.drop(columns=['trend', 'trend_lower', 'trend_upper'], axis=1, inplace=True)
l.logr('51.df_M_Confirmed_' + var1 + '.csv', DInd, df_M_Confirmed, 'log')
l.logr('52.df_M_Deaths_' + var1 + '.csv', DInd, df_M_Deaths, 'log')
# Creating original dataframe from the source API
df_M_Confirmed_Orig = g_df[['CountryCode', 'ReportedDate','NewConfirmed']]
df_M_Deaths_Orig = g_df[['CountryCode', 'ReportedDate','NewDeaths']]
# Transforming Country Code
df_M_Confirmed_Orig['Country'] = df_M_Confirmed_Orig.apply(lambda row: lookupCountry(row), axis=1)
df_M_Deaths_Orig['Country'] = df_M_Deaths_Orig.apply(lambda row: lookupCountry(row), axis=1)
# Dropping unwanted column
df_M_Confirmed_Orig.drop(columns=['CountryCode'], axis=1, inplace=True)
df_M_Deaths_Orig.drop(columns=['CountryCode'], axis=1, inplace=True)
# Reordering columns
df_M_Confirmed_Orig = df_M_Confirmed_Orig.reindex(['ReportedDate','Country','NewConfirmed'], axis=1)
df_M_Deaths_Orig = df_M_Deaths_Orig.reindex(['ReportedDate','Country','NewDeaths'], axis=1)
l.logr('53.df_M_Confirmed_Orig_' + var1 + '.csv', DInd, df_M_Confirmed_Orig, 'log')
l.logr('54.df_M_Deaths_Orig_' + var1 + '.csv', DInd, df_M_Deaths_Orig, 'log')
# Filter out only the predicted data
filterDF_1 = extractPredictedDF(df_M_Confirmed_Orig, df_M_Confirmed, 'NewConfirmed')
l.logr('55.filterDF_1_' + var1 + '.csv', DInd, filterDF_1, 'log')
filterDF_2 = extractPredictedDF(df_M_Confirmed_Orig, df_M_Confirmed, 'NewDeaths')
l.logr('56.filterDF_2_' + var1 + '.csv', DInd, filterDF_2, 'log')
# Calling the final publish events
retVa = publishEvents(df_M_Confirmed_Orig, df_M_Deaths_Orig, filterDF_1, filterDF_2, var1, DInd)
if retVa == 0:
print('Successfully stream processed!')
else:
print('Failed to process stream!')
var2 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var2))
print('*' *60)
except Exception as e:
x = str(e)
print(x)
if __name__ == "__main__":
main()

Let us understand the enhancement part of this script –

We’ve taken out the plotly part as we will use a separate dashboard script to visualize the data trend.

However, we need to understand the initial consumed data from API & how we transform the data, which will be helpful for visualization.

The initial captured data should look like this after extracting only the relevant elements from the API response.

Initial Data from API

As you can see that based on the country & reported date, our application is consuming attributes like Total-Reported-Death, Total-Recovered, New-death, New-Confirmed & so on.

From this list, we’ve taken two attributes for our use cases & they are New-Death & New-Confirmed. Also, we’re predicting the Future-New-Death & Future-New-Confirmed based on the historical data using Facebook’s prophet API.

And, we would be transposing them & extract the countries & put them as columns for better representations.

Transposed Data

Hence, here is the code that we should be exploring –

def toPivot(inDF, colName):
    try:
        iDF = inDF

        iDF_Piv = iDF.pivot_table(colName, ['ReportedDate'], 'Country')
        iDF_Piv.reset_index( drop=False, inplace=True )

        list1 = ['ReportedDate']

        iDF_Arr = iDF['Country'].unique()
        list2 = iDF_Arr.tolist()

        listV = list1 + list2

        iDF_Piv.reindex([listV], axis=1)

        return iDF_Piv
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

Now, using the pivot_table function, we’re transposing the row values into the columns. And, later we’ve realigned the column heading as per our desired format.

However, we still have the data as per individual daily dates in this case. We want to eliminate that by removing the daypart & then aggregate them by month as shown below –

Aggregated Data

And, here is the code for that –

def toAgg(inDF, var, debugInd, flg):
    try:
        iDF = inDF
        colName = "ReportedDate"

        list1 = list(iDF.columns.values)
        list1.remove(colName)

        list1 = ["Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]

        iDF['Year_Mon'] = iDF[colName].apply(lambda x:x.strftime('%Y%m'))
        iDF.drop(columns=[colName], axis=1, inplace=True)

        ColNameGrp = "Year_Mon"
        print('List1 Aggregate:: ', str(list1))
        print('ColNameGrp :: ', str(ColNameGrp))

        iDF_T = iDF[["Year_Mon", "Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]]
        iDF_T.fillna(0, inplace = True)
        print('iDF_T:: ')
        print(iDF_T)

        iDF_1_max_group = iDF_T.groupby(ColNameGrp, as_index=False)[list1].sum()
        iDF_1_max_group['Status'] = flg

        return iDF_1_max_group
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

From the above snippet we can conclude that the application is taking out the daypart & then aggregate it based on the Year_Mon attribute.

The following snippet will push the final transformed data to Ably queue –

x1 = cps.clsPublishStream()

# Pushing both the Historical Confirmed Cases
retVal_1 = x1.pushEvents(iDF1_Agg, debugInd, var, NC)

if retVal_1 == 0:
    print('Successfully historical event pushed!')
else:
    print('Failed to push historical events!')

5. dashboard_realtime.py ( Main calling script to consume the data from Ably queue & then visualize the trend. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 08-Sep-2021 ####
#### Modified On 08-Sep-2021 ####
#### ####
#### Objective: This is the main script ####
#### to invoke dashboard after consuming ####
#### streaming real-time predicted data ####
#### using Facebook API & Ably message Q. ####
#### ####
#### This script will show the trend ####
#### comparison between major democracies ####
#### of the world. ####
#### ####
##############################################
import datetime
import dash
from dash import dcc
from dash import html
import plotly
from dash.dependencies import Input, Output
from ably import AblyRest
from clsConfig import clsConfig as cf
import pandas as p
# Main Class to consume streaming
import clsStreamConsume as ca
import numpy as np
# Create the instance of the Covid API Class
x1 = ca.clsStreamConsume()
external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css'%5D
app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(
html.Div([
html.H1("Covid-19 Trend Dashboard",
className='text-center text-primary mb-4'),
html.H5(children='''
Dash: Covid-19 Trend – (Present Vs Future)
'''),
html.P("Covid-19: New Confirmed Cases:",
style={"textDecoration": "underline"}),
dcc.Graph(id='live-update-graph-1'),
html.P("Covid-19: New Death Cases:",
style={"textDecoration": "underline"}),
dcc.Graph(id='live-update-graph-2'),
dcc.Interval(
id='interval-component',
interval=5*1000, # in milliseconds
n_intervals=0
)
], className="row", style={'marginBottom': 10, 'marginTop': 10})
)
def to_OptimizeString(row):
try:
x_str = str(row['Year_Mon'])
dt_format = '%Y%m%d'
finStr = x_str + '01'
strReportDate = datetime.datetime.strptime(finStr, dt_format)
return strReportDate
except Exception as e:
x = str(e)
print(x)
dt_format = '%Y%m%d'
var = '20990101'
strReportDate = datetime.strptime(var, dt_format)
return strReportDate
def fetchEvent(var1, DInd):
try:
# Let's pass this to our map section
iDF_M = x1.conStream(var1, DInd)
# Converting Year_Mon to dates
iDF_M['Year_Mon_Mod']= iDF_M.apply(lambda row: to_OptimizeString(row), axis=1)
# Dropping old columns
iDF_M.drop(columns=['Year_Mon'], axis=1, inplace=True)
#Renaming new column to old column
iDF_M.rename(columns={'Year_Mon_Mod':'Year_Mon'}, inplace=True)
return iDF_M
except Exception as e:
x = str(e)
print(x)
iDF_M = p.DataFrame()
return iDF_M
# Multiple components can update everytime interval gets fired.
@app.callback(Output('live-update-graph-1', 'figure'),
Input('interval-component', 'n_intervals'))
def update_graph_live(n):
try:
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' *60)
DInd = 'Y'
# Let's pass this to our map section
retDF = fetchEvent(var1, DInd)
# Create the graph with subplots
#fig = plotly.tools.make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.3, horizontal_spacing=0.2)
fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.3, horizontal_spacing=0.2)
# Routing data to dedicated DataFrame
retDFNC = retDF.loc[(retDF['Status'] == 'NewConfirmed')]
# Adding different chart into one dashboard
# First Use Case – New Confirmed
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Brazil,'type':'scatter','name':'Brazil'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Canada,'type':'scatter','name':'Canada'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Germany,'type':'scatter','name':'Germany'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.India,'type':'scatter','name':'India'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Indonesia,'type':'scatter','name':'Indonesia'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedKingdom,'type':'scatter','name':'United Kingdom'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedStates,'type':'scatter','name':'United States'},1,1)
return fig
except Exception as e:
x = str(e)
print(x)
# Create the graph with subplots
fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
fig['layout']['margin'] = {
'l': 30, 'r': 10, 'b': 30, 't': 10
}
fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}
return fig
# Multiple components can update everytime interval gets fired.
@app.callback(Output('live-update-graph-2', 'figure'),
Input('interval-component', 'n_intervals'))
def update_graph_live(n):
try:
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' *60)
DInd = 'Y'
# Let's pass this to our map section
retDF = fetchEvent(var1, DInd)
# Create the graph with subplots
#fig = plotly.tools.make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.3, horizontal_spacing=0.2)
fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.3, horizontal_spacing=0.2)
# Routing data to dedicated DataFrame
retDFND = retDF.loc[(retDF['Status'] == 'NewDeaths')]
# Adding different chart into one dashboard
# Second Use Case – New Confirmed
fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Brazil,'type':'bar','name':'Brazil'},1,1)
fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Canada,'type':'bar','name':'Canada'},1,1)
fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Germany,'type':'bar','name':'Germany'},1,1)
fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.India,'type':'bar','name':'India'},1,1)
fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Indonesia,'type':'bar','name':'Indonesia'},1,1)
fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.UnitedKingdom,'type':'bar','name':'United Kingdom'},1,1)
fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.UnitedStates,'type':'bar','name':'United States'},1,1)
return fig
except Exception as e:
x = str(e)
print(x)
# Create the graph with subplots
fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
fig['layout']['margin'] = {
'l': 30, 'r': 10, 'b': 30, 't': 10
}
fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}
return fig
if __name__ == '__main__':
app.run_server(debug=True)

Let us explore the critical snippet as this is a brand new script –

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)

app.layout = html.Div(
    html.Div([
        html.H1("Covid-19 Trend Dashboard",
                        className='text-center text-primary mb-4'),
        html.H5(children='''
            Dash: Covid-19 Trend - (Present Vs Future)
        '''),
        html.P("Covid-19: New Confirmed Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-1'),
        html.P("Covid-19: New Death Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-2'),
        dcc.Interval(
            id='interval-component',
            interval=5*1000, # in milliseconds
            n_intervals=0
        )
    ], className="row", style={'marginBottom': 10, 'marginTop': 10})
)

You need to understand the basics of HTML as this framework works seamlessly with it. To know more about the supported HTML, one needs to visit the following link.

def to_OptimizeString(row):
    try:
        x_str = str(row['Year_Mon'])

        dt_format = '%Y%m%d'
        finStr = x_str + '01'

        strReportDate = datetime.datetime.strptime(finStr, dt_format)

        return strReportDate

    except Exception as e:
        x = str(e)
        print(x)

        dt_format = '%Y%m%d'
        var = '20990101'

        strReportDate = datetime.strptime(var, dt_format)

        return strReportDate

The application is converting Year-Month combinations from string to date for better projection.

Also, we’ve implemented a dashboard that will refresh every five milliseconds.

def fetchEvent(var1, DInd):
    try:
        # Let's pass this to our map section
        iDF_M = x1.conStream(var1, DInd)

        # Converting Year_Mon to dates
        iDF_M['Year_Mon_Mod']= iDF_M.apply(lambda row: to_OptimizeString(row), axis=1)

        # Dropping old columns
        iDF_M.drop(columns=['Year_Mon'], axis=1, inplace=True)

        #Renaming new column to old column
        iDF_M.rename(columns={'Year_Mon_Mod':'Year_Mon'}, inplace=True)

        return iDF_M

    except Exception as e:
        x = str(e)
        print(x)

        iDF_M = p.DataFrame()

        return iDF_M

The application will consume all the events from the Ably Queue using the above snippet.

@app.callback(Output('live-update-graph-1', 'figure'),
              Input('interval-component', 'n_intervals'))
def update_graph_live(n):

We’ve implemented the callback mechanism to get the latest data from the Queue & then update the graph accordingly & finally share the updated chart & return that to our method, which is calling it.

# Routing data to dedicated DataFrame
retDFNC = retDF.loc[(retDF['Status'] == 'NewConfirmed')]

Based on the flag, we’re pushing the data into our target dataframe, from where the application will consume the data into the charts.

fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Brazil,'type':'scatter','name':'Brazil'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Canada,'type':'scatter','name':'Canada'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Germany,'type':'scatter','name':'Germany'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.India,'type':'scatter','name':'India'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Indonesia,'type':'scatter','name':'Indonesia'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedKingdom,'type':'scatter','name':'United Kingdom'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedStates,'type':'scatter','name':'United States'},1,1)

Different country’s KPI elements are fetched & mapped into their corresponding axis to project the graph with visual details.

Same approach goes for the other graph as well.


Run:

Let us run the application –

Run – Beginning
Run – Finishing Stage

Dashboard:

Dashboard Job Run
Dashboard Visualization

So, we’ve done it.

You will get the complete codebase in the following Github link.

I’ll bring some more exciting topic in the coming days from the Python verse.

Till then, Happy Avenging! 😀


Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only.

One more thing you need to understand is that this prediction based on limited data points. The actual event may happen differently. Ideally, countries are taking a cue from this kind of analysis & are initiating appropriate measures to avoid the high-curve. And, that is one of the main objective of time series analysis.

There is always a room for improvement of this kind of models & the solution associated with it. I’ve shown the basic ways to achieve the same for the education purpose only.

Predicting real-time Covid-19 forecast by analyzing time-series data using Facebook machine-learning API

Hello Guys,

Today, I’ll share one of the important posts on predicting data using facebook’s relatively new machine-learning-based API. I find this API is interesting as to how it can build & anticipate the outcome.

We’ll be using one of the most acceptable API-based sources for Covid-19 & I’ll be sharing the link over here.

We’ll be using the prophet-API developed by Facebook to predict the data. You will get the details from this link.

Architecture

Now, let’s explore the architecture shared above.

As you can see that the application will consume the data from the third-party API named “about-corona,” & the python application will clean & transform the data. The application will send the clean data to the Facebook API (prophet) built on the machine-learning algorithm. This API is another effective time-series analysis platform given to the data scientist community.

Once the application receives the predicted model, it will visualize them using plotly & matplotlib.


I would request you to please check the demo of this application just for your reference.

Demo Run

We’ll do a time series analysis. Let us understand the basic concept of time series.

Time series is a series of data points indexed (or listed or graphed) in time order.

Therefore, the data organized by relatively deterministic timestamps and potentially compared with random sample data contain additional information that we can leverage for our business use case to make a better decision.

To use the prophet API, one needs to use & transform their data cleaner & should contain two fields (ds & y).

Let’s check one such use case since our source has plenty of good data points to decide. We’ve daily data of newly infected covid patients based on countries, as shown below –

Covid Cases

And, our clean class will transform the data into two fields –

Transformed Data

Once we fit the data into the prophet model, it will generate some additional columns, which will be used for prediction as shown below –

Generated data from prophet-api

And, a sample prediction based on a similar kind of data would be identical to this –

Sample Prediction

Let us understand what packages we need to install to prepare this application –

Installing Dependency Packages – I
Installing Dependency Packages – II

And, here is the packages –

pip install pandas
pip install matplotlib
pip install prophet

Let us now revisit the code –

1. clsConfig.py ( This native Python script contains the configuration entries. )


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### for Prophet API. Application will ####
#### process these information & perform ####
#### the call to our newly developed with ####
#### APIs developed by Facebook & a open-source ####
#### project called "About-Corona". ####
#####################################################
import os
import platform as pl
class clsConfig(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
"URL":"https://corona-api.com/countries/&quot;,
"appType":"application/json",
"conType":"keep-alive",
"limRec": 10,
"CACHE":"no-cache",
"coList": "DE, IN, US, CA, GB, ID, BR",
"LOG_PATH":Curr_Path + sep + 'log' + sep,
"MAX_RETRY": 3,
"FNC": "NewConfirmed",
"TMS": "ReportedDate",
"FND": "NewDeaths"
}

view raw

clsConfig.py

hosted with ❤ by GitHub

We’re not going to discuss anything specific to this script.

2. clsL.py ( This native Python script logs the application. )


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### ####
#### Objective: This script is a log ####
#### file, that is useful for debugging purpose. ####
#### ####
#####################################################
import pandas as p
import os
import platform as pl
class clsL(object):
def __init__(self):
self.path = os.path.dirname(os.path.realpath(__file__))
def logr(self, Filename, Ind, df, subdir=None, write_mode='w', with_index='N'):
try:
x = p.DataFrame()
x = df
sd = subdir
os_det = pl.system()
if sd == None:
if os_det == "windows":
fullFileName = self.path + '\\' + Filename
else:
fullFileName = self.path + '/' + Filename
else:
if os_det == "windows":
fullFileName = self.path + '\\' + sd + '\\' + Filename
else:
fullFileName = self.path + '/' + sd + '/' + Filename
if (with_index == 'N'):
if ((Ind == 'Y') & (write_mode == 'w')):
x.to_csv(fullFileName, index=False)
else:
x.to_csv(fullFileName, index=False, mode=write_mode, header=None)
else:
if ((Ind == 'Y') & (write_mode == 'w')):
x.to_csv(fullFileName)
else:
x.to_csv(fullFileName, mode=write_mode, header=None)
return 0
except Exception as e:
y = str(e)
print(y)
return 3

view raw

clsL.py

hosted with ❤ by GitHub

Based on the operating system, the log class will capture potential information under the “log” directory in the form of csv for later reference purposes.

3. clsForecast.py ( This native Python script will clean & transform the data. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### Modified On 26-Jul-2021 ####
#### ####
#### Objective: Calling Data Cleaning API ####
##############################################
import json
from clsConfig import clsConfig as cf
import requests
import logging
import time
import pandas as p
import clsL as cl
from prophet import Prophet
class clsForecast:
def __init__(self):
self.fnc = cf.conf['FNC']
self.fnd = cf.conf['FND']
self.tms = cf.conf['TMS']
def forecastNewConfirmed(self, srcDF, debugInd, varVa):
try:
fnc = self.fnc
tms = self.tms
var = varVa
debug_ind = debugInd
countryISO = ''
df_M = p.DataFrame()
dfWork = srcDF
# Initiating Log class
l = cl.clsL()
#Extracting the unique country name
unqCountry = dfWork['CountryCode'].unique()
for i in unqCountry:
countryISO = i.strip()
print('Country Name: ' + countryISO)
df_Comm = dfWork[[tms, fnc]]
l.logr('13.df_Comm_' + var + '.csv', debug_ind, df_Comm, 'log')
# Aligning as per Prophet naming convention
df_Comm.columns = ['ds', 'y']
l.logr('14.df_Comm_Mod_' + var + '.csv', debug_ind, df_Comm, 'log')
return df_Comm
except Exception as e:
x = str(e)
print(x)
logging.info(x)
df = p.DataFrame()
return df
def forecastNewDead(self, srcDF, debugInd, varVa):
try:
fnd = self.fnd
tms = self.tms
var = varVa
debug_ind = debugInd
countryISO = ''
df_M = p.DataFrame()
dfWork = srcDF
# Initiating Log class
l = cl.clsL()
#Extracting the unique country name
unqCountry = dfWork['CountryCode'].unique()
for i in unqCountry:
countryISO = i.strip()
print('Country Name: ' + countryISO)
df_Comm = dfWork[[tms, fnd]]
l.logr('17.df_Comm_' + var + '.csv', debug_ind, df_Comm, 'log')
# Aligning as per Prophet naming convention
df_Comm.columns = ['ds', 'y']
l.logr('18.df_Comm_Mod_' + var + '.csv', debug_ind, df_Comm, 'log')
return df_Comm
except Exception as e:
x = str(e)
print(x)
logging.info(x)
df = p.DataFrame()
return df

view raw

clsForecast.py

hosted with ❤ by GitHub

Let’s explore the critical snippet out of this script –

df_Comm = dfWork[[tms, fnc]]

Now, the application will extract only the relevant columns to proceed.

df_Comm.columns = ['ds', 'y']

It is now assigning specific column names, which is a requirement for prophet API.

4. clsCovidAPI.py ( This native Python script will call the Covid-19 API. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### Modified On 26-Jul-2021 ####
#### ####
#### Objective: Calling Covid-19 API ####
##############################################
import json
from clsConfig import clsConfig as cf
import requests
import logging
import time
import pandas as p
import clsL as cl
class clsCovidAPI:
def __init__(self):
self.url = cf.conf['URL']
self.azure_cache = cf.conf['CACHE']
self.azure_con = cf.conf['conType']
self.type = cf.conf['appType']
self.typVal = cf.conf['coList']
self.max_retries = cf.conf['MAX_RETRY']
def searchQry(self, varVa, debugInd):
try:
url = self.url
api_cache = self.azure_cache
api_con = self.azure_con
type = self.type
typVal = self.typVal
max_retries = self.max_retries
var = varVa
debug_ind = debugInd
cnt = 0
df_M = p.DataFrame()
# Initiating Log class
l = cl.clsL()
payload = {}
strMsg = 'Input Countries: ' + str(typVal)
logging.info(strMsg)
headers = {}
countryList = typVal.split(',')
for i in countryList:
# Failed case retry
retries = 1
success = False
val = ''
try:
while not success:
# Getting response from web service
try:
df_conv = p.DataFrame()
strCountryUrl = url + str(i).strip()
print('Country: ' + str(i).strip())
print('Url: ' + str(strCountryUrl))
str1 = 'Url: ' + str(strCountryUrl)
logging.info(str1)
response = requests.request("GET", strCountryUrl, headers=headers, params=payload)
ResJson = response.text
#jdata = json.dumps(ResJson)
RJson = json.loads(ResJson)
df_conv = p.io.json.json_normalize(RJson)
df_conv.drop(['data.timeline'], axis=1, inplace=True)
df_conv['DummyKey'] = 1
df_conv.set_index('DummyKey')
l.logr('1.df_conv_' + var + '.csv', debug_ind, df_conv, 'log')
# Extracting timeline part separately
Rjson_1 = RJson['data']['timeline']
df_conv2 = p.io.json.json_normalize(Rjson_1)
df_conv2['DummyKey'] = 1
df_conv2.set_index('DummyKey')
l.logr('2.df_conv_timeline_' + var + '.csv', debug_ind, df_conv2, 'log')
# Doing Cross Join
df_fin = df_conv.merge(df_conv2, on='DummyKey', how='outer')
l.logr('3.df_fin_' + var + '.csv', debug_ind, df_fin, 'log')
# Merging with the previous Country Code data
if cnt == 0:
df_M = df_fin
else:
d_frames = [df_M, df_fin]
df_M = p.concat(d_frames)
cnt += 1
strCountryUrl = ''
if str(response.status_code)[:1] == '2':
success = True
else:
wait = retries * 2
print("retries Fail! Waiting " + str(wait) + " seconds and retrying!")
str_R1 = "retries Fail! Waiting " + str(wait) + " seconds and retrying!"
logging.info(str_R1)
time.sleep(wait)
retries += 1
# Checking maximum retries
if retries == max_retries:
success = True
raise Exception
except Exception as e:
x = str(e)
print(x)
logging.info(x)
pass
except Exception as e:
pass
l.logr('4.df_M_' + var + '.csv', debug_ind, df_M, 'log')
return df_M
except Exception as e:
x = str(e)
print(x)
logging.info(x)
df = p.DataFrame()
return df

view raw

clsCovidAPI.py

hosted with ❤ by GitHub

Let us explore the key snippet –

countryList = typVal.split(',')

The application will fetch individual country names into a list based on the input lists from the configure script.

response = requests.request("GET", strCountryUrl, headers=headers, params=payload)
ResJson = response.text

RJson = json.loads(ResJson)

df_conv = p.io.json.json_normalize(RJson)
df_conv.drop(['data.timeline'], axis=1, inplace=True)
df_conv['DummyKey'] = 1
df_conv.set_index('DummyKey')

The application will extract the elements & normalize the JSON & convert that to a pandas dataframe & also added one dummy column, which will use for the later purpose to merge the data from another set.

# Extracting timeline part separately
Rjson_1 = RJson['data']['timeline']

df_conv2 = p.io.json.json_normalize(Rjson_1)
df_conv2['DummyKey'] = 1
df_conv2.set_index('DummyKey')

Now, the application will take the nested element & normalize that as per granular level. Also, it added the dummy column to join both of these data together.

# Doing Cross Join
df_fin = df_conv.merge(df_conv2, on='DummyKey', how='outer')

The application will Merge both the data sets to get the complete denormalized data for our use cases.

# Merging with the previous Country Code data
if cnt == 0:
    df_M = df_fin
else:
    d_frames = [df_M, df_fin]
    df_M = p.concat(d_frames)

This entire deserializing execution happens per country. Hence, the above snippet will create an individual sub-group based on the country & later does union to all the sets.

if str(response.status_code)[:1] == '2':
    success = True
else:
    wait = retries * 2
    print("retries Fail! Waiting " + str(wait) + " seconds and retrying!")
    str_R1 = "retries Fail! Waiting " + str(wait) + " seconds and retrying!"
    logging.info(str_R1)
    time.sleep(wait)
    retries += 1

# Checking maximum retries
if retries == max_retries:
    success = True
    raise  Exception

If any calls to source API fails, the application will retrigger after waiting for a specific time until it reaches its maximum capacity.

5. callPredictCovidAnalysis.py ( This native Python script is the main one to predict the Covid. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### Modified On 26-Jul-2021 ####
#### ####
#### Objective: Calling multiple API's ####
#### that including Prophet-API developed ####
#### by Facebook for future prediction of ####
#### Covid-19 situations in upcoming days ####
#### for world's major hotspots. ####
##############################################
import json
import clsCovidAPI as ca
from clsConfig import clsConfig as cf
import datetime
import logging
import clsL as cl
import clsForecast as f
from prophet import Prophet
from prophet.plot import plot_plotly, plot_components_plotly
import matplotlib.pyplot as plt
import pandas as p
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
# Initiating Log class
l = cl.clsL()
# Helper Function that removes underscores
def countryDet(inputCD):
try:
countryCD = inputCD
if str(countryCD) == 'DE':
cntCD = 'Germany'
elif str(countryCD) == 'BR':
cntCD = 'Brazil'
elif str(countryCD) == 'GB':
cntCD = 'United Kingdom'
elif str(countryCD) == 'US':
cntCD = 'United States'
elif str(countryCD) == 'IN':
cntCD = 'India'
elif str(countryCD) == 'CA':
cntCD = 'Canada'
elif str(countryCD) == 'ID':
cntCD = 'Indonesia'
else:
cntCD = 'N/A'
return cntCD
except:
cntCD = 'N/A'
return cntCD
def plot_picture(inputDF, debug_ind, var, countryCD, stat):
try:
iDF = inputDF
# Lowercase the column names
iDF.columns = [c.lower() for c in iDF.columns]
# Determine which is Y axis
y_col = [c for c in iDF.columns if c.startswith('y')][0]
# Determine which is X axis
x_col = [c for c in iDF.columns if c.startswith('ds')][0]
# Data Conversion
iDF['y'] = iDF[y_col].astype('float')
iDF['ds'] = iDF[x_col].astype('datetime64[ns]')
# Forecast calculations
# Decreasing the changepoint_prior_scale to 0.001 to make the trend less flexible
m = Prophet(n_changepoints=20, yearly_seasonality=True, changepoint_prior_scale=0.001)
m.fit(iDF)
forecastDF = m.make_future_dataframe(periods=365)
forecastDF = m.predict(forecastDF)
l.logr('15.forecastDF_' + var + '_' + countryCD + '.csv', debug_ind, forecastDF, 'log')
df_M = forecastDF[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]
l.logr('16.df_M_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')
#m.plot_components(df_M)
# Getting Full Country Name
cntCD = countryDet(countryCD)
# Draw forecast results
lbl = str(cntCD) + ' – Covid – ' + stat
m.plot(df_M, xlabel = 'Date', ylabel = lbl)
# Combine all graps in the same page
plt.title(f'Covid Forecasting')
plt.title(lbl)
plt.ylabel('Millions')
plt.show()
return 0
except Exception as e:
x = str(e)
print(x)
return 1
def countrySpecificDF(counryDF, val):
try:
countryName = val
df = counryDF
df_lkpFile = df[(df['CountryCode'] == val)]
return df_lkpFile
except:
df = p.DataFrame()
return df
def main():
try:
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' *60)
DInd = 'Y'
NC = 'New Confirmed'
ND = 'New Dead'
SM = 'data process Successful!'
FM = 'data process Failure!'
print("Calling the custom Package for large file splitting..")
print('Start Time: ' + str(var1))
countryList = str(cf.conf['coList']).split(',')
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'CovidAPI.log', level=logging.INFO)
# Create the instance of the Covid API Class
x1 = ca.clsCovidAPI()
# Let's pass this to our map section
retDF = x1.searchQry(var1, DInd)
retVal = int(retDF.shape[0])
if retVal > 0:
print('Successfully Covid Data Extracted from the API-source.')
else:
print('Something wrong with your API-source!')
# Extracting Skeleton Data
df = retDF[['data.code', 'date', 'deaths', 'confirmed', 'recovered', 'new_confirmed', 'new_recovered', 'new_deaths', 'active']]
df.columns = ['CountryCode', 'ReportedDate', 'TotalReportedDead', 'TotalConfirmedCase', 'TotalRecovered', 'NewConfirmed', 'NewRecovered', 'NewDeaths', 'ActiveCaases']
df.dropna()
print('Returned Skeleton Data Frame: ')
print(df)
l.logr('5.df_' + var1 + '.csv', DInd, df, 'log')
# Working with forecast
# Create the instance of the Forecast API Class
x2 = f.clsForecast()
# Fetching each country name & then get the details
cnt = 6
for i in countryList:
try:
cntryIndiv = i.strip()
print('Country Porcessing: ' + str(cntryIndiv))
# Creating dataframe for each country
# Germany Main DataFrame
dfCountry = countrySpecificDF(df, cntryIndiv)
l.logr(str(cnt) + '.df_' + cntryIndiv + '_' + var1 + '.csv', DInd, dfCountry, 'log')
# Let's pass this to our map section
retDFGenNC = x2.forecastNewConfirmed(dfCountry, DInd, var1)
statVal = str(NC)
a1 = plot_picture(retDFGenNC, DInd, var1, cntryIndiv, statVal)
retDFGenNC_D = x2.forecastNewDead(dfCountry, DInd, var1)
statVal = str(ND)
a2 = plot_picture(retDFGenNC_D, DInd, var1, cntryIndiv, statVal)
cntryFullName = countryDet(cntryIndiv)
if (a1 + a2) == 0:
oprMsg = cntryFullName + ' ' + SM
print(oprMsg)
else:
oprMsg = cntryFullName + ' ' + FM
print(oprMsg)
# Resetting the dataframe value for the next iteration
dfCountry = p.DataFrame()
cntryIndiv = ''
oprMsg = ''
cntryFullName = ''
a1 = 0
a2 = 0
statVal = ''
cnt += 1
except Exception as e:
x = str(e)
print(x)
var2 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var2))
print('*' *60)
except Exception as e:
x = str(e)
if __name__ == "__main__":
main()

Let us explore the key snippet –

def countryDet(inputCD):
    try:
        countryCD = inputCD

        if str(countryCD) == 'DE':
            cntCD = 'Germany'
        elif str(countryCD) == 'BR':
            cntCD = 'Brazil'
        elif str(countryCD) == 'GB':
            cntCD = 'United Kingdom'
        elif str(countryCD) == 'US':
            cntCD = 'United States'
        elif str(countryCD) == 'IN':
            cntCD = 'India'
        elif str(countryCD) == 'CA':
            cntCD = 'Canada'
        elif str(countryCD) == 'ID':
            cntCD = 'Indonesia'
        else:
            cntCD = 'N/A'

        return cntCD
    except:
        cntCD = 'N/A'

        return cntCD

The application is extracting the full country name based on ISO country code.

# Lowercase the column names
iDF.columns = [c.lower() for c in iDF.columns]
# Determine which is Y axis
y_col = [c for c in iDF.columns if c.startswith('y')][0]
# Determine which is X axis
x_col = [c for c in iDF.columns if c.startswith('ds')][0]

# Data Conversion
iDF['y'] = iDF[y_col].astype('float')
iDF['ds'] = iDF[x_col].astype('datetime64[ns]')

The above script will convert all the column names in lower letters & then convert & cast them with the appropriate data type.

# Forecast calculations
# Decreasing the changepoint_prior_scale to 0.001 to make the trend less flexible
m = Prophet(n_changepoints=20, yearly_seasonality=True, changepoint_prior_scale=0.001)
m.fit(iDF)

forecastDF = m.make_future_dataframe(periods=365)

forecastDF = m.predict(forecastDF)

l.logr('15.forecastDF_' + var + '_' + countryCD + '.csv', debug_ind, forecastDF, 'log')

df_M = forecastDF[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]

l.logr('16.df_M_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')

The above snippet will use the machine-learning driven prophet-API, where the application will fit the model & then predict based on the existing data for a year. Also, we’ve identified the number of changepoints. By default, the prophet-API adds 25 changepoints to the initial 80% of the data set that trend is less flexible. 

Prophet allows you to adjust the trend in case there is an overfit or underfit. changepoint_prior_scale helps adjust the strength of the movement & decreasing the changepoint_prior_scale to 0.001 to make it less flexible.

def countrySpecificDF(counryDF, val):
    try:
        countryName = val
        df = counryDF

        df_lkpFile = df[(df['CountryCode'] == val)]

        return df_lkpFile
    except:
        df = p.DataFrame()

        return df

The application is fetching & creating the country-specific dataframe.

for i in countryList:
    try:
        cntryIndiv = i.strip()

        print('Country Porcessing: ' + str(cntryIndiv))

        # Creating dataframe for each country
        # Germany Main DataFrame
        dfCountry = countrySpecificDF(df, cntryIndiv)
        l.logr(str(cnt) + '.df_' + cntryIndiv + '_' + var1 + '.csv', DInd, dfCountry, 'log')

        # Let's pass this to our map section
        retDFGenNC = x2.forecastNewConfirmed(dfCountry, DInd, var1)

        statVal = str(NC)

        a1 = plot_picture(retDFGenNC, DInd, var1, cntryIndiv, statVal)

        retDFGenNC_D = x2.forecastNewDead(dfCountry, DInd, var1)

        statVal = str(ND)

        a2 = plot_picture(retDFGenNC_D, DInd, var1, cntryIndiv, statVal)

        cntryFullName = countryDet(cntryIndiv)

        if (a1 + a2) == 0:
            oprMsg = cntryFullName + ' ' + SM
            print(oprMsg)
        else:
            oprMsg = cntryFullName + ' ' + FM
            print(oprMsg)

        # Resetting the dataframe value for the next iteration
        dfCountry = p.DataFrame()
        cntryIndiv = ''
        oprMsg = ''
        cntryFullName = ''
        a1 = 0
        a2 = 0
        statVal = ''

        cnt += 1
    except Exception as e:
        x = str(e)
        print(x)

The above snippet will call the function to predict the data & then predict the visual representation based on plotting the data points.


Let us run the application –

Application Run

And, it will generate the visual representation as follows –

Application Run – Continue

And, here is the folder structure –

Directory Structure

Let’s explore the comparison study & try to find out the outcome –

Option – 1
Option – 2
Option – 3
Option -4

Let us analyze from the above visual data-point.


Conclusion:

Let’s explore the comparison study & try to find out the outcome –

  1. India may see a rise of new covid cases & it might cross the mark 400,000 during June 2022 & would be the highest among the countries that we’ve considered here including India, Indonesia, Germany, US, UK, Canada & Brazil. The second worst affected country might be the US during the same period. The third affected country will be Indonesia during the same period.
  2. Canada will be the least affected country during June 2022. The figure should be within 12,000.
  3. However, death case wise India is not only the leading country. The US, India & Brazil will see almost 4000 or slightly over the 4000 marks.

So, we’ve done it.


You will get the complete codebase in the following Github link.

I’ll bring some more exciting topic in the coming days from the Python verse.

Till then, Happy Avenging! 😀


Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only.

One more thing you need to understand is that this prediction based on limited data points. The actual event may happen differently. Ideally, countries are taking a cue from this kind of analysis & are initiating appropriate measures to avoid the high-curve. And, that is one of the main objective of time series analysis.

There is always a room for improvement of this kind of models & the solution associated with it. I’ve shown the basic ways to achieve the same for the education purpose only.

Displaying real-time trade data in a dashboard using Python & third-party API & Streaming

Today, We want to make our use case a little bit harder & more realistic. We want to consume real-time live trade-data consuming through FinnHub API & displaying them into our dashboard using another brilliant H2O-Wave API with the help of native Python.

The use-case mentioned above is extremely useful & for that, we’ll be using the following Third-Party APIs to achieve the same –

  1. FinnHub: For more information, please click the following link.
  2. Ably: For more information, please click the following link.
  3. H2O-Wave: For more information, please click the following link.

I’m not going to discuss these topics more, as I’ve already discussed them in separate earlier posts. Please refer to the following threads for detailed level information –

creating-a-real-time-dashboard-from-streaming-data-using-python


In this post, we will address the advanced concept compared to the previous post mentioned above. Let us first look at how the run looks before we start exploring the details –

Real-time trade dashboard

Let us explore the architecture of this implementation –

Architecture Diagram

This application will talk to the FinnHub websocket & consume real-time trade data from it. And this will be temporarily stored in our Ably channels. The dashboard will pick the message & display that as soon as there is new data for that trading company.


For this use case, you need to install the following packages –

STEP – 1:

Main Packages

STEP – 2:

Main Packages – Continue

STEP – 3:

Main Packages – Continue

STEP – 4:

Main Packages – End

You can copy the following commands to install the above-mentioned packages –

pip install ably 
pip install h2o-wave
pip install pandas
pip install websocket
pip install websocket-client

Let’s explore the important data-point that you need to capture from the FinnHub portal to consume the real-time trade data –

FinnHub Portal

We’ve two main scripts. The first script will consume the streaming data into a message queue & the other one will be extracting the data from the queue & transform the data & publish it into the real-time dashboard.

1. dashboard_finnhub.py ( This native Python script will consume streaming data & create the live trade dashboard. )


###############################################################
#### Template Written By: H2O Wave ####
#### Enhanced with Streaming Data By: Satyaki De ####
#### Base Version Enhancement On: 20-Dec-2020 ####
#### Modified On 27-Jun-2021 ####
#### ####
#### Objective: This script will consume real-time ####
#### streaming data coming out from a hosted API ####
#### sources (Finnhub) using another popular third-party ####
#### service named Ably. Ably mimics pubsub Streaming ####
#### concept, which might be extremely useful for ####
#### any start-ups. ####
#### ####
#### Note: This is an enhancement of my previous post of ####
#### H2O Wave. In this case, the application will consume ####
#### streaming trade data from a live host & not generated ####
#### out of the mock data. Thus, it is more useful for the ####
#### start-ups. ####
###############################################################
import time
from h2o_wave import site, data, ui
from ably import AblyRest
import pandas as p
import json
import datetime
import logging
import platform as pl
from clsConfig import clsConfig as cf
import clsL as cl
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
# Lookup functions from
# Azure cloud SQL DB
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# Global Area
## Global Class
# Initiating Log Class
l = cl.clsL()
# Global Variables
# Moving previous day log files to archive directory
log_dir = cf.config['LOG_PATH']
path = cf.config['INIT_PATH']
subdir = cf.config['SUBDIR']
## End Of Global Part
class DaSeries:
def __init__(self, inputDf):
self.Df = inputDf
self.count_row = inputDf.shape[0]
self.start_pos = 0
self.end_pos = 0
self.interval = 1
def next(self):
try:
# Getting Individual Element & convert them to Series
if ((self.start_pos + self.interval) <= self.count_row):
self.end_pos = self.start_pos + self.interval
else:
self.end_pos = self.start_pos + (self.count_row – self.start_pos)
split_df = self.Df.iloc[self.start_pos:self.end_pos]
if ((self.start_pos > self.count_row) | (self.start_pos == self.count_row)):
pass
else:
self.start_pos = self.start_pos + self.interval
x = float(split_df.iloc[0]['CurrentExchange'])
dx = float(split_df.iloc[0]['Change'])
# Emptying the exisitng dataframe
split_df = p.DataFrame(None)
return x, dx
except:
x = 0
dx = 0
return x, dx
class CategoricalSeries:
def __init__(self, sourceDf):
self.series = DaSeries(sourceDf)
self.i = 0
def next(self):
x, dx = self.series.next()
self.i += 1
return f'C{self.i}', x, dx
light_theme_colors = '$red $pink $purple $violet $indigo $blue $azure $cyan $teal $mint $green $amber $orange $tangerine'.split()
dark_theme_colors = '$red $pink $blue $azure $cyan $teal $mint $green $lime $yellow $amber $orange $tangerine'.split()
_color_index = -1
colors = dark_theme_colors
def next_color():
global _color_index
_color_index += 1
return colors[_color_index % len(colors)]
_curve_index = -1
curves = 'linear smooth step step-after step-before'.split()
def next_curve():
global _curve_index
_curve_index += 1
return curves[_curve_index % len(curves)]
def calc_p(row):
try:
str_calc_s1 = str(row['s_x'])
str_calc_s2 = str(row['s_y'])
if str_calc_s1 == str_calc_s2:
calc_p_val = float(row['p_y'])
else:
calc_p_val = float(row['p_x'])
return calc_p_val
except:
return 0.0
def calc_v(row):
try:
str_calc_s1 = str(row['s_x'])
str_calc_s2 = str(row['s_y'])
if str_calc_s1 == str_calc_s2:
calc_v_val = float(row['v_y'])
else:
calc_v_val = float(row['v_x'])
return calc_v_val
except:
return 0.0
def process_DF(inputDF, inputDFUnq):
try:
# Core Business logic
# The application will show default value to any
# trade-in stock in case that data doesn't consume
# from the source.
df_conv = inputDF
df_unique_fin = inputDFUnq
df_conv['max_count'] = df_conv.groupby('default_rank')['default_rank'].transform('count')
l.logr('3. max_df.csv', 'Y', df_conv, subdir)
# Sorting the output
sorted_df = df_conv.sort_values(by=['default_rank','s'], ascending=True)
# New Column List Orders
column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
df_fin = sorted_df.reindex(column_order, axis=1)
l.logr('4. sorted_df.csv', 'Y', df_fin, subdir)
# Now splitting the sorted df into two sets
lkp_max_count = 4
df_fin_na = df_fin[(df_fin['max_count'] == lkp_max_count)]
l.logr('5. df_fin_na.csv', 'Y', df_fin_na, subdir)
df_fin_req = df_fin[(df_fin['max_count'] != lkp_max_count)]
l.logr('6. df_fin_req.csv', 'Y', df_fin_req, subdir)
# Now to perform cross join, we will create
# a key column in both the DataFrames to
# merge on that key.
df_unique_fin['key'] = 1
df_fin_req['key'] = 1
# Dropping unwanted columns
df_unique_fin.drop(columns=['t'], axis=1, inplace=True)
l.logr('7. df_unique_slim.csv', 'Y', df_unique_fin, subdir)
# Padding with dummy key values
#merge_df = p.merge(df_unique_fin,df_fin_req,on=['s'],how='left')
merge_df = p.merge(df_unique_fin,df_fin_req,on=['key']).drop("key", 1)
l.logr('8. merge_df.csv', 'Y', merge_df, subdir)
# Sorting the output
sorted_merge_df = merge_df.sort_values(by=['default_rank_y','s_x'], ascending=True)
l.logr('9. sorted_merge_df.csv', 'Y', sorted_merge_df, subdir)
# Calling new derived logic
sorted_merge_df['derived_p'] = sorted_merge_df.apply(lambda row: calc_p(row), axis=1)
sorted_merge_df['derived_v'] = sorted_merge_df.apply(lambda row: calc_v(row), axis=1)
l.logr('10. sorted_merge_derived.csv', 'Y', sorted_merge_df, subdir)
# Dropping unwanted columns
sorted_merge_df.drop(columns=['default_rank_x', 'p_x', 'v_x', 's_y', 'p_y', 'v_y'], axis=1, inplace=True)
#Renaming the columns
sorted_merge_df.rename(columns={'s_x':'s'}, inplace=True)
sorted_merge_df.rename(columns={'default_rank_y':'default_rank'}, inplace=True)
sorted_merge_df.rename(columns={'derived_p':'p'}, inplace=True)
sorted_merge_df.rename(columns={'derived_v':'v'}, inplace=True)
l.logr('11. org_merge_derived.csv', 'Y', sorted_merge_df, subdir)
# Aligning columns
column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
merge_fin_df = sorted_merge_df.reindex(column_order, axis=1)
l.logr('12. merge_fin_df.csv', 'Y', merge_fin_df, subdir)
# Finally, appending these two DataFrame (df_fin_na & merge_fin_df)
frames = [df_fin_na, merge_fin_df]
fin_df = p.concat(frames, keys=["s", "default_rank", "max_count"])
l.logr('13. fin_df.csv', 'Y', fin_df, subdir)
# Final clearance & organization
fin_df.drop(columns=['default_rank', 'max_count'], axis=1, inplace=True)
l.logr('14. Final.csv', 'Y', fin_df, subdir)
# Adjusting key columns
fin_df.rename(columns={'s':'Company'}, inplace=True)
fin_df.rename(columns={'p':'CurrentExchange'}, inplace=True)
fin_df.rename(columns={'v':'Change'}, inplace=True)
l.logr('15. TransormedFinal.csv', 'Y', fin_df, subdir)
return fin_df
except Exception as e:
print('$' * 120)
x = str(e)
print(x)
print('$' * 120)
df = p.DataFrame()
return df
def create_dashboard(update_freq=0.0):
page = site['/dashboard_finnhub']
general_log_path = str(cf.config['LOG_PATH'])
ably_id = str(cf.config['ABLY_ID'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'Realtime_Stock.log', level=logging.INFO)
os_det = pl.system()
if os_det == "Windows":
src_path = path + '\\' + 'data\\'
else:
src_path = path + '/' + 'data/'
# Fetching the data
client = AblyRest(ably_id)
channel = client.channels.get('sd_channel')
message_page = channel.history()
# Counter Value
cnt = 0
# Declaring Global Data-Frame
df_conv = p.DataFrame()
for i in message_page.items:
print('Last Msg: {}'.format(i.data))
json_data = json.loads(i.data)
# Converting JSON to Dataframe
df = p.json_normalize(json_data)
df.columns = df.columns.map(lambda x: x.split(".")[-1])
if cnt == 0:
df_conv = df
else:
d_frames = [df_conv, df]
df_conv = p.concat(d_frames)
cnt += 1
# Resetting the Index Value
df_conv.reset_index(drop=True, inplace=True)
print('DF:')
print(df_conv)
# Writing to the file
l.logr('1. DF_modified.csv', 'Y', df_conv, subdir)
# Dropping unwanted columns
df_conv.drop(columns=['c'], axis=1, inplace=True)
df_conv['default_rank'] = df_conv.groupby(['s']).cumcount() + 1
lkp_rank = 1
df_unique = df_conv[(df_conv['default_rank'] == lkp_rank)]
# New Column List Orders
column_order = ['s', 'default_rank', 'p', 't', 'v']
df_unique_fin = df_unique.reindex(column_order, axis=1)
print('Rank DF Unique:')
print(df_unique_fin)
l.logr('2. df_unique.csv', 'Y', df_unique_fin, subdir)
# Capturing transformed values into a DataFrame
# Depending on your logic, you'll implement that inside
# the process_DF functions
fin_df = process_DF(df_conv, df_unique_fin)
df_unq_fin = df_unique_fin.copy()
df_unq_fin.rename(columns={'s':'Company'}, inplace=True)
df_unq_fin.rename(columns={'p':'CurrentExchange'}, inplace=True)
df_unq_fin.rename(columns={'v':'Change'}, inplace=True)
df_unq_fin.drop(columns=['default_rank','key'], axis=1, inplace=True)
l.logr('16. df_unq_fin.csv', 'Y', df_unq_fin, subdir)
df_unq_finale = df_unq_fin.sort_values(by=['Company'], ascending=True)
l.logr('17. df_unq_finale.csv', 'Y', df_unq_finale, subdir)
# Final clearance for better understanding of data
fin_df.drop(columns=['t'], axis=1, inplace=True)
l.logr('18. CleanFinal.csv', 'Y', fin_df, subdir)
count_row = df_unq_finale.shape[0]
large_lines = []
start_pos = 0
end_pos = 0
interval = 1
# Converting dataframe to a desired Series
f = CategoricalSeries(fin_df)
for j in range(count_row):
# Getting the series values from above
cat, val, pc = f.next()
# Getting Individual Element & convert them to Series
if ((start_pos + interval) <= count_row):
end_pos = start_pos + interval
else:
end_pos = start_pos + (count_row – start_pos)
split_df = df_unq_finale.iloc[start_pos:end_pos]
if ((start_pos > count_row) | (start_pos == count_row)):
pass
else:
start_pos = start_pos + interval
x_currency = str(split_df.iloc[0]['Company'])
####################################################
##### Debug Purpose #########
####################################################
print('Company: ', x_currency)
print('J: ', str(j))
print('Cat: ', cat)
####################################################
##### End Of Debug #######
####################################################
c = page.add(f'e{j+1}', ui.tall_series_stat_card(
box=f'{j+1} 1 1 2',
title=x_currency,
value='=${{intl qux minimum_fraction_digits=2 maximum_fraction_digits=2}}',
aux_value='={{intl quux style="percent" minimum_fraction_digits=1 maximum_fraction_digits=1}}',
data=dict(qux=val, quux=pc),
plot_type='area',
plot_category='foo',
plot_value='qux',
plot_color=next_color(),
plot_data=data('foo qux', -15),
plot_zero_value=0,
plot_curve=next_curve(),
))
large_lines.append((f, c))
page.save()
while update_freq > 0:
time.sleep(update_freq)
for f, c in large_lines:
cat, val, pc = f.next()
print('Update Cat: ', cat)
print('Update Val: ', val)
print('Update pc: ', pc)
print('*' * 160)
c.data.qux = val
c.data.quux = pc / 100
c.plot_data[-1] = [cat, val]
page.save()
if __name__ == "__main__":
try:
# Main Calling script
create_dashboard(update_freq=0.25)
except Exception as e:
x = str(e)
print(x)

Let’s explore the key snippets from the above script –

def process_DF(inputDF, inputDFUnq):
    try:
        # Core Business logic
        # The application will show default value to any
        # trade-in stock in case that data doesn't consume
        # from the source.
        
        # Getting block count
        #df_conv['block_count'] = df_conv.groupby(['default_rank']).cumcount()
        #l.logr('3. block_df.csv', 'Y', df_conv, subdir)

        # Getting block count
        #df_conv['max_count'] = df_conv.groupby(['default_rank']).size()
        #df_conv_fin = df_conv.groupby(['default_rank']).agg(['count'])
        #df_conv_fin = df_conv.value_counts(['default_rank']).reset_index(name='max_count')
        #df_conv_fin = df_conv.value_counts(['default_rank'])
        df_conv = inputDF
        df_unique_fin = inputDFUnq

        df_conv['max_count'] = df_conv.groupby('default_rank')['default_rank'].transform('count')
        l.logr('3. max_df.csv', 'Y', df_conv, subdir)


        # Sorting the output
        sorted_df = df_conv.sort_values(by=['default_rank','s'], ascending=True)

        # New Column List Orders
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        df_fin = sorted_df.reindex(column_order, axis=1)

        l.logr('4. sorted_df.csv', 'Y', df_fin, subdir)

        # Now splitting the sorted df into two sets
        lkp_max_count = 4
        df_fin_na = df_fin[(df_fin['max_count'] == lkp_max_count)]

        l.logr('5. df_fin_na.csv', 'Y', df_fin_na, subdir)

        df_fin_req = df_fin[(df_fin['max_count'] != lkp_max_count)]
        l.logr('6. df_fin_req.csv', 'Y', df_fin_req, subdir)

        # Now to perform cross join, we will create
        # a key column in both the DataFrames to
        # merge on that key.
        df_unique_fin['key'] = 1
        df_fin_req['key'] = 1

        # Dropping unwanted columns
        df_unique_fin.drop(columns=['t'], axis=1, inplace=True)
        l.logr('7. df_unique_slim.csv', 'Y', df_unique_fin, subdir)

        # Padding with dummy key values
        #merge_df = p.merge(df_unique_fin,df_fin_req,on=['s'],how='left')
        merge_df = p.merge(df_unique_fin,df_fin_req,on=['key']).drop("key", 1)

        l.logr('8. merge_df.csv', 'Y', merge_df, subdir)

        # Sorting the output
        sorted_merge_df = merge_df.sort_values(by=['default_rank_y','s_x'], ascending=True)

        l.logr('9. sorted_merge_df.csv', 'Y', sorted_merge_df, subdir)

        # Calling new derived logic
        sorted_merge_df['derived_p'] = sorted_merge_df.apply(lambda row: calc_p(row), axis=1)
        sorted_merge_df['derived_v'] = sorted_merge_df.apply(lambda row: calc_v(row), axis=1)

        l.logr('10. sorted_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Dropping unwanted columns
        sorted_merge_df.drop(columns=['default_rank_x', 'p_x', 'v_x', 's_y', 'p_y', 'v_y'], axis=1, inplace=True)

        #Renaming the columns
        sorted_merge_df.rename(columns={'s_x':'s'}, inplace=True)
        sorted_merge_df.rename(columns={'default_rank_y':'default_rank'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_p':'p'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_v':'v'}, inplace=True)

        l.logr('11. org_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Aligning columns
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        merge_fin_df = sorted_merge_df.reindex(column_order, axis=1)

        l.logr('12. merge_fin_df.csv', 'Y', merge_fin_df, subdir)

        # Finally, appending these two DataFrame (df_fin_na & merge_fin_df)
        frames = [df_fin_na, merge_fin_df]
        fin_df = p.concat(frames, keys=["s", "default_rank", "max_count"])

        l.logr('13. fin_df.csv', 'Y', fin_df, subdir)

        # Final clearance & organization
        fin_df.drop(columns=['default_rank', 'max_count'], axis=1, inplace=True)

        l.logr('14. Final.csv', 'Y', fin_df, subdir)

        # Adjusting key columns
        fin_df.rename(columns={'s':'Company'}, inplace=True)
        fin_df.rename(columns={'p':'CurrentExchange'}, inplace=True)
        fin_df.rename(columns={'v':'Change'}, inplace=True)

        l.logr('15. TransormedFinal.csv', 'Y', fin_df, subdir)

        return fin_df
    except Exception as e:
        print('$' * 120)

        x = str(e)
        print(x)

        print('$' * 120)

        df = p.DataFrame()

        return df

The above function will check if the queue is sending all the key trade-in data for all the companies. In our use case, we’re testing with the four companies & they are as follows –

a. AAPL
b. AMZN
c. BINANCE:BTCUSDT
d. IC MARKETS:1

Every message is containing data from all of these four companies together. If any of the company’s data is missing, this transformation will add a dummy record of that missing company to make the uniform number of entries in each message bouquet. And dummy trade-in values added for all the missing information.

def calc_p(row):
    try:
        str_calc_s1 = str(row['s_x'])
        str_calc_s2 = str(row['s_y'])

        if str_calc_s1 == str_calc_s2:
            calc_p_val = float(row['p_y'])
        else:
            calc_p_val = float(row['p_x'])

        return calc_p_val
    except:
        return 0.0

def calc_v(row):
    try:
        str_calc_s1 = str(row['s_x'])
        str_calc_s2 = str(row['s_y'])

        if str_calc_s1 == str_calc_s2:
            calc_v_val = float(row['v_y'])
        else:
            calc_v_val = float(row['v_x'])

        return calc_v_val
    except:
        return 0.0

The above snippet will capture the default values for those missing records.

    client = AblyRest(ably_id)
    channel = client.channels.get('sd_channel')

    message_page = channel.history()

In the above snippet, the application will consume the streaming data from the Ably queue.

for i in message_page.items:
        print('Last Msg: {}'.format(i.data))
        json_data = json.loads(i.data)

        # Converting JSON to Dataframe
        df = p.json_normalize(json_data)
        df.columns = df.columns.map(lambda x: x.split(".")[-1])

        if cnt == 0:
            df_conv = df
        else:
            d_frames = [df_conv, df]
            df_conv = p.concat(d_frames)

        cnt += 1

The above snippet will convert the streaming messages to a more meaningful pandas data-frame, which we can use for a wide variety of analytics.

    # Converting dataframe to a desired Series
    f = CategoricalSeries(fin_df)

    for j in range(count_row):
        # Getting the series values from above
        cat, val, pc = f.next()

        # Getting Individual Element & convert them to Series
        if ((start_pos + interval) <= count_row):
            end_pos = start_pos + interval
        else:
            end_pos = start_pos + (count_row - start_pos)

        split_df = df_unq_finale.iloc[start_pos:end_pos]

        if ((start_pos > count_row) | (start_pos == count_row)):
            pass
        else:
            start_pos = start_pos + interval

        x_currency = str(split_df.iloc[0]['Company'])

        ####################################################
        ##### Debug Purpose                        #########
        ####################################################
        print('Company: ', x_currency)
        print('J: ', str(j))
        print('Cat: ', cat)
        ####################################################
        #####   End Of Debug                         #######
        ####################################################

        c = page.add(f'e{j+1}', ui.tall_series_stat_card(
            box=f'{j+1} 1 1 2',
            title=x_currency,
            value='=${{intl qux minimum_fraction_digits=2 maximum_fraction_digits=2}}',
            aux_value='={{intl quux style="percent" minimum_fraction_digits=1 maximum_fraction_digits=1}}',
            data=dict(qux=val, quux=pc),
            plot_type='area',
            plot_category='foo',
            plot_value='qux',
            plot_color=next_color(),
            plot_data=data('foo qux', -15),
            plot_zero_value=0,
            plot_curve=next_curve(),
        ))
        large_lines.append((f, c))

    page.save()

    while update_freq > 0:

        time.sleep(update_freq)

        for f, c in large_lines:
            cat, val, pc = f.next()

            print('Update Cat: ', cat)
            print('Update Val: ', val)
            print('Update pc: ', pc)
            print('*' * 160)

            c.data.qux = val
            c.data.quux = pc / 100
            c.plot_data[-1] = [cat, val]

        page.save()

The above snippet will consume the data into H2O-Wave driven framework, which will expose this data into beautiful & easily representable GUI-based solutions through an interactive dashboard.


2. publish_ably_mod.py ( This native Python script will consume streaming data into Ably message Queue )


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 26-Jun-2021 ####
#### ####
#### Objective: This script will consume real-time ####
#### streaming data coming out from a hosted API ####
#### sources (Finnhub) using another popular third-party ####
#### service named Ably. Ably mimics pubsub Streaming ####
#### concept, which might be extremely useful for ####
#### any start-ups. ####
#### ####
###############################################################
from ably import AblyRest
import logging
import json
# generate random floating point values
from random import seed
from random import random
# seed random number generator
import websocket
import json
from clsConfig import clsConfig as cf
seed(1)
# Global Section
logger = logging.getLogger('ably')
logger.addHandler(logging.StreamHandler())
ably_id = str(cf.config['ABLY_ID'])
ably = AblyRest(ably_id)
channel = ably.channels.get('sd_channel')
# End Of Global Section
def on_message(ws, message):
print("*" * 60)
res = json.loads(message)
jsBody = res["data"]
jdata_dyn = json.dumps(jsBody)
print(jdata_dyn)
# JSON data
# This is the default data for all the identified category
# we've prepared. You can extract this dynamically. Or, By
# default you can set their base trade details.
json_data = [{
"c": "null",
"p": 0.01,
"s": "AAPL",
"t": 1624715406407,
"v": 0.01
},{
"c": "null",
"p": 0.01,
"s": "AMZN",
"t": 1624715406408,
"v": 0.01
},{
"c": "null",
"p": 0.01,
"s": "BINANCE:BTCUSDT",
"t": 1624715406409,
"v": 0.01
},
{
"c": "null",
"p": 0.01,
"s": "IC MARKETS:1",
"t": 1624715406410,
"v": 0.01
}]
jdata = json.dumps(json_data)
# Publish a message to the sd_channel channel
channel.publish('event', jdata)
# Publish rest of the messages to the sd_channel channel
channel.publish('event', jdata_dyn)
jsBody = []
jdata_dyn = ''
def on_error(ws, error):
print(error)
def on_close(ws):
print("### closed ###")
def on_open(ws):
# Invoking Individual Company Trade Queries
ws.send('{"type":"subscribe","symbol":"AAPL"}')
ws.send('{"type":"subscribe","symbol":"AMZN"}')
ws.send('{"type":"subscribe","symbol":"BINANCE:BTCUSDT"}')
ws.send('{"type":"subscribe","symbol":"IC MARKETS:1"}')
if __name__ == "__main__":
websocket.enableTrace(True)
ws = websocket.WebSocketApp("wss://ws.finnhub.io?token=jfhfyr8474rpv6av0",
on_message = on_message,
on_error = on_error,
on_close = on_close)
ws.on_open = on_open
ws.run_forever()

The key snippet from the above script –

    json_data = [{
        "c": "null",
        "p": 0.01,
        "s": "AAPL",
        "t": 1624715406407,
        "v": 0.01
    },{
        "c": "null",
        "p": 0.01,
        "s": "AMZN",
        "t": 1624715406408,
        "v": 0.01
    },{
        "c": "null",
        "p": 0.01,
        "s": "BINANCE:BTCUSDT",
        "t": 1624715406409,
        "v": 0.01
    },
        {
        "c": "null",
        "p": 0.01,
        "s": "IC MARKETS:1",
        "t": 1624715406410,
        "v": 0.01
        }]

As we already discussed, we’ll pass a default set of data for all the candidate companies.

    # Publish a message to the sd_channel channel
    channel.publish('event', jdata)

    # Publish rest of the messages to the sd_channel channel
    channel.publish('event', jdata_dyn)

Publish the messages to the created channel.

def on_open(ws):
    # Invoking Individual Company Trade Queries
    ws.send('{"type":"subscribe","symbol":"AAPL"}')
    ws.send('{"type":"subscribe","symbol":"AMZN"}')
    ws.send('{"type":"subscribe","symbol":"BINANCE:BTCUSDT"}')
    ws.send('{"type":"subscribe","symbol":"IC MARKETS:1"}')

if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("wss://ws.finnhub.io?token=hdhdjdj9494ld934v6av0",
                              on_message = on_message,
                              on_error = on_error,
                              on_close = on_close)

Send the company-specific trade queries through websocket apps to submit that to FinnHub.

3. clsConfig.py ( This file contains the configuration details. )


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning. Application will ####
#### process these information & perform ####
#### various analysis on Linear-Regression. ####
################################################
import os
import platform as pl
class clsConfig(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
config = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'REPORT_PATH': Curr_Path + sep + 'report',
'FILE_NAME': Curr_Path + sep + 'Data' + sep + 'TradeIn.csv',
'SRC_PATH': Curr_Path + sep + 'Data' + sep,
'APP_DESC_1': 'H2O Wave Integration with FinHubb!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'SUBDIR' : 'data',
'ABLY_ID': 'WWP309489.93jfkT:32kkdhdJjdued79e'
}

view raw

clsConfig.py

hosted with ❤ by GitHub


Let’s explore the directory structure –

MAC Directory

Let’s run the application –

Step 1:

Starting of Wave Server

Step 2:

Triggering message consumption job

Step 3:

Triggering the main application

You can monitor the message consumption from your Ably portal as follows –

Message Consumption

If you want to know more detail, then you need to scroll down the page, where you will get this additional information –

Message spike during consumption

And, the final output in the interactive dashboard will be look like the below screenshot –

Interactive Real-time Dashboard

So, we’ve done it.

You will get the complete codebase in the following Github link.

I’ll bring some more exciting topic in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only.

Creating a real-time dashboard from streaming data using Python

Hi Guys,

Today, I’ll demonstrate one of the fascinating ways to capture real-time streaming data in a dashboard. It is a dream for any developer who wants to build an application involving streaming data, API & a dashboard.

Why don’t we see our run to make this thread more interesting?

Real-Time Dashboard using streaming data

Today, I’ll be using the two most essential services to achieve that goal.

Ably

H2O-Wave

Let’s discuss brief about these two services.

  • Why I used “Ably” here?

One of my scenarios is to consume real-time currency data. Even after checking paid-API, I was not getting what I was looking for. Hence, I decided to use any service, which can mimics & publish my data as streaming data through a channel. Once published, I’ll consume the posted data into my application to create this new dashboard.

Using Ably, you can leverage their cloud platform to publish & consume data with the free developer account, which is sufficient for anyone.

To better understand this, we need to understand the basic concept of “pubsub”. Here is the important page from their side that I would like to embed for your reference –

Source: Ably

To know more about this, please refer to the following link.

  • Why I used “H2O-Wave” here?

Wave_H2O is a relatively brand new framework with some outstanding capabilities to visualize your data using native Python.

  • Pre-Steps:

We need to register Ably. Some of the useful screen that we should explore more –

API-Key Page

Successful creation of an App will generate the API-Key. Make sure that you note-down the channel details as well.

Quota Limit

The above page will capture the details of usage. Since this is a free subscription, you will be blocked once you consume your limit. However, for paid users, this is one of the vital pages to control their budget.

Message Published & Consumption Visuals

Like any other cloud service, you can check your message published or consumptions here on this page.

This is the main landing page for H2O-Wave –

H2O Wave

They have a quite many example snippet. However, these samples contain random data. Hence, these are relatively easier to implement. It would take quite some effort to tailor it for your need to implement that for real-life scenarios.

Some of the important links are as follows –

  1. H2O-Wave Tour
  2. GitHub

You need to install the following libraries in Python –

pip install ably
pip install h2o-wave

We’ve two scripts. We’re not going to discuss the publish streaming data script over here. We’ll be discussing only the consumption script, which will generate the dashboard as well. If you need, you can post your message. I’ll provide it.

1. dashboard_st.py ( This native Python script will consume streaming data & create live dashboard. )

##########################################################
#### Template Written By: H2O Wave                    ####
#### Enhanced with Streaming Data By: Satyaki De      ####
#### Base Version Enhancement On: 20-Dec-2020         ####
#### Modified On 26-Dec-2020                          ####
####                                                  ####
#### Objective: This script will consume real-time    ####
#### streaming data coming out from a hosted API      ####
#### sources using another popular third-party        ####
#### service named Ably. Ably mimics pubsub Streaming ####
#### concept, which might be extremely useful for     ####
#### any start-ups.                                   ####
##########################################################

import time
from h2o_wave import site, data, ui
from ably import AblyRest
import pandas as p
import json

class DaSeries:
    def __init__(self, inputDf):
        self.Df = inputDf
        self.count_row = inputDf.shape[0]
        self.start_pos = 0
        self.end_pos = 0
        self.interval = 1


    def next(self):
        try:
            # Getting Individual Element & convert them to Series
            if ((self.start_pos + self.interval) <= self.count_row):
                self.end_pos = self.start_pos + self.interval
            else:
                self.end_pos = self.start_pos + (self.count_row - self.start_pos)

            split_df = self.Df.iloc[self.start_pos:self.end_pos]

            if ((self.start_pos > self.count_row) | (self.start_pos == self.count_row)):
                pass
            else:
                self.start_pos = self.start_pos + self.interval

            x = float(split_df.iloc[0]['CurrentExchange'])
            dx = float(split_df.iloc[0]['Change'])

            # Emptying the exisitng dataframe
            split_df = p.DataFrame(None)

            return x, dx
        except:
            x = 0
            dx = 0

            return x, dx

class CategoricalSeries:
    def __init__(self, sourceDf):
        self.series = DaSeries(sourceDf)
        self.i = 0

    def next(self):
        x, dx = self.series.next()
        self.i += 1
        return f'C{self.i}', x, dx


light_theme_colors = '$red $pink $purple $violet $indigo $blue $azure $cyan $teal $mint $green $amber $orange $tangerine'.split()
dark_theme_colors = '$red $pink $blue $azure $cyan $teal $mint $green $lime $yellow $amber $orange $tangerine'.split()

_color_index = -1
colors = dark_theme_colors

def next_color():
    global _color_index
    _color_index += 1
    return colors[_color_index % len(colors)]


_curve_index = -1
curves = 'linear smooth step stepAfter stepBefore'.split()


def next_curve():
    global _curve_index
    _curve_index += 1
    return curves[_curve_index % len(curves)]


def create_dashboard(update_freq=0.0):
    page = site['/dashboard_st']

    # Fetching the data
    client = AblyRest('XXXXX.YYYYYY:94384jjdhdh98kiidLO')
    channel = client.channels.get('sd_channel')

    message_page = channel.history()

    # Counter Value
    cnt = 0

    # Declaring Global Data-Frame
    df_conv = p.DataFrame()

    for i in message_page.items:
        print('Last Msg: {}'.format(i.data))
        json_data = json.loads(i.data)

        # Converting JSON to Dataframe
        df = p.json_normalize(json_data)
        df.columns = df.columns.map(lambda x: x.split(".")[-1])

        if cnt == 0:
            df_conv = df
        else:
            d_frames = [df_conv, df]
            df_conv = p.concat(d_frames)

        cnt += 1

    # Resetting the Index Value
    df_conv.reset_index(drop=True, inplace=True)

    print('DF:')
    print(df_conv)

    df_conv['default_rank'] = df_conv.groupby(['Currency']).cumcount() + 1
    lkp_rank = 1
    df_unique = df_conv[(df_conv['default_rank'] == lkp_rank)]

    print('Rank DF Unique:')
    print(df_unique)

    count_row = df_unique.shape[0]

    large_lines = []
    start_pos = 0
    end_pos = 0
    interval = 1

    # Converting dataframe to a desired Series
    f = CategoricalSeries(df_conv)

    for j in range(count_row):
        # Getting the series values from above
        cat, val, pc = f.next()

        # Getting Individual Element & convert them to Series
        if ((start_pos + interval) <= count_row):
            end_pos = start_pos + interval
        else:
            end_pos = start_pos + (count_row - start_pos)

        split_df = df_unique.iloc[start_pos:end_pos]

        if ((start_pos > count_row) | (start_pos == count_row)):
            pass
        else:
            start_pos = start_pos + interval

        x_currency = str(split_df.iloc[0]['Currency'])

        c = page.add(f'e{j+1}', ui.tall_series_stat_card(
            box=f'{j+1} 1 1 2',
            title=x_currency,
            value='=${{intl qux minimum_fraction_digits=2 maximum_fraction_digits=2}}',
            aux_value='={{intl quux style="percent" minimum_fraction_digits=1 maximum_fraction_digits=1}}',
            data=dict(qux=val, quux=pc),
            plot_type='area',
            plot_category='foo',
            plot_value='qux',
            plot_color=next_color(),
            plot_data=data('foo qux', -15),
            plot_zero_value=0,
            plot_curve=next_curve(),
        ))
        large_lines.append((f, c))

    page.save()

    while update_freq > 0:

        time.sleep(update_freq)

        for f, c in large_lines:
            cat, val, pc = f.next()

            c.data.qux = val
            c.data.quux = pc / 100
            c.plot_data[-1] = [cat, val]

        page.save()

create_dashboard(update_freq=0.25)

Some of the key snippets from the above codes are –

class DaSeries:
    def __init__(self, inputDf):
        self.Df = inputDf
        self.count_row = inputDf.shape[0]
        self.start_pos = 0
        self.end_pos = 0
        self.interval = 1


    def next(self):
        try:
            # Getting Individual Element & convert them to Series
            if ((self.start_pos + self.interval) <= self.count_row):
                self.end_pos = self.start_pos + self.interval
            else:
                self.end_pos = self.start_pos + (self.count_row - self.start_pos)

            split_df = self.Df.iloc[self.start_pos:self.end_pos]

            if ((self.start_pos > self.count_row) | (self.start_pos == self.count_row)):
                pass
            else:
                self.start_pos = self.start_pos + self.interval

            x = float(split_df.iloc[0]['CurrentExchange'])
            dx = float(split_df.iloc[0]['Change'])

            # Emptying the exisitng dataframe
            split_df = p.DataFrame(None)

            return x, dx
        except:
            x = 0
            dx = 0

            return x, dx

class CategoricalSeries:
    def __init__(self, sourceDf):
        self.series = DaSeries(sourceDf)
        self.i = 0

    def next(self):
        x, dx = self.series.next()
        self.i += 1
        return f'C{self.i}', x, dx

The above snippet will create a series of data out of a pandas data frame. It will consume, one-by-one record & then pass it to the dashboard for real-time updates.

# Fetching the data
client = AblyRest('XXXXX.YYYYYY:94384jjdhdh98kiidLO')
channel = client.channels.get('sd_channel')

message_page = channel.history()

In the above code, the application will consume the real-time data out of Ably’s channel.

df_conv['default_rank'] = df_conv.groupby(['Currency']).cumcount() + 1
lkp_rank = 1
df_unique = df_conv[(df_conv['default_rank'] == lkp_rank)]

In the above code, the application is uniquely identifying the first instance of currency entries, which will be passed to the initial dashboard page before consuming the array of updates.

f = CategoricalSeries(df_conv)

In the above code, the application is creating an instance of the intended categorical series.

c = page.add(f'e{j+1}', ui.tall_series_stat_card(
            box=f'{j+1} 1 1 2',
            title=x_currency,
            value='=${{intl qux minimum_fraction_digits=2 maximum_fraction_digits=2}}',
            aux_value='={{intl quux style="percent" minimum_fraction_digits=1 maximum_fraction_digits=1}}',
            data=dict(qux=val, quux=pc),
            plot_type='area',
            plot_category='foo',
            plot_value='qux',
            plot_color=next_color(),
            plot_data=data('foo qux', -15),
            plot_zero_value=0,
            plot_curve=next_curve(),
        ))
        large_lines.append((f, c))

    page.save()

The above code is a standard way to bind the streaming data with the H2O-Wave dashboard.

    while update_freq > 0:

        time.sleep(update_freq)

        for f, c in large_lines:
            cat, val, pc = f.next()

            c.data.qux = val
            c.data.quux = pc / 100
            c.plot_data[-1] = [cat, val]

        page.save()

Here are the last few snippet lines that will capture the continuous streaming data & keep updating the numbers on your dashboard.

Since I’ve already provided the run video of my application, here are a few important screens –

Case 1:

Wave Server Start Command

Case 2:

Publishing stream data

Case 3:

Consuming Stream Data & Publishing to Dashboard

Case 4:

Dashboard Data

So, finally, we have done it.

You will get the complete codebase in the following Github link.


I’ll bring some more exciting topic in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only.