Python-based Dash framework visualizing the real-time Covid-19 trend.

Hi Team,

We’ll enhance our last post on Covid-19 prediction & capture the results in a real-time dashboard, where the plotted values change as soon as the source data changes. In short, this is a live visual dashboard whose graphs & trends follow every change in the third-party API source data.

However, I would like to share the run before we dig deep into this.

Demo Run

Architecture:

Let us understand the architecture for this solution –

Streaming Architecture

From the above diagram, one can see that we’re maintaining a similar approach compared to our last initiative. However, we’ve used a different framework to display the data live.

To achieve this, we’ve used a compelling Python-based framework called Dash. Other than that, we’ve used Ably, Plotly & the Prophet API.

If you need to know more about our last post, please visit this link.


Package Installation:

Let us look at the packages required for this task.

Step – 1:

Installing Packages

Step – 2:

Installing Packages – Continue

Step – 3:

Installing Packages – Continue

Step – 4:

Installing Packages – Final

And here are the commands to install those packages:

pip install pandas
pip install plotly
pip install prophet
pip install dash
pip install ably

Code:

Since this is an extension to our previous post, we’re not going to discuss the scripts that we’ve already covered there. Instead, we will talk about the enhanced scripts & the new scripts required for this use case.

1. clsConfig.py ( This native Python script contains the configuration entries. )


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 09-Sep-2021 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning & streaming dashboard.####
#### ####
################################################
import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'FILE_NAME': Curr_Path + sep + 'data' + sep + 'TradeIn.csv',
        'SRC_PATH': Curr_Path + sep + 'data' + sep,
        'APP_DESC_1': 'Dash Integration with Ably!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SUBDIR' : 'data',
        'ABLY_ID': 'XXX2LL.93kdkiU2:Kdsldoeie737484E',
        "URL":"https://corona-api.com/countries/",
        "appType":"application/json",
        "conType":"keep-alive",
        "limRec": 10,
        "CACHE":"no-cache",
        "MAX_RETRY": 3,
        "coList": "DE, IN, US, CA, GB, ID, BR",
        "FNC": "NewConfirmed",
        "TMS": "ReportedDate",
        "FND": "NewDeaths",
        "FinData": "Cache.csv"
    }

The new entries that are essential to this task are ABLY_ID & FinData. ABLY_ID holds the key that the scripts pass to the Ably client, and FinData is the name of the local cache file.

2. clsPublishStream.py ( This script will publish the transformed Covid-19 data, including the predictions, that originates from the third-party source. )


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 26-Jul-2021 ####
#### Modified Date: 08-Sep-2021 ####
#### ####
#### Objective: This script will publish real-time ####
#### streaming data coming out from a hosted API ####
#### sources using another popular third-party service ####
#### named Ably. Ably mimics pubsub Streaming concept, ####
#### which might be extremely useful for any start-ups. ####
#### ####
###############################################################
from ably import AblyRest
import logging
import json
from clsConfig import clsConfig as cf

# Global Section
logger = logging.getLogger('ably')
logger.addHandler(logging.StreamHandler())

ably_id = str(cf.conf['ABLY_ID'])

ably = AblyRest(ably_id)
channel = ably.channels.get('sd_channel')
# End Of Global Section

class clsPublishStream:
    def __init__(self):
        self.fnc = cf.conf['FNC']

    def pushEvents(self, srcDF, debugInd, varVa, flg):
        try:
            # JSON data
            # This is the default data for all the identified countries
            # we've prepared. You can extract this dynamically. Or, by
            # default, you can set their base values to zero.
            json_data = [{'Year_Mon': '201911', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg},
                         {'Year_Mon': '201912', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg}]

            jdata = json.dumps(json_data)

            # Publish a message to the sd_channel channel
            channel.publish('event', jdata)

            # Capturing the inbound dataframe
            iDF = srcDF

            # Adding new selected points
            covid_dict = iDF.to_dict('records')
            jdata_fin = json.dumps(covid_dict)

            # Publish rest of the messages to the sd_channel channel
            channel.publish('event', jdata_fin)

            jdata_fin = ''

            return 0
        except Exception as e:
            x = str(e)
            print(x)
            logging.info(x)

            return 1

We’ve already discussed this script. The only new line that appears here is –

json_data = [{'Year_Mon': '201911', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg},
            {'Year_Mon': '201912', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg}]

This statement is more like a dummy feed, which creates the basic structure of your graph.
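To see what that dummy feed gives the consumer, here is a minimal sketch that rebuilds the same zero-valued rows, serializes them the way pushEvents() does, and decodes them back into a DataFrame. The helper name build_dummy_feed and the COUNTRIES constant are hypothetical and exist only for this illustration; the Ably call itself is left out.

```python
import json

import pandas as pd

# Hypothetical constant: the country columns used throughout the post.
COUNTRIES = ['Brazil', 'Canada', 'Germany', 'India',
             'Indonesia', 'UnitedKingdom', 'UnitedStates']


def build_dummy_feed(status, months=('201911', '201912')):
    """Hypothetical helper: zero-valued placeholder rows, one per month."""
    rows = []
    for ym in months:
        row = {'Year_Mon': ym}
        row.update({country: 0.0 for country in COUNTRIES})
        row['Status'] = status
        rows.append(row)
    return rows


# Serialize as the publisher does, then decode as a consumer would.
payload = json.dumps(build_dummy_feed('NewConfirmed'))
skeleton_df = pd.DataFrame(json.loads(payload))

# Every column the dashboard plots already exists, even before real data arrives.
print(skeleton_df.columns.tolist())
```

Because the placeholder rows carry every country column plus Status, the graph traces can be drawn even when no real event has arrived yet.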

3. clsStreamConsume.py ( This script will consume the stream from the Ably queue. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### Modified On 08-Sep-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels published by the ####
#### callPredictCovidAnalysisRealtime.py ####
#### ####
##############################################
import json
from clsConfig import clsConfig as cf
import requests
import logging
import time
import pandas as p
import clsL as cl
from ably import AblyRest

# Initiating Log class
l = cl.clsL()

class clsStreamConsume:
    def __init__(self):
        self.ably_id = str(cf.conf['ABLY_ID'])
        self.fileName = str(cf.conf['FinData'])

    def conStream(self, varVa, debugInd):
        try:
            ably_id = self.ably_id
            fileName = self.fileName
            var = varVa
            debug_ind = debugInd

            # Fetching the data
            client = AblyRest(ably_id)
            channel = client.channels.get('sd_channel')

            message_page = channel.history()

            # Counter Value
            cnt = 0

            # Declaring Global Data-Frame
            df_conv = p.DataFrame()

            for i in message_page.items:
                print('Last Msg: {}'.format(i.data))
                json_data = json.loads(i.data)

                # Converting JSON to Dataframe
                df = p.json_normalize(json_data)
                df.columns = df.columns.map(lambda x: x.split(".")[1])

                if cnt == 0:
                    df_conv = df
                else:
                    d_frames = [df_conv, df]
                    df_conv = p.concat(d_frames)

                cnt += 1

            # Resetting the Index Value
            df_conv.reset_index(drop=True, inplace=True)

            # This will check whether the current load is happening
            # or not. Based on that, it will capture the old events
            # from cache.
            if df_conv.empty:
                df_conv = p.read_csv(fileName)
            else:
                l.logr(fileName, debug_ind, df_conv, 'log')

            return df_conv
        except Exception as e:
            x = str(e)
            print(x)
            logging.info(x)

            # This will handle the error scenario as well.
            # Based on that, it will capture the old events
            # from cache.
            try:
                df_conv = p.read_csv(self.fileName)
            except Exception:
                # No usable cache either, so return an empty frame
                df_conv = p.DataFrame()

            return df_conv

We’ve already discussed this script in one of my earlier posts, which you will get here.

So, I’m not going to discuss all the steps in detail.

The only addition is a temporary local caching mechanism:

if df_conv.empty:
    df_conv = p.read_csv(fileName)
else:
    l.logr(fileName, debug_ind, df_conv, 'log')
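The caching idea can be tried in isolation with plain pandas. The following is only a sketch under assumptions: load_or_cache is a hypothetical helper, and DataFrame.to_csv stands in for the l.logr call of the original script, whose exact behavior is not shown in this post.

```python
import os
import tempfile

import pandas as pd


def load_or_cache(live_df, cache_file):
    """Hypothetical helper: prefer live data, else fall back to the cached CSV."""
    if live_df.empty:
        # No fresh events: reuse whatever an earlier call saved.
        if os.path.exists(cache_file):
            return pd.read_csv(cache_file)
        return pd.DataFrame()

    # Fresh events arrived: refresh the cache for later calls.
    live_df.to_csv(cache_file, index=False)
    return live_df


cache_path = os.path.join(tempfile.mkdtemp(), 'Cache.csv')
fresh = pd.DataFrame({'Year_Mon': [202108],
                      'India': [1200.0],
                      'Status': ['NewConfirmed']})

first = load_or_cache(fresh, cache_path)            # writes the cache
second = load_or_cache(pd.DataFrame(), cache_path)  # served from the cache
print(second)
```

With this pattern, the dashboard keeps showing the last known values whenever the channel history comes back empty.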

4. callPredictCovidAnalysisRealtime.py ( Main calling script to fetch the Covid-19 data from the third-party source & then publish it to the Ably message queue after transforming the data & adding the prediction using Facebook’s Prophet API. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### Modified On 26-Jul-2021 ####
#### ####
#### Objective: Calling multiple API's ####
#### that including Prophet-API developed ####
#### by Facebook for future prediction of ####
#### Covid-19 situations in upcoming days ####
#### for world's major hotspots. ####
##############################################
import json
import clsCovidAPI as ca
from clsConfig import clsConfig as cf
import datetime
import logging
import clsL as cl
import math as m
import clsPublishStream as cps
import clsForecast as f
from prophet import Prophet
from prophet.plot import plot_plotly, plot_components_plotly
import matplotlib.pyplot as plt
import pandas as p
import datetime as dt
import time

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Initiating Log class
l = cl.clsL()

# Helper function that maps a country code to the country name
def countryDet(inputCD):
    try:
        countryCD = inputCD

        if str(countryCD) == 'DE':
            cntCD = 'Germany'
        elif str(countryCD) == 'BR':
            cntCD = 'Brazil'
        elif str(countryCD) == 'GB':
            cntCD = 'UnitedKingdom'
        elif str(countryCD) == 'US':
            cntCD = 'UnitedStates'
        elif str(countryCD) == 'IN':
            cntCD = 'India'
        elif str(countryCD) == 'CA':
            cntCD = 'Canada'
        elif str(countryCD) == 'ID':
            cntCD = 'Indonesia'
        else:
            cntCD = 'N/A'

        return cntCD
    except:
        cntCD = 'N/A'

        return cntCD

def lookupCountry(row):
    try:
        strCD = str(row['CountryCode'])
        retVal = countryDet(strCD)

        return retVal
    except:
        retVal = 'N/A'

        return retVal

def adjustTrend(row):
    try:
        flTrend = float(row['trend'])
        flTrendUpr = float(row['trend_upper'])
        flTrendLwr = float(row['trend_lower'])

        # Averaging the trend with its upper & lower bounds
        retVal = m.trunc((flTrend + flTrendUpr + flTrendLwr)/3)

        if retVal < 0:
            retVal = 0

        return retVal
    except:
        retVal = 0

        return retVal

def ceilTrend(row, colName):
    try:
        # Rounding up to the next whole number; whole numbers stay as they are
        retVal = m.ceil(float(row[colName]))

        # Counts can never be negative
        if retVal < 0:
            retVal = 0

        return retVal
    except:
        retVal = 0

        return retVal
def plot_picture(inputDF, debug_ind, var, countryCD, stat):
    try:
        iDF = inputDF

        # Lowercase the column names
        iDF.columns = [c.lower() for c in iDF.columns]
        # Determine which is Y axis
        y_col = [c for c in iDF.columns if c.startswith('y')][0]
        # Determine which is X axis
        x_col = [c for c in iDF.columns if c.startswith('ds')][0]

        # Data Conversion
        iDF['y'] = iDF[y_col].astype('float')
        iDF['ds'] = iDF[x_col].astype('datetime64[ns]')

        # Forecast calculations
        # Decreasing the changepoint_prior_scale to 0.001 to make the trend less flexible
        # (named "model" so that it does not hide the math alias "m" inside this function)
        model = Prophet(n_changepoints=20, yearly_seasonality=True, changepoint_prior_scale=0.001)
        #model = Prophet(n_changepoints=20, yearly_seasonality=True, changepoint_prior_scale=0.04525)
        #model = Prophet(n_changepoints=['2021-09-10'])
        model.fit(iDF)

        forecastDF = model.make_future_dataframe(periods=365)
        forecastDF = model.predict(forecastDF)

        l.logr('15.forecastDF_' + var + '_' + countryCD + '.csv', debug_ind, forecastDF, 'log')

        # Working on a copy so that the new columns do not touch forecastDF
        df_M = forecastDF[['ds', 'trend', 'trend_lower', 'trend_upper']].copy()

        l.logr('16.df_M_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')

        # Getting Full Country Name
        cntCD = countryDet(countryCD)

        # Tagging the forecast with the country name
        df_M['Country'] = cntCD

        l.logr('17.df_M_C_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')

        df_M['AdjustTrend'] = df_M.apply(lambda row: adjustTrend(row), axis=1)

        l.logr('20.df_M_AdjustTrend_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')

        return df_M
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df
def countrySpecificDF(counryDF, val):
    try:
        countryName = val
        df = counryDF

        df_lkpFile = df[(df['CountryCode'] == val)]

        return df_lkpFile
    except:
        df = p.DataFrame()

        return df

def toNum(row, colName):
    try:
        flTrend = str(row[colName])
        flTr, subpart = flTrend.split(' ')
        retVal = int(flTr.replace('-',''))

        return retVal
    except:
        retVal = 0

        return retVal

def extractPredictedDF(OrigDF, MergePredictedDF, colName):
    try:
        iDF_1 = OrigDF
        iDF_2 = MergePredictedDF
        dt_format = '%Y-%m-%d'

        iDF_1_max_group = iDF_1.groupby(["Country"] , as_index=False)["ReportedDate"].max()
        iDF_2['ReportedDate'] = iDF_2.apply(lambda row: toNum(row, 'ds'), axis=1)

        col_one_list = iDF_1_max_group['Country'].tolist()
        col_two_list = iDF_1_max_group['ReportedDate'].tolist()

        print('col_one_list: ', str(col_one_list))
        print('col_two_list: ', str(col_two_list))

        cnt_1_x = 1
        cnt_1_y = 1
        cnt_x = 0

        df_M = p.DataFrame()

        for i in col_one_list:
            str_countryVal = str(i)
            cnt_1_y = 1

            for j in col_two_list:
                intReportDate = int(str(j).strip().replace('-',''))

                # Pairing each country with its own last reported date
                if cnt_1_x == cnt_1_y:
                    print('str_countryVal: ', str(str_countryVal))
                    print('intReportDate: ', str(intReportDate))

                    iDF_2_M = iDF_2[(iDF_2['Country'] == str_countryVal) & (iDF_2['ReportedDate'] > intReportDate)]

                    # Merging with the previous Country Code data
                    if cnt_x == 0:
                        df_M = iDF_2_M
                    else:
                        d_frames = [df_M, iDF_2_M]
                        df_M = p.concat(d_frames)

                    cnt_x += 1

                cnt_1_y += 1

            cnt_1_x += 1

        df_M.drop(columns=['ReportedDate'], axis=1, inplace=True)
        df_M.rename(columns={'ds':'ReportedDate'}, inplace=True)
        df_M.rename(columns={'AdjustTrend':colName}, inplace=True)

        return df_M
    except:
        df = p.DataFrame()

        return df
def toPivot(inDF, colName):
    try:
        iDF = inDF

        iDF_Piv = iDF.pivot_table(colName, ['ReportedDate'], 'Country')
        iDF_Piv.reset_index( drop=False, inplace=True )

        list1 = ['ReportedDate']

        iDF_Arr = iDF['Country'].unique()
        list2 = iDF_Arr.tolist()

        listV = list1 + list2

        # reindex returns a new frame, so the result has to be assigned back
        iDF_Piv = iDF_Piv.reindex(listV, axis=1)

        return iDF_Piv
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

def toAgg(inDF, var, debugInd, flg):
    try:
        iDF = inDF
        colName = "ReportedDate"

        list1 = list(iDF.columns.values)
        list1.remove(colName)

        list1 = ["Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]

        iDF['Year_Mon'] = iDF[colName].apply(lambda x:x.strftime('%Y%m'))
        iDF.drop(columns=[colName], axis=1, inplace=True)

        ColNameGrp = "Year_Mon"
        print('List1 Aggregate:: ', str(list1))
        print('ColNameGrp :: ', str(ColNameGrp))

        iDF_T = iDF[["Year_Mon", "Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]]
        iDF_T.fillna(0, inplace = True)
        print('iDF_T:: ')
        print(iDF_T)

        iDF_1_max_group = iDF_T.groupby(ColNameGrp, as_index=False)[list1].sum()
        iDF_1_max_group['Status'] = flg

        return iDF_1_max_group
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df
def publishEvents(inDF1, inDF2, inDF3, inDF4, var, debugInd):
    try:
        # Original Covid Data from API
        iDF1 = inDF1
        iDF2 = inDF2

        NC = 'NewConfirmed'
        ND = 'NewDeaths'

        iDF1_PV = toPivot(iDF1, NC)
        iDF1_PV['ReportedDate'] = p.to_datetime(iDF1_PV['ReportedDate'])
        l.logr('57.iDF1_PV_' + var + '.csv', debugInd, iDF1_PV, 'log')

        iDF2_PV = toPivot(iDF2, ND)
        iDF2_PV['ReportedDate'] = p.to_datetime(iDF2_PV['ReportedDate'])
        l.logr('58.iDF2_PV_' + var + '.csv', debugInd, iDF2_PV, 'log')

        # Predicted Covid Data from Facebook API
        iDF3 = inDF3
        iDF4 = inDF4

        iDF3_PV = toPivot(iDF3, NC)
        l.logr('59.iDF3_PV_' + var + '.csv', debugInd, iDF3_PV, 'log')

        iDF4_PV = toPivot(iDF4, ND)
        l.logr('60.iDF4_PV_' + var + '.csv', debugInd, iDF4_PV, 'log')

        # Now aggregating data based on year-month only
        iDF1_Agg = toAgg(iDF1_PV, var, debugInd, NC)
        l.logr('61.iDF1_Agg_' + var + '.csv', debugInd, iDF1_Agg, 'log')

        iDF2_Agg = toAgg(iDF2_PV, var, debugInd, ND)
        l.logr('62.iDF2_Agg_' + var + '.csv', debugInd, iDF2_Agg, 'log')

        iDF3_Agg = toAgg(iDF3_PV, var, debugInd, NC)
        l.logr('63.iDF3_Agg_' + var + '.csv', debugInd, iDF3_Agg, 'log')

        iDF4_Agg = toAgg(iDF4_PV, var, debugInd, ND)
        l.logr('64.iDF4_Agg_' + var + '.csv', debugInd, iDF4_Agg, 'log')

        # Initiating Ably class to push events
        x1 = cps.clsPublishStream()

        # Pushing both the Historical Confirmed Cases
        retVal_1 = x1.pushEvents(iDF1_Agg, debugInd, var, NC)

        if retVal_1 == 0:
            print('Successfully historical event pushed!')
        else:
            print('Failed to push historical events!')

        # Pushing both the Historical Death Cases
        retVal_3 = x1.pushEvents(iDF2_Agg, debugInd, var, ND)

        if retVal_3 == 0:
            print('Successfully historical event pushed!')
        else:
            print('Failed to push historical events!')

        time.sleep(5)

        # Pushing both the New Confirmed Cases
        retVal_2 = x1.pushEvents(iDF3_Agg, debugInd, var, NC)

        if retVal_2 == 0:
            print('Successfully predicted event pushed!')
        else:
            print('Failed to push predicted events!')

        # Pushing both the New Death Cases
        retVal_4 = x1.pushEvents(iDF4_Agg, debugInd, var, ND)

        if retVal_4 == 0:
            print('Successfully predicted event pushed!')
        else:
            print('Failed to push predicted events!')

        return 0
    except Exception as e:
        x = str(e)
        print(x)

        return 1
def main():
    try:
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*' *60)
        DInd = 'Y'
        NC = 'New Confirmed'
        ND = 'New Dead'
        SM = 'data process Successful!'
        FM = 'data process Failure!'

        print("Calling the custom Package for large file splitting..")
        print('Start Time: ' + str(var1))

        countryList = str(cf.conf['coList']).split(',')

        # Initiating Log Class
        general_log_path = str(cf.conf['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'CovidAPI.log', level=logging.INFO)

        # Create the instance of the Covid API Class
        x1 = ca.clsCovidAPI()

        # Let's pass this to our map section
        retDF = x1.searchQry(var1, DInd)

        retVal = int(retDF.shape[0])

        if retVal > 0:
            print('Successfully Covid Data Extracted from the API-source.')
        else:
            print('Something wrong with your API-source!')

        # Extracting Skeleton Data
        df = retDF[['data.code', 'date', 'deaths', 'confirmed', 'recovered', 'new_confirmed', 'new_recovered', 'new_deaths', 'active']]
        df.columns = ['CountryCode', 'ReportedDate', 'TotalReportedDead', 'TotalConfirmedCase', 'TotalRecovered', 'NewConfirmed', 'NewRecovered', 'NewDeaths', 'ActiveCaases']
        df.dropna()

        print('Returned Skeleton Data Frame: ')
        print(df)

        l.logr('5.df_' + var1 + '.csv', DInd, df, 'log')

        # Due to source data issue, application will perform of
        # avg of counts based on dates due to multiple entries
        # (the columns are selected with a list, which current pandas requires)
        g_df = df.groupby(["CountryCode", "ReportedDate"] , as_index=False)[["TotalReportedDead","TotalConfirmedCase","TotalRecovered","NewConfirmed","NewRecovered","NewDeaths","ActiveCaases"]].mean()

        g_df['TotalReportedDead_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalReportedDead'), axis=1)
        g_df['TotalConfirmedCase_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalConfirmedCase'), axis=1)
        g_df['TotalRecovered_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalRecovered'), axis=1)
        g_df['NewConfirmed_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewConfirmed'), axis=1)
        g_df['NewRecovered_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewRecovered'), axis=1)
        g_df['NewDeaths_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewDeaths'), axis=1)
        g_df['ActiveCaases_M'] = g_df.apply(lambda row: ceilTrend(row, 'ActiveCaases'), axis=1)

        # Dropping old columns
        g_df.drop(columns=['TotalReportedDead', 'TotalConfirmedCase', 'TotalRecovered', 'NewConfirmed', 'NewRecovered', 'NewDeaths', 'ActiveCaases'], axis=1, inplace=True)

        # Renaming the new columns to old columns
        g_df.rename(columns={'TotalReportedDead_M':'TotalReportedDead'}, inplace=True)
        g_df.rename(columns={'TotalConfirmedCase_M':'TotalConfirmedCase'}, inplace=True)
        g_df.rename(columns={'TotalRecovered_M':'TotalRecovered'}, inplace=True)
        g_df.rename(columns={'NewConfirmed_M':'NewConfirmed'}, inplace=True)
        g_df.rename(columns={'NewRecovered_M':'NewRecovered'}, inplace=True)
        g_df.rename(columns={'NewDeaths_M':'NewDeaths'}, inplace=True)
        g_df.rename(columns={'ActiveCaases_M':'ActiveCaases'}, inplace=True)

        l.logr('5.g_df_' + var1 + '.csv', DInd, g_df, 'log')

        # Working with forecast
        # Create the instance of the Forecast API Class
        x2 = f.clsForecast()

        # Fetching each country name & then get the details
        cnt = 6
        cnt_x = 0
        cnt_y = 0
        df_M_Confirmed = p.DataFrame()
        df_M_Deaths = p.DataFrame()

        for i in countryList:
            try:
                cntryIndiv = i.strip()
                cntryFullName = countryDet(cntryIndiv)

                print('Country Processing: ' + str(cntryFullName))

                # Creating dataframe for each country
                dfCountry = countrySpecificDF(g_df, cntryIndiv)
                l.logr(str(cnt) + '.df_' + cntryIndiv + '_' + var1 + '.csv', DInd, dfCountry, 'log')

                # Let's pass this to our map section
                retDFGenNC = x2.forecastNewConfirmed(dfCountry, DInd, var1)

                statVal = str(NC)

                a1 = plot_picture(retDFGenNC, DInd, var1, cntryIndiv, statVal)

                # Merging with the previous Country Code data
                if cnt_x == 0:
                    df_M_Confirmed = a1
                else:
                    d_frames = [df_M_Confirmed, a1]
                    df_M_Confirmed = p.concat(d_frames)

                cnt_x += 1

                retDFGenNC_D = x2.forecastNewDead(dfCountry, DInd, var1)

                statVal = str(ND)

                a2 = plot_picture(retDFGenNC_D, DInd, var1, cntryIndiv, statVal)

                # Merging with the previous Country Code data
                if cnt_y == 0:
                    df_M_Deaths = a2
                else:
                    d_frames = [df_M_Deaths, a2]
                    df_M_Deaths = p.concat(d_frames)

                cnt_y += 1

                # Printing Proper message
                # plot_picture returns an empty dataframe on failure
                if not a1.empty and not a2.empty:
                    oprMsg = cntryFullName + ' ' + SM
                    print(oprMsg)
                else:
                    oprMsg = cntryFullName + ' ' + FM
                    print(oprMsg)

                # Resetting the dataframe value for the next iteration
                dfCountry = p.DataFrame()
                cntryIndiv = ''
                oprMsg = ''
                cntryFullName = ''
                a1 = 0
                a2 = 0
                statVal = ''

                cnt += 1
            except Exception as e:
                x = str(e)
                print(x)

        l.logr('49.df_M_Confirmed_' + var1 + '.csv', DInd, df_M_Confirmed, 'log')
        l.logr('50.df_M_Deaths_' + var1 + '.csv', DInd, df_M_Deaths, 'log')

        # Removing unwanted columns
        df_M_Confirmed.drop(columns=['trend', 'trend_lower', 'trend_upper'], axis=1, inplace=True)
        df_M_Deaths.drop(columns=['trend', 'trend_lower', 'trend_upper'], axis=1, inplace=True)

        l.logr('51.df_M_Confirmed_' + var1 + '.csv', DInd, df_M_Confirmed, 'log')
        l.logr('52.df_M_Deaths_' + var1 + '.csv', DInd, df_M_Deaths, 'log')

        # Creating original dataframe from the source API
        # (copies, so that adding columns does not touch g_df)
        df_M_Confirmed_Orig = g_df[['CountryCode', 'ReportedDate','NewConfirmed']].copy()
        df_M_Deaths_Orig = g_df[['CountryCode', 'ReportedDate','NewDeaths']].copy()

        # Transforming Country Code
        df_M_Confirmed_Orig['Country'] = df_M_Confirmed_Orig.apply(lambda row: lookupCountry(row), axis=1)
        df_M_Deaths_Orig['Country'] = df_M_Deaths_Orig.apply(lambda row: lookupCountry(row), axis=1)

        # Dropping unwanted column
        df_M_Confirmed_Orig.drop(columns=['CountryCode'], axis=1, inplace=True)
        df_M_Deaths_Orig.drop(columns=['CountryCode'], axis=1, inplace=True)

        # Reordering columns
        df_M_Confirmed_Orig = df_M_Confirmed_Orig.reindex(['ReportedDate','Country','NewConfirmed'], axis=1)
        df_M_Deaths_Orig = df_M_Deaths_Orig.reindex(['ReportedDate','Country','NewDeaths'], axis=1)

        l.logr('53.df_M_Confirmed_Orig_' + var1 + '.csv', DInd, df_M_Confirmed_Orig, 'log')
        l.logr('54.df_M_Deaths_Orig_' + var1 + '.csv', DInd, df_M_Deaths_Orig, 'log')

        # Filter out only the predicted data
        filterDF_1 = extractPredictedDF(df_M_Confirmed_Orig, df_M_Confirmed, 'NewConfirmed')
        l.logr('55.filterDF_1_' + var1 + '.csv', DInd, filterDF_1, 'log')

        # The death forecast has to be filtered against the death history
        filterDF_2 = extractPredictedDF(df_M_Deaths_Orig, df_M_Deaths, 'NewDeaths')
        l.logr('56.filterDF_2_' + var1 + '.csv', DInd, filterDF_2, 'log')

        # Calling the final publish events
        retVa = publishEvents(df_M_Confirmed_Orig, df_M_Deaths_Orig, filterDF_1, filterDF_2, var1, DInd)

        if retVa == 0:
            print('Successfully stream processed!')
        else:
            print('Failed to process stream!')

        var2 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var2))
        print('*' *60)
    except Exception as e:
        x = str(e)
        print(x)

if __name__ == "__main__":
    main()

Let us understand the enhancement part of this script –

We’ve taken out the plotly part as we will use a separate dashboard script to visualize the data trend.

However, we need to understand the initial consumed data from API & how we transform the data, which will be helpful for visualization.

The initial captured data should look like this after extracting only the relevant elements from the API response.

Initial Data from API

As you can see, for each country & reported date, our application consumes attributes like Total-Reported-Death, Total-Recovered, New-Death, New-Confirmed & so on.

From this list, we’ve taken two attributes for our use case: New-Death & New-Confirmed. Also, we’re predicting the Future-New-Death & Future-New-Confirmed based on the historical data using Facebook’s Prophet API.

Next, we transpose the data so that each country becomes its own column, which is easier to plot.

Transposed Data

Hence, here is the code that we should be exploring –

def toPivot(inDF, colName):
    try:
        iDF = inDF

        iDF_Piv = iDF.pivot_table(colName, ['ReportedDate'], 'Country')
        iDF_Piv.reset_index( drop=False, inplace=True )

        list1 = ['ReportedDate']

        iDF_Arr = iDF['Country'].unique()
        list2 = iDF_Arr.tolist()

        listV = list1 + list2

        # reindex returns a new frame, so the result has to be assigned back
        iDF_Piv = iDF_Piv.reindex(listV, axis=1)

        return iDF_Piv
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

Now, using the pivot_table function, we’re transposing the row values into the columns. And, later we’ve realigned the column heading as per our desired format.
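To make the transposition concrete, here is a small sketch on made-up numbers. The four sample rows are invented purely for illustration; only the pivot_table, reset_index & reindex calls mirror what toPivot does.

```python
import pandas as pd

# Invented sample rows in the long format: one row per date & country.
daily = pd.DataFrame({
    'ReportedDate': ['2021-09-01', '2021-09-01', '2021-09-02', '2021-09-02'],
    'Country': ['India', 'Brazil', 'India', 'Brazil'],
    'NewConfirmed': [100, 80, 120, 90],
})

# Countries move from row values to column headings.
wide = daily.pivot_table(values='NewConfirmed',
                         index=['ReportedDate'],
                         columns='Country')
wide.reset_index(drop=False, inplace=True)

# Realign the columns: the date first, then countries in order of appearance.
ordered_cols = ['ReportedDate'] + daily['Country'].unique().tolist()
wide = wide.reindex(ordered_cols, axis=1)

print(wide)
```

Each date now occupies a single row, with one numeric column per country, which is the shape the aggregation step expects.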

However, we still have one row per daily date in this case. We want to drop the day part of the date & then aggregate the values by month, as shown below:

Aggregated Data

And, here is the code for that –

def toAgg(inDF, var, debugInd, flg):
    try:
        iDF = inDF
        colName = "ReportedDate"

        list1 = list(iDF.columns.values)
        list1.remove(colName)

        list1 = ["Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]

        iDF['Year_Mon'] = iDF[colName].apply(lambda x:x.strftime('%Y%m'))
        iDF.drop(columns=[colName], axis=1, inplace=True)

        ColNameGrp = "Year_Mon"
        print('List1 Aggregate:: ', str(list1))
        print('ColNameGrp :: ', str(ColNameGrp))

        iDF_T = iDF[["Year_Mon", "Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]]
        iDF_T.fillna(0, inplace = True)
        print('iDF_T:: ')
        print(iDF_T)

        iDF_1_max_group = iDF_T.groupby(ColNameGrp, as_index=False)[list1].sum()
        iDF_1_max_group['Status'] = flg

        return iDF_1_max_group
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

From the above snippet, we can see that the application strips the day part from ReportedDate & then sums the values for each Year_Mon.
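The monthly roll-up can be reproduced on a tiny, invented frame. This is a sketch of the same idea as toAgg rather than a copy of it: the sample values are made up, and only two country columns are used to keep it short.

```python
import pandas as pd

# Invented wide-format sample: one row per date, one column per country.
wide = pd.DataFrame({
    'ReportedDate': pd.to_datetime(['2021-08-30', '2021-08-31', '2021-09-01']),
    'India': [100.0, 120.0, 90.0],
    'Brazil': [80.0, None, 70.0],
})

# Keep only the year & month of each date.
wide['Year_Mon'] = wide['ReportedDate'].dt.strftime('%Y%m')

monthly = (
    wide.drop(columns=['ReportedDate'])
        .fillna(0)                           # missing counts become zero
        .groupby('Year_Mon', as_index=False)[['India', 'Brazil']]
        .sum()
)

# The flag tells the dashboard which metric these rows belong to.
monthly['Status'] = 'NewConfirmed'

print(monthly)
```

The two August rows collapse into a single 202108 row, and the missing Brazil value counts as zero in the sum.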

The following snippet will push the final transformed data to the Ably queue:

x1 = cps.clsPublishStream()

# Pushing both the Historical Confirmed Cases
retVal_1 = x1.pushEvents(iDF1_Agg, debugInd, var, NC)

if retVal_1 == 0:
    print('Successfully historical event pushed!')
else:
    print('Failed to push historical events!')
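The last argument of pushEvents (NC or ND) ends up in the Status column, which is what lets a single channel carry both metrics. The sketch below imitates that flow without Ably: the two sample frames are invented, and a plain Python list stands in for the channel.

```python
import json

import pandas as pd

# Invented aggregated frames, one per metric.
confirmed = pd.DataFrame({'Year_Mon': ['202108'], 'India': [1200.0],
                          'Status': ['NewConfirmed']})
deaths = pd.DataFrame({'Year_Mon': ['202108'], 'India': [15.0],
                       'Status': ['NewDeaths']})

# Publisher side: each frame becomes one JSON message.
# A list stands in for the Ably channel in this sketch.
messages = [json.dumps(df.to_dict('records')) for df in (confirmed, deaths)]

# Consumer side: decode every message & stack the rows.
received = pd.concat(
    [pd.DataFrame(json.loads(msg)) for msg in messages],
    ignore_index=True,
)

# Route the rows to the right graph by their Status flag.
new_confirmed = received.loc[received['Status'] == 'NewConfirmed']
new_deaths = received.loc[received['Status'] == 'NewDeaths']

print(new_confirmed)
print(new_deaths)
```

Keeping the flag inside each row means the consumer never has to guess which message belongs to which chart.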

5. dashboard_realtime.py ( Main calling script to consume the data from the Ably queue & then visualize the trend. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 08-Sep-2021 ####
#### Modified On 08-Sep-2021 ####
#### ####
#### Objective: This is the main script ####
#### to invoke dashboard after consuming ####
#### streaming real-time predicted data ####
#### using Facebook API & Ably message Q. ####
#### ####
#### This script will show the trend ####
#### comparison between major democracies ####
#### of the world. ####
#### ####
##############################################
import datetime
import dash
from dash import dcc
from dash import html
import plotly
from dash.dependencies import Input, Output
from ably import AblyRest
from clsConfig import clsConfig as cf
import pandas as p

# Main Class to consume streaming
import clsStreamConsume as ca
import numpy as np

# Create the instance of the Covid API Class
x1 = ca.clsStreamConsume()

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)

app.layout = html.Div(
    html.Div([
        html.H1("Covid-19 Trend Dashboard",
                        className='text-center text-primary mb-4'),
        html.H5(children='''
            Dash: Covid-19 Trend - (Present Vs Future)
        '''),
        html.P("Covid-19: New Confirmed Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-1'),
        html.P("Covid-19: New Death Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-2'),
        dcc.Interval(
            id='interval-component',
            interval=5*1000, # in milliseconds
            n_intervals=0
        )
    ], className="row", style={'marginBottom': 10, 'marginTop': 10})
)
def to_OptimizeString(row):
    try:
        x_str = str(row['Year_Mon'])

        # Appending the first day of the month to build a full date
        dt_format = '%Y%m%d'
        finStr = x_str + '01'

        strReportDate = datetime.datetime.strptime(finStr, dt_format)

        return strReportDate
    except Exception as e:
        x = str(e)
        print(x)

        # Falling back to a far-future default date
        dt_format = '%Y%m%d'
        var = '20990101'

        strReportDate = datetime.datetime.strptime(var, dt_format)

        return strReportDate

def fetchEvent(var1, DInd):
    try:
        # Let's pass this to our map section
        iDF_M = x1.conStream(var1, DInd)

        # Converting Year_Mon to dates
        iDF_M['Year_Mon_Mod']= iDF_M.apply(lambda row: to_OptimizeString(row), axis=1)

        # Dropping old columns
        iDF_M.drop(columns=['Year_Mon'], axis=1, inplace=True)

        #Renaming new column to old column
        iDF_M.rename(columns={'Year_Mon_Mod':'Year_Mon'}, inplace=True)

        return iDF_M
    except Exception as e:
        x = str(e)
        print(x)

        iDF_M = p.DataFrame()

        return iDF_M
# Multiple components can update everytime interval gets fired.
@app.callback(Output('live-update-graph-1', 'figure'),
              Input('interval-component', 'n_intervals'))
def update_graph_confirmed(n):
    try:
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*' *60)
        DInd = 'Y'

        # Let's pass this to our map section
        retDF = fetchEvent(var1, DInd)

        # Create the graph with subplots
        #fig = plotly.tools.make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.3, horizontal_spacing=0.2)
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.3, horizontal_spacing=0.2)

        # Routing data to dedicated DataFrame
        retDFNC = retDF.loc[(retDF['Status'] == 'NewConfirmed')]

        # Adding different chart into one dashboard
        # First Use Case - New Confirmed
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Brazil,'type':'scatter','name':'Brazil'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Canada,'type':'scatter','name':'Canada'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Germany,'type':'scatter','name':'Germany'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.India,'type':'scatter','name':'India'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Indonesia,'type':'scatter','name':'Indonesia'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedKingdom,'type':'scatter','name':'United Kingdom'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedStates,'type':'scatter','name':'United States'},1,1)

        return fig
    except Exception as e:
        x = str(e)
        print(x)

        # Create the graph with subplots
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
        fig['layout']['margin'] = {
            'l': 30, 'r': 10, 'b': 30, 't': 10
        }
        fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}

        return fig

# Multiple components can update everytime interval gets fired.
@app.callback(Output('live-update-graph-2', 'figure'),
              Input('interval-component', 'n_intervals'))
def update_graph_deaths(n):
    try:
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*' *60)
        DInd = 'Y'

        # Let's pass this to our map section
        retDF = fetchEvent(var1, DInd)

        # Create the graph with subplots
        #fig = plotly.tools.make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.3, horizontal_spacing=0.2)
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.3, horizontal_spacing=0.2)

        # Routing data to dedicated DataFrame
        retDFND = retDF.loc[(retDF['Status'] == 'NewDeaths')]

        # Adding different chart into one dashboard
        # Second Use Case - New Deaths
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Brazil,'type':'bar','name':'Brazil'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Canada,'type':'bar','name':'Canada'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Germany,'type':'bar','name':'Germany'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.India,'type':'bar','name':'India'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Indonesia,'type':'bar','name':'Indonesia'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.UnitedKingdom,'type':'bar','name':'United Kingdom'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.UnitedStates,'type':'bar','name':'United States'},1,1)

        return fig
    except Exception as e:
        x = str(e)
        print(x)

        # Create the graph with subplots
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
        fig['layout']['margin'] = {
            'l': 30, 'r': 10, 'b': 30, 't': 10
        }
        fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}

        return fig

if __name__ == '__main__':
    app.run_server(debug=True)

Let us explore the critical snippet as this is a brand new script –

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)

app.layout = html.Div(
    html.Div([
        html.H1("Covid-19 Trend Dashboard",
                        className='text-center text-primary mb-4'),
        html.H5(children='''
            Dash: Covid-19 Trend - (Present Vs Future)
        '''),
        html.P("Covid-19: New Confirmed Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-1'),
        html.P("Covid-19: New Death Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-2'),
        dcc.Interval(
            id='interval-component',
            interval=5*1000, # in milliseconds
            n_intervals=0
        )
    ], className="row", style={'marginBottom': 10, 'marginTop': 10})
)

You need a basic understanding of HTML, as Dash builds its layout from HTML components. To learn more about the supported HTML components, visit the following link.

def to_OptimizeString(row):
    try:
        x_str = str(row['Year_Mon'])

        dt_format = '%Y%m%d'
        finStr = x_str + '01'

        strReportDate = datetime.datetime.strptime(finStr, dt_format)

        return strReportDate

    except Exception as e:
        x = str(e)
        print(x)

        dt_format = '%Y%m%d'
        var = '20990101'

        # The module is imported as "datetime", so the class must be fully qualified
        strReportDate = datetime.datetime.strptime(var, dt_format)

        return strReportDate

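The application converts each Year-Month string into a date by appending the first day of the month, which gives the charts a proper time axis. If a value cannot be parsed, it falls back to a far-future placeholder date.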
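As a minimal, standard-library sketch of this conversion (the helper name `year_mon_to_date` and the sample values are illustrative and not part of the original script), a `YYYYMM` value gets `01` appended and is parsed with `strptime`. Anything that cannot be parsed falls back to the same placeholder date the script uses.

```python
import datetime

# Placeholder returned when a value cannot be parsed
# (same fallback date as in the script above).
FALLBACK = datetime.datetime(2099, 1, 1)

def year_mon_to_date(year_mon):
    """Turn a 'YYYYMM' value into the first day of that month."""
    try:
        return datetime.datetime.strptime(str(year_mon) + '01', '%Y%m%d')
    except ValueError:
        return FALLBACK

print(year_mon_to_date('202109'))
print(year_mon_to_date('bad-value'))
```

Catching only `ValueError` keeps unrelated problems visible, whereas the original catches every exception.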

Also, we’ve implemented a dashboard that will refresh every five seconds, since the interval component is set to 5*1000 milliseconds.

def fetchEvent(var1, DInd):
    try:
        # Let's pass this to our map section
        iDF_M = x1.conStream(var1, DInd)

        # Converting Year_Mon to dates
        iDF_M['Year_Mon_Mod']= iDF_M.apply(lambda row: to_OptimizeString(row), axis=1)

        # Dropping old columns
        iDF_M.drop(columns=['Year_Mon'], axis=1, inplace=True)

        #Renaming new column to old column
        iDF_M.rename(columns={'Year_Mon_Mod':'Year_Mon'}, inplace=True)

        return iDF_M

    except Exception as e:
        x = str(e)
        print(x)

        iDF_M = p.DataFrame()

        return iDF_M

The application will consume all the events from the Ably Queue using the above snippet.

@app.callback(Output('live-update-graph-1', 'figure'),
              Input('interval-component', 'n_intervals'))
def update_graph_live(n):

We’ve implemented the callback mechanism so that every time the interval fires, the application fetches the latest data from the queue, rebuilds the graph & returns the updated figure to Dash, which renders it in the matching graph component.

# Routing data to dedicated DataFrame
retDFNC = retDF.loc[(retDF['Status'] == 'NewConfirmed')]

Based on the Status flag, we route the matching rows into a dedicated dataframe, from which the application feeds the charts.

fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Brazil,'type':'scatter','name':'Brazil'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Canada,'type':'scatter','name':'Canada'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Germany,'type':'scatter','name':'Germany'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.India,'type':'scatter','name':'India'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Indonesia,'type':'scatter','name':'Indonesia'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedKingdom,'type':'scatter','name':'United Kingdom'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedStates,'type':'scatter','name':'United States'},1,1)

Each country’s KPI values are fetched & mapped to the corresponding axes to plot the graph.

The same approach applies to the other graph as well.
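Since both graphs repeat the same call once per country, the traces could also be generated in a loop. The following is only a sketch in plain Python: the `COUNTRIES` mapping, the `build_traces` helper, and the sample numbers are assumptions for illustration, and the dashboard itself would pass each resulting dictionary to `fig.append_trace`.

```python
# Column name in the dataframe -> label shown in the legend.
COUNTRIES = {
    'Brazil': 'Brazil',
    'Canada': 'Canada',
    'Germany': 'Germany',
    'India': 'India',
    'Indonesia': 'Indonesia',
    'UnitedKingdom': 'United Kingdom',
    'UnitedStates': 'United States',
}

def build_traces(x_values, columns, chart_type):
    """Return one Plotly-style trace dict per country found in columns.

    columns maps a column name to its list of y-values.
    """
    return [
        {'x': x_values, 'y': columns[col], 'type': chart_type, 'name': label}
        for col, label in COUNTRIES.items()
        if col in columns
    ]

# Tiny made-up sample; in the dashboard these values would come
# from retDFNC (scatter) or retDFND (bar).
x = ['2021-08-01', '2021-09-01']
data = {'Brazil': [10, 12], 'India': [20, 18], 'UnitedStates': [30, 33]}

traces = build_traces(x, data, 'scatter')
for t in traces:
    print(t['name'], t['y'])
    # In the dashboard: fig.append_trace(t, 1, 1)
```

Switching the second graph to bars then only needs `'bar'` as the chart type.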


Run:

Let us run the application –

Run – Beginning
Run – Finishing Stage

Dashboard:

Dashboard Job Run
Dashboard Visualization

So, we’ve done it.

You will find the complete codebase at the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀


Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only.

One more thing you need to understand is that this prediction is based on limited data points. The actual events may unfold differently. Ideally, countries take a cue from this kind of analysis & initiate appropriate measures to avoid the high curve. And, that is one of the main objectives of time series analysis.

There is always room for improvement in this kind of model & the solution associated with it. I’ve shown the basic ways to achieve the same for educational purposes only.

Creating a dynamic response of an API/Microservice

Hello Guys!

Today, I’m going to discuss a potential use case where, on many occasions, different teams need almost similar, but not identical, kinds of data through an API. Creating a fresh API/Microservice for each of them, after following up through many processes, takes significant time.

What if we can create an API in such a way that we get the response dynamically without needing to build another one? In this post, we’ll be demonstrating such an approach.

I’ll be using the open-source Covid-API, which will be useful for several posts starting from this one.

You will get plenty of useful data from here.

We’ve chosen the following one for our use case –

API-Reference

Let’s explore the sample data first.

[
   {
      "date":20210207,
      "state":"AK",
      "positive":53279.0,
      "probableCases":null,
      "negative":null,
      "pending":null,
      "totalTestResultsSource":"totalTestsViral",
      "totalTestResults":1536911.0,
      "hospitalizedCurrently":44.0,
      "hospitalizedCumulative":1219.0,
      "inIcuCurrently":null,
      "inIcuCumulative":null,
      "onVentilatorCurrently":11.0,
      "onVentilatorCumulative":null,
      "recovered":null,
      "dataQualityGrade":"A",
      "lastUpdateEt":"2\/5\/2021 03:59",
      "dateModified":"2021-02-05T03:59:00Z",
      "checkTimeEt":"02\/04 22:59",
      "death":279.0,
      "hospitalized":1219.0,
      "dateChecked":"2021-02-05T03:59:00Z",
      "totalTestsViral":1536911.0,
      "positiveTestsViral":64404.0,
      "negativeTestsViral":1470760.0,
      "positiveCasesViral":null,
      "deathConfirmed":null,
      "deathProbable":null,
      "totalTestEncountersViral":null,
      "totalTestsPeopleViral":null,
      "totalTestsAntibody":null,
      "positiveTestsAntibody":null,
      "negativeTestsAntibody":null,
      "totalTestsPeopleAntibody":null,
      "positiveTestsPeopleAntibody":null,
      "negativeTestsPeopleAntibody":null,
      "totalTestsPeopleAntigen":null,
      "positiveTestsPeopleAntigen":null,
      "totalTestsAntigen":null,
      "positiveTestsAntigen":null,
      "fips":"02",
      "positiveIncrease":0,
      "negativeIncrease":0,
      "total":53279,
      "totalTestResultsIncrease":0,
      "posNeg":53279,
      "deathIncrease":0,
      "hospitalizedIncrease":0,
      "hash":"07a5d43f958541e9cdabb5ea34c8fb481835e130",
      "commercialScore":0,
      "negativeRegularScore":0,
      "negativeScore":0,
      "positiveScore":0,
      "score":0,
      "grade":""
   }
]

Let’s take two cases: one service might need access to all the elements, while another requires only specific details.

Let’s explore the code base first –

  1. __init__.py ( This native Python-based Azure function will consume the source data & return the dynamic API response. )
###########################################
#### Written By: SATYAKI DE            ####
#### Written On: 06-Feb-2021           ####
#### Package Flask package needs to    ####
#### install in order to run this      ####
#### script.                           ####
####                                   ####
#### Objective: Main Calling scripts.  ####
####                                   ####
#### However, to meet the functionality####
#### we've enhanced as per our logic.  ####
###########################################

import logging
import json
import requests
import os
import pandas as p
import numpy as np

import azure.functions as func


def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Dynamic-Covid-Status HTTP trigger function processed a request.')

    try:

        # Application Variable
        url = os.environ['URL']
        appType = os.environ['appType']
        conType = os.environ['conType']

        # API-Configuration
        payload={}
        headers = {
            "Connection": conType,
            "Content-Type": appType
        }

        # Validating input parameters
        typeSel = req.params.get('typeSel')
        if not typeSel:
            try:
                req_body = req.get_json()
            except ValueError:
                pass
            else:
                typeSel = req_body.get('typeSel')
        
        typeVal = req.params.get('typeVal')
        if not typeVal:
            try:
                req_body = req.get_json()
            except ValueError:
                pass
            else:
                typeVal = req_body.get('typeVal')

        # Printing Key-Element Values
        str1 = 'typeSel: ' + str(typeSel)
        logging.info(str1)

        str2 = 'typeVal: ' + str(typeVal)
        logging.info(str2)

        # End of API-Inputs

        # Getting Covid data from the REST-API
        response = requests.request("GET", url, headers=headers, data=payload)
        ResJson  = response.text

        if typeSel == '*':
            if typeVal != '':
                # Converting it to Json
                jdata = json.loads(ResJson)

                # p.json_normalize is the public API since pandas 1.0;
                # p.io.json.json_normalize is deprecated
                df_ret = p.json_normalize(jdata)
                df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

                rJson = df_ret.to_json(orient ='records') 

                return func.HttpResponse(rJson, status_code=200)
            else:
                x_stat = 'Failed'
                x_msg = 'Important information is missing for all values!'

                rJson = {
                    "status": x_stat,
                    "details": x_msg
                }

                xval = json.dumps(rJson)
                return func.HttpResponse(xval, status_code=200)
        elif typeSel == 'Cols':
            if typeVal != '':
                # Converting it to Json
                jdata = json.loads(ResJson)

                # p.json_normalize is the public API since pandas 1.0;
                # p.io.json.json_normalize is deprecated
                df_ret = p.json_normalize(jdata)
                df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

                # Fetching for the selected columns
                # Extracting the columns from the list
                lstHead = []

                listX = typeVal.split (",")

                for i in listX:
                    lstHead.append(str(i).strip())

                str3 = 'Main List: ' + str(lstHead)
                logging.info(str3)

                slice_df = df_ret[np.intersect1d(df_ret.columns, lstHead)]
                rJson = slice_df.to_json(orient ='records') 
                
                return func.HttpResponse(rJson, status_code=200)
            else:
                x_stat = 'Failed'
                x_msg = 'Important information is missing for selected values!'

                rJson = {
                    "status": x_stat,
                    "details": x_msg
                }

                xval = json.dumps(rJson)
                return func.HttpResponse(xval, status_code=200)
        else:
            x_stat = 'Failed'
            x_msg = 'Important information is missing for typeSel!'

            rJson = {
                "status": x_stat,
                "details": x_msg
            }

            xval = json.dumps(rJson)
            return func.HttpResponse(xval, status_code=200)
    except Exception as e:
        x_msg = str(e)
        x_stat = 'Failed'

        rJson = {
                    "status": x_stat,
                    "details": x_msg
                }

        xval = json.dumps(rJson)
        return func.HttpResponse(xval, status_code=200)

And, inside the Azure portal, it looks like –

Dynamic Function inside the Azure portal

Let’s explain the key snippet –

jdata = json.loads(ResJson)

df_ret = p.json_normalize(jdata)
df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

rJson = df_ret.to_json(orient ='records') 

return func.HttpResponse(rJson, status_code=200)

In the above lines, we parse the API response, flatten it into a pandas dataframe, keep only the last segment of any dotted column name & then serialize the dataframe back to JSON as a list of records.
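To make the column renaming concrete: `json_normalize` names nested fields with dotted paths (for example `main.temp`), and the lambda keeps only the part after the last dot. Here is a rough standard-library imitation of that idea. The `flatten` and `short_names` helpers and the sample payload are made up for illustration, and `json_normalize` itself handles more cases than this sketch.

```python
import json

def flatten(record, parent=''):
    """Flatten nested dicts into one level, joining keys with '.'."""
    flat = {}
    for key, value in record.items():
        name = parent + '.' + key if parent else key
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat

def short_names(flat):
    """Keep only the last segment of each dotted key, as the lambda does."""
    return {key.split('.')[-1]: value for key, value in flat.items()}

# Made-up nested payload, only to show the effect of the renaming.
raw = '{"name": "Austin", "main": {"temp": 290.5, "humidity": 60}}'
row = short_names(flatten(json.loads(raw)))
print(json.dumps([row]))
```

One caveat of shortening names this way is that two nested fields ending in the same segment collide. In this dictionary version the later one silently wins, while a dataframe would end up with duplicate column names.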

# Fetching for the selected columns
# Extracting the columns from the list
lstHead = []

listX = typeVal.split (",")

for i in listX:
    lstHead.append(str(i).strip())

str3 = 'Main List: ' + str(lstHead)
logging.info(str3)

#slice_df = df_ret[df_ret.columns.intersection(lstHead)]
slice_df = df_ret[np.intersect1d(df_ret.columns, lstHead)]

For the second case, the above additional logic plays a significant part. The function splits the comma-separated column names supplied in the typeVal attribute & returns only those columns that actually exist in the response.
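The selection logic can be tried without Azure or pandas. Below is a simplified sketch on plain dictionaries: the `select_columns` helper is hypothetical, the sample record reuses a few fields from the payload shown earlier, and the failure message is shortened. Like `np.intersect1d`, it ignores requested names that are not present and returns the kept columns in sorted order.

```python
def select_columns(records, type_sel, type_val):
    """Return all fields for '*', or only the requested ones for 'Cols'."""
    if type_sel == '*':
        return records
    if type_sel == 'Cols' and type_val:
        wanted = [name.strip() for name in type_val.split(',')]
        return [
            {key: rec[key] for key in sorted(rec) if key in wanted}
            for rec in records
        ]
    # Anything else mirrors the "Failed" response of the function.
    return {'status': 'Failed', 'details': 'Important information is missing!'}

sample = [{'date': 20210207, 'state': 'AK', 'positive': 53279.0, 'death': 279.0}]

print(select_columns(sample, '*', ''))
print(select_columns(sample, 'Cols', 'state, death, notAColumn'))
```

Unknown column names are dropped silently here, which keeps the API forgiving. Whether a caller should instead be told about a misspelled column is a design choice left open.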

Let’s see how it looks –

Azure function in Visual Studio Code

Let’s test it using Postman –

Case 1 (For all the columns):

For all elements

And, the formatted output is as follows –

Formatted output for all elements

Case 2 (For selected columns):

For selected elements

And, the formatted output is as follows –

Formatted output of Selected element case

You can find the code on GitHub using the following link.


So, finally, we have done it.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only.

Predicting health issues for Senior Citizens based on “Realtime Weather Data” in Python

Hi Guys,

Today, I’ll be presenting a different kind of post here. I’ll be trying to predict health issues for senior citizens based on “realtime weather data” by blending open-source population data using some mock risk factor calculation. At the end of the post, I’ll be plotting these numbers into some graphs for better understanding.

Let’s drive!

First, we need realtime weather data. To get that, we need to subscribe to the OpenWeather API. You have to register as a developer & you’ll receive an email similar to this one once they have approved your request –

1. Subscription To Open Weather

So, from the above picture, you can see that you’ll be provided with one API key & links to some useful API documentation. I would recommend exploring all the links before you try to use it.

You can also view your API key once you have logged into their console. You can also create multiple API keys & the screen should look something like this –

2. Viewing Keys

For security reasons, I’ll be hiding my own keys & you should do the same.

Many of these free APIs might have some issues. So, I would recommend that you start testing the open API through Postman before you jump into the Python development. Here is a glimpse of my test through Postman –

3. Testing API

Once I can see that the API is returning the result, I can work on it.

Apart from that, one needs to understand that these APIs might have limited usage & you need to know the consequences in terms of price & tier in case you exceed the limit. Here is the detail for this API –

5. Package Details - API

For our demo, I’ll be using the free tier only.

Let’s look into our other source data. We got the top 10 cities by population from the internet. Also, we have collected a sample senior citizen percentage against sex ratio across those cities. We have masked these values on top of that, as this is just for educational purposes.

1. CityDetails.csv

Here is the glimpse of this file –

4. Source File

So, this file only contains the total population across the top 10 cities in the USA.

2. SeniorCitizen.csv

6. SeniorCitizen Data

This file contains the Sex ratio of Senior citizens across those top 10 cities by population.

Again, we are not going to discuss any script, which we’ve already discussed here.

Hence, we’re skipping clsL.py here.

1. clsConfig.py (This script contains all the parameters of the server.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 19-Jan-2019              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys for      ####
#### the OpenWeather API & the source     ####
#### file paths used by the application.  ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'URL': "http://api.openweathermap.org/data/2.5/weather",
        'API_HOST': "api.openweathermap.org",
        'API_KEY': "XXXXXXXXXXXXXXXXXXXXXX",
        'API_TYPE': "application/json",
        'CACHE': "no-cache",
        'CON': "keep-alive",
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'SRC_PATH': Curr_Path + sep + 'Src_File' + sep,
        'APP_DESC_1': 'Open Weather Forecast',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SRC_FILE': Curr_Path + sep + 'Src_File' + sep + 'CityDetails.csv',
        'SRC_FILE_1': Curr_Path + sep + 'Src_File' + sep + 'SeniorCitizen.csv',
        'SRC_FILE_INIT': 'CityDetails.csv',
        'COL_LIST': ['base', 'all', 'cod', 'lat', 'lon', 'dt', 'feels_like', 'humidity', 'pressure', 'temp', 'temp_max', 'temp_min', 'name', 'country', 'sunrise', 'sunset', 'type', 'timezone', 'visibility', 'weather', 'deg', 'gust', 'speed'],
        'COL_LIST_1': ['base', 'all', 'cod', 'lat', 'lon', 'dt', 'feels_like', 'humidity', 'pressure', 'temp', 'temp_max', 'temp_min', 'CityName', 'country', 'sunrise', 'sunset', 'type', 'timezone', 'visibility', 'deg', 'gust', 'speed', 'WeatherMain', 'WeatherDescription'],
        'COL_LIST_2': ['CityName', 'Population', 'State']
    }

2. clsWeather.py (This script contains the main logic to extract the realtime data from our subscribed weather API.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 19-Jan-2020              ####
#### Modified On 19-Jan-2020              ####
####                                      ####
#### Objective: Main scripts to invoke    ####
#### OpenWeather API.                     ####
##############################################

import requests
import logging
import json
from clsConfig import clsConfig as cf

class clsWeather:
    def __init__(self):
        self.url = cf.config['URL']
        self.openmapapi_host = cf.config['API_HOST']
        self.openmapapi_key = cf.config['API_KEY']
        self.openmapapi_cache = cf.config['CACHE']
        self.openmapapi_con = cf.config['CON']
        self.type = cf.config['API_TYPE']

    def searchQry(self, rawQry):
        try:
            url = self.url
            openmapapi_host = self.openmapapi_host
            openmapapi_key = self.openmapapi_key
            openmapapi_cache = self.openmapapi_cache
            openmapapi_con = self.openmapapi_con
            type = self.type

            querystring = {"appid": openmapapi_key, "q": rawQry}

            print('Input JSON: ', str(querystring))

            headers = {
                'host': openmapapi_host,
                'content-type': type,
                'Cache-Control': openmapapi_cache,
                'Connection': openmapapi_con
            }

            response = requests.request("GET", url, headers=headers, params=querystring)

            ResJson  = response.text

            jdata = json.dumps(ResJson)
            ResJson = json.loads(jdata)

            return ResJson

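        except Exception as e:
            x = str(e)
            print(x)

            logging.info(x)

            # Return the error as a JSON string so that the caller's
            # json.loads() can parse it like a normal response
            ResJson = json.dumps({'errorDetails': x})

            return ResJson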

The key lines from this script –

querystring = {"appid": openmapapi_key, "q": rawQry}

print('Input JSON: ', str(querystring))

headers = {
    'host': openmapapi_host,
    'content-type': type,
    'Cache-Control': openmapapi_cache,
    'Connection': openmapapi_con
}

response = requests.request("GET", url, headers=headers, params=querystring)

ResJson  = response.text

In the above snippet, our application first prepares the query parameters & headers from the values received from our configuration script. It then invokes the GET method to extract the real-time data in the form of JSON & finally sends the JSON payload back to the primary calling function.
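To see what actually goes over the wire, here is a small offline sketch that composes the same request URL with the standard library instead of calling the API. The `build_weather_url` helper and the `DUMMY_KEY` value are placeholders for illustration. In the script, `requests` builds the query string itself from the `params` argument.

```python
from urllib.parse import urlencode

# Same endpoint as the URL entry in clsConfig.py.
BASE_URL = 'http://api.openweathermap.org/data/2.5/weather'

def build_weather_url(city, api_key):
    """Compose the GET URL from the same two query parameters."""
    query = {'appid': api_key, 'q': city}
    return BASE_URL + '?' + urlencode(query)

# 'DUMMY_KEY' is a placeholder, not a real API key.
print(build_weather_url('New York', 'DUMMY_KEY'))
```

Printing the URL like this is a handy way to reproduce a call in Postman, but keep the real key out of any logs you share.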

3. clsMap.py (This script contains the main logic to prepare the plot using the seaborn package & to plot our custom-made risk factor by blending the realtime data with the statistical data collected from the internet.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 19-Jan-2020              ####
#### Modified On 19-Jan-2020              ####
####                                      ####
#### Objective: Main scripts to invoke    ####
#### plot into the Map.                   ####
##############################################

import seaborn as sns
import logging
from clsConfig import clsConfig as cf
import pandas as p
import clsL as cl

# This library requires later
# to print the chart
import matplotlib.pyplot as plt

class clsMap:
    def __init__(self):
        self.src_file =  cf.config['SRC_FILE_1']

    def calculateRisk(self, row):
        try:
            # Let's assume some logic
            # 1. By default, 30% of Senior Citizen
            # prone to health Issue for each City
            # 2. Male Senior Citizen is 19% more prone
            # to illness than female.
            # 3. If humidity more than 70% or less
            # than 40% are 22% main cause of illness
            # 4. If feels like more than 280 or
            # less than 260 degree are 17% more prone
            # to illness.
            # Finally, this will be calculated per 1K
            # people around 10 blocks

            str_sex = str(row['Sex'])

            int_humidity = int(row['humidity'])
            int_feelsLike = int(row['feels_like'])
            int_population = int(str(row['Population']).replace(',',''))
            float_srcitizen = float(row['SeniorCitizen'])

            confidance_score = 0.0

            SeniorCitizenPopulation = (int_population * float_srcitizen)

            if str_sex == 'Male':
                confidance_score = (SeniorCitizenPopulation * 0.30 * 0.19) + confidance_score
            else:
                confidance_score = (SeniorCitizenPopulation * 0.30 * 0.11) + confidance_score

            if ((int_humidity > 70) | (int_humidity < 40)):
                confidance_score = confidance_score + (int_population * 0.30 * float_srcitizen) * 0.22

            if ((int_feelsLike > 280) | (int_feelsLike < 260)):
                confidance_score = confidance_score + (int_population * 0.30 * float_srcitizen) * 0.17

            final_score = round(round(confidance_score, 2) / (1000 * 10), 2)

            return final_score

        except Exception as e:
            x = str(e)

            return x

    def setMap(self, dfInput):
        try:
            resVal = 0
            df = p.DataFrame()
            debug_ind = 'Y'
            src_file =  self.src_file

            # Initiating Log Class
            l = cl.clsL()

            df = dfInput

            # Creating a subset of desired columns
            dfMod = df[['CityName', 'temp', 'Population', 'humidity', 'feels_like']]

            l.logr('5.dfSuppliment.csv', debug_ind, dfMod, 'log')

            # Fetching Senior Citizen Data
            df = p.read_csv(src_file, index_col=False)

            # Merging two frames
            dfMerge = p.merge(df, dfMod, on=['CityName'])

            l.logr('6.dfMerge.csv', debug_ind, dfMerge, 'log')

            # Getting RiskFactor quotient from our custom made logic
            dfMerge['RiskFactor'] = dfMerge.apply(lambda row: self.calculateRisk(row), axis=1)

            l.logr('7.dfRiskFactor.csv', debug_ind, dfMerge, 'log')

            # Generating Map plots
            # sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, hue='Sex')
            # sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, hue='Sex', markers=['o','v'], scatter_kws={'s':25})
            sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, col='Sex')

            # This is required when you are running
            # through normal Python & not through
            # Jupyter Notebook
            plt.show()

            return resVal

        except Exception as e:
            x = str(e)
            print(x)

            logging.info(x)
            resVal = x

            return resVal

Key lines from the above codebase –

# Creating a subset of desired columns
dfMod = df[['CityName', 'temp', 'Population', 'humidity', 'feels_like']]

l.logr('5.dfSuppliment.csv', debug_ind, dfMod, 'log')

# Fetching Senior Citizen Data
df = p.read_csv(src_file, index_col=False)

# Merging two frames
dfMerge = p.merge(df, dfMod, on=['CityName'])

l.logr('6.dfMerge.csv', debug_ind, dfMerge, 'log')

# Getting RiskFactor quotient from our custom made logic
dfMerge['RiskFactor'] = dfMerge.apply(lambda row: self.calculateRisk(row), axis=1)

l.logr('7.dfRiskFactor.csv', debug_ind, dfMerge, 'log')

Here, we combine our senior citizen data with the already processed data coming from our primary calling script. The application then applies our custom logic to calculate the risk factor figures. If you want to go through that, I’ve provided the logic to derive it. However, this is just a demo to find out similar figures. You should not rely on the logic that I’ve used (It is kind of my observation of life till now. :D).

The below line is only required when you are running seaborn through plain Python rather than a Jupyter notebook.
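For readers who want to play with the mock risk logic outside the dataframe, here is a standalone sketch of the same arithmetic. The function name `risk_factor`, its argument names, and the sample inputs are made up for illustration. The thresholds and weights are the ones from the comments in `calculateRisk`, and the feels-like values are in Kelvin, which is what OpenWeather returns when no units are requested.

```python
def risk_factor(population, senior_ratio, sex, humidity, feels_like):
    """Mock risk score per 1K people around 10 blocks (demo logic only)."""
    seniors = population * senior_ratio
    base = seniors * 0.30                      # 30% of seniors prone by default

    # Weight by sex: 0.19 for male, 0.11 otherwise
    score = base * (0.19 if sex == 'Male' else 0.11)

    if humidity > 70 or humidity < 40:         # humidity outside 40-70%
        score += base * 0.22
    if feels_like > 280 or feels_like < 260:   # feels-like outside 260-280 K
        score += base * 0.17

    return round(round(score, 2) / (1000 * 10), 2)

# Made-up inputs: 1,000,000 people, 10% seniors, humid day, mild temperature.
print(risk_factor(1_000_000, 0.10, 'Male', 75, 270))
print(risk_factor(1_000_000, 0.10, 'Female', 50, 270))
```

Keeping the calculation in a plain function like this makes it easy to test individual cities before applying it row by row.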

plt.show()

4. callOpenMapWeatherAPI.py (This is the first calling script. This script also calls the realtime API, blends the first file with it & passes only the relevant columns of data to our Map script to produce the graph.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 19-Jan-2020              ####
#### Modified On 19-Jan-2020              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from clsConfig import clsConfig as cf
import pandas as p
import clsL as cl
import logging
import datetime
import json
import clsWeather as ct
import re
import numpy as np
import clsMap as cm

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions to extract details
# from the nested 'weather' column

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def getMainWeather(row):
    try:
        # Parsing the nested 'weather' payload to fetch the main weather

        lkp_Columns = str(row['weather'])
        jpayload = str(lkp_Columns).replace("'", '"')

        #jpayload = json.dumps(lkp_Columns)
        payload = json.loads(jpayload)

        df_lkp = p.json_normalize(payload)
        df_lkp.columns = df_lkp.columns.map(lambda x: x.split(".")[-1])

        str_main_weather = str(df_lkp.iloc[0]['main'])

        return str_main_weather

    except Exception as e:
        x = str(e)
        str_main_weather = x

        return str_main_weather

def getMainDescription(row):
    try:
        # Parsing the nested 'weather' payload to fetch the description

        lkp_Columns = str(row['weather'])
        jpayload = str(lkp_Columns).replace("'", '"')

        #jpayload = json.dumps(lkp_Columns)
        payload = json.loads(jpayload)

        df_lkp = p.json_normalize(payload)
        df_lkp.columns = df_lkp.columns.map(lambda x: x.split(".")[-1])

        str_description = str(df_lkp.iloc[0]['description'])

        return str_description

    except Exception as e:
        x = str(e)
        str_description = x

        return str_description

def main():
    try:
        dfSrc = p.DataFrame()
        df_ret = p.DataFrame()
        ret_2 = ''
        debug_ind = 'Y'

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidatedIR.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']
        col_list = cf.config['COL_LIST']
        col_list_1 = cf.config['COL_LIST_1']
        col_list_2 = cf.config['COL_LIST_2']

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        df2 = p.DataFrame()

        src_file =  cf.config['SRC_FILE']

        # Fetching data from source file
        df = p.read_csv(src_file, index_col=False)

        # Creating a list of City Name from the source file
        city_list = df['CityName'].tolist()

        # Declaring an empty dictionary
        merge_dict = {}
        merge_dict['city'] = df2

        start_pos = 1
        src_file_name = '1.' + cf.config['SRC_FILE_INIT']

        for i in city_list:
            x1 = ct.clsWeather()
            ret_2 = x1.searchQry(i)

            # Capturing the JSON Payload
            res = json.loads(ret_2)

            # Converting dictionary to Pandas Dataframe
            # df_ret = p.read_json(ret_2, orient='records')

            df_ret = p.json_normalize(res)
            df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

            # Removing any duplicate columns
            df_ret = df_ret.loc[:, ~df_ret.columns.duplicated()]

            # l.logr(str(start_pos) + '.1.' + src_file_name, debug_ind, df_ret, 'log')

            # The API response may not always
            # include the gust column. Hence, we
            # need to add a dummy gust column
            # to keep the structure consistent

            if 'gust' not in df_ret.columns:
                df_ret = df_ret.assign(gust=999999)[['gust'] + df_ret.columns.tolist()]

            # Resetting the column orders as per JSON
            column_order = col_list
            df_mod_ret = df_ret.reindex(column_order, axis=1)

            if start_pos == 1:
                merge_dict['city'] = df_mod_ret
            else:
                d_frames = [merge_dict['city'], df_mod_ret]
                merge_dict['city'] = p.concat(d_frames)

            start_pos += 1

        for k, v in merge_dict.items():
            l.logr(src_file_name, debug_ind, merge_dict[k], 'log')

        # Now opening the temporary file
        temp_log_file = log_dir + src_file_name

        dfNew = p.read_csv(temp_log_file, index_col=False)

        # Extracting Complex columns
        dfNew['WeatherMain'] = dfNew.apply(lambda row: getMainWeather(row), axis=1)
        dfNew['WeatherDescription'] = dfNew.apply(lambda row: getMainDescription(row), axis=1)

        l.logr('2.dfNew.csv', debug_ind, dfNew, 'log')

        # Removing unwanted columns & Renaming key columns
        dfNew.drop(['weather'], axis=1, inplace=True)
        dfNew.rename(columns={'name': 'CityName'}, inplace=True)

        l.logr('3.dfNewMod.csv', debug_ind, dfNew, 'log')

        # Now joining with the main csv
        # to get the complete picture
        dfMain = p.merge(df, dfNew, on=['CityName'])

        l.logr('4.dfMain.csv', debug_ind, dfMain, 'log')

        # Let's extract only relevant columns
        dfSuppliment = dfMain[['CityName', 'Population', 'State', 'country', 'feels_like', 'humidity', 'pressure', 'temp', 'temp_max', 'temp_min', 'visibility', 'deg', 'gust', 'speed', 'WeatherMain', 'WeatherDescription']]

        l.logr('5.dfSuppliment.csv', debug_ind, dfSuppliment, 'log')

        # Let's pass this to our map section
        x2 = cm.clsMap()
        ret_3 = x2.setMap(dfSuppliment)

        if ret_3 == 0:
            print('Successful Map Generated!')
        else:
            print('Please check the log for further issue!')

        print("-" * 60)
        print()

        print('Finding Story points..')
        print("*" * 157)
        logging.info('Finding Story points..')
        logging.info(tmpR0)


        tmpR10 = 'End Time: ' + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError as e:
        print(str(e))
        print("No relevant data to proceed!")
        logging.info("No relevant data to proceed!")

    except Exception as e:
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

Key snippet from the above script –

# Capturing the JSON Payload
res = json.loads(ret_2)

# Converting dictionary to Pandas Dataframe
df_ret = p.json_normalize(res)
df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

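Once the application receives the JSON response from the real-time API, it converts the response into a pandas dataframe. The nested keys are flattened into dotted column names, & the second line keeps only the last part of each name.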
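As a rough illustration of this step, here is a minimal sketch. The payload below is hypothetical & only shaped like a single-city weather response; the function name flatten_response is also an assumption. It uses the top-level pandas.json_normalize with the alias pd instead of p.

```python
import json
import pandas as pd

# Hypothetical payload, only shaped like a single-city weather response
raw = (
    '{"name": "Kolkata", '
    '"main": {"temp": 301.2, "humidity": 74}, '
    '"wind": {"speed": 3.1, "deg": 200}}'
)

def flatten_response(raw_json):
    """Parse a JSON string & flatten nested keys into short column names."""
    res = json.loads(raw_json)
    df = pd.json_normalize(res)
    # 'main.temp' becomes 'temp', 'wind.speed' becomes 'speed', & so on
    df.columns = df.columns.map(lambda c: c.split(".")[-1])
    return df

df_ret = flatten_response(raw)
print(sorted(df_ret.columns))
```

Each response becomes a single row, with one column per leaf attribute of the JSON document.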

# Removing any duplicate columns
df_ret = df_ret.loc[:, ~df_ret.columns.duplicated()]

Since this is a complex JSON response, the application might encounter duplicate columns (for example, when two nested attributes share the same name once their prefixes are stripped), which might cause a problem later. Hence, our app removes these duplicate columns, as they are not required for our case.
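The effect of that line can be seen in a small sketch. The frame below is made up for illustration; it imitates a response where a top-level id & a nested id end up with the same column name.

```python
import pandas as pd

# Hypothetical frame with a repeated column name, as can happen
# after the prefixes are stripped from the flattened JSON keys
df = pd.DataFrame([[1, 99, "IN"]], columns=["id", "id", "country"])

def drop_duplicate_columns(frame):
    """Keep only the first occurrence of every column name."""
    return frame.loc[:, ~frame.columns.duplicated()]

df_unique = drop_duplicate_columns(df)
print(df_unique.columns.tolist())
```

Note that columns.duplicated() marks the second & later occurrences, so the first column with a given name is the one that survives.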

if 'gust' not in df_ret.columns:
    df_ret = df_ret.assign(gust=999999)[['gust'] + df_ret.columns.tolist()]

There is a possibility that the application might not receive all the desired attributes from the real-time API. Hence, the above lines check for the gust column & add a dummy column with that name (filled with 999999) when it is not present in the JSON response.
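The same check can be wrapped in a small helper. This is only a sketch: the helper name ensure_gust & the sample frames are assumptions, while the placeholder value is the one used in the script.

```python
import pandas as pd

GUST_PLACEHOLDER = 999999  # same placeholder value the script uses

def ensure_gust(df):
    """Return df with a 'gust' column, adding the placeholder if missing."""
    if "gust" not in df.columns:
        # insert() changes the frame in place, so work on a copy
        df = df.copy()
        df.insert(0, "gust", GUST_PLACEHOLDER)
    return df

# Hypothetical responses: one with gust, one without
with_gust = pd.DataFrame([{"speed": 3.1, "deg": 200, "gust": 5.4}])
without_gust = pd.DataFrame([{"speed": 2.0, "deg": 90}])

print(ensure_gust(with_gust).columns.tolist())
print(ensure_gust(without_gust).columns.tolist())
```

A placeholder like 999999 keeps the column numeric, but any later aggregation on gust needs to filter it out first.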

if start_pos == 1:
    merge_dict['city'] = df_mod_ret
else:
    d_frames = [merge_dict['city'], df_mod_ret]
    merge_dict['city'] = p.concat(d_frames)

These few lines are required, as our API has a limitation of responding with only one city at a time. Hence, in this case, we’re retrieving one city at a time & finally merging them into a single dataframe before creating a temporary source file for the next step.
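An alternative way to express the same merge, shown here only as a sketch, is to collect the per-city frames in a list & concatenate them once at the end. The helper name combine_city_frames, the sample data, & the use of ignore_index are assumptions & not part of the original script.

```python
import pandas as pd

def combine_city_frames(frames):
    """Stack one-row-per-city frames into a single frame with a clean index."""
    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)

# Hypothetical per-city results, one frame per API call
frames = [
    pd.DataFrame([{"name": "Kolkata", "temp": 301.2}]),
    pd.DataFrame([{"name": "Delhi", "temp": 305.8}]),
]

df_all = combine_city_frames(frames)
print(df_all)
```

With this shape, no position counter is needed to tell the first city apart from the rest.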

At this moment our data should look like this –

16. Intermediate_Data_1

Let’s check the weather column. We need to extract the main & description for our dashboard, which will be coming in the next installment.

# Extracting Complex columns
dfNew['WeatherMain'] = dfNew.apply(lambda row: getMainWeather(row), axis=1)
dfNew['WeatherDescription'] = dfNew.apply(lambda row: getMainDescription(row), axis=1)

Hence, we’ve used the following two functions to extract these values & the critical snippet from one of the functions is as follows –

lkp_Columns = str(row['weather'])
jpayload = str(lkp_Columns).replace("'", '"')
payload = json.loads(jpayload)

df_lkp = p.json_normalize(payload)
df_lkp.columns = df_lkp.columns.map(lambda x: x.split(".")[-1])

str_main_weather = str(df_lkp.iloc[0]['main'])

The above lines extract the weather column & replace the single quotes with double quotes before the application tries to parse it as JSON. Once it is parsed, json_normalize flattens it & creates individual columns out of it. Once you have them captured inside the pandas dataframe, you can extract the required value from the first row & return it to your primary calling function.
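One caveat with the quote replacement is that it breaks as soon as a value itself contains an apostrophe. A possible alternative, sketched below under the assumption that the weather cell holds the Python-style text of a list of dictionaries, is to parse it with ast.literal_eval from the standard library. The helper name get_weather_field & the sample cell are hypothetical. Unlike the original functions, this sketch returns a default value on failure instead of the error text.

```python
import ast

def get_weather_field(cell, field, default=""):
    """Return `field` from the first entry of a stringified weather list."""
    try:
        entries = ast.literal_eval(str(cell))
    except (ValueError, SyntaxError):
        # The cell is not a valid Python literal (e.g. empty or 'nan')
        return default
    if isinstance(entries, list) and entries and isinstance(entries[0], dict):
        return str(entries[0].get(field, default))
    return default

# Hypothetical cell value, as it would look after a round trip through CSV
cell = "[{'id': 800, 'main': 'Clear', 'description': 'clear sky', 'icon': '01d'}]"

print(get_weather_field(cell, "main"))
print(get_weather_field(cell, "description"))
```

It could be plugged into the same apply call, for example dfNew.apply(lambda row: get_weather_field(row['weather'], 'main'), axis=1).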

# Let's pass this to our map section
x2 = cm.clsMap()
ret_3 = x2.setMap(dfSuppliment)

if ret_3 == 0:
    print('Successful Map Generated!')
else:
    print('Please check the log for further issue!')

In the above lines, the application invokes the Map class to calculate the remaining logic & then plots the data into the seaborn graph.

Let’s just briefly see the central directory structure –

10. RunWindow

Here is the log directory –

11. Log Directory

And, finally, the source directory should look something like this –

12. SourceDir

Now, let’s run the application –

The following lines are essential –

sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, hue='Sex')

This will project the plot like this –

13. AdditionalOption

Or,

sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, hue='Sex', markers=['o','v'], scatter_kws={'s':25})

This will lead to the following figures –

14. Adding Markers

As you can see here, using the markers (‘o’ & ‘v’) leads to two different symbols, one for each value of Sex.

Or,

sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, col='Sex')

This will lead to –

15. Separate By Sex

So, in this case, the application has created two completely separate plots, one for each value of Sex.

So, finally, we’ve done it. 😀

In the next post, I’ll be doing some more improvisation on top of these data sets. Till then – Happy Avenging! 🙂

Note: All the data posted here is representational data available over the internet & is for educational purposes only.

Magic SQL

A few years ago on OTN, one of the users was looking for a solution that we thought might not be possible to provide in a single SQL statement. At that time, Michael came to the rescue & for the first time showed some interesting XML Kung-Fu to all of us, earning a great reputation for providing magic solutions to others. I personally love to call them Magic SQL.

The following SQL calculates the number of rows in each table in a specific schema without visiting any DBA views. This particular script is my second personal favourite.

scott@ORCL>
scott@ORCL>select * from v$version;

BANNER
--------------------------------------------------------------------------------
Oracle Database 11g Enterprise Edition Release 11.1.0.6.0 - Production
PL/SQL Release 11.1.0.6.0 - Production
CORE 11.1.0.6.0 Production
TNS for 32-bit Windows: Version 11.1.0.6.0 - Production
NLSRTL Version 11.1.0.6.0 - Production

Elapsed: 00:00:00.01
scott@ORCL>
scott@ORCL>
scott@ORCL>select table_name,
2 DBMS_XMLGEN.getxmltype(
3 'select count(*) c from '||table_name
4 ).extract('//text()').getnumberval() tot_rows
5 from user_tables
6 where iot_type is null
7 or iot_type != 'IOT_OVERFLOW';

TABLE_NAME TOT_ROWS
------------------------------ ----------
DEPT 4
EMP 14
BONUS 0
SALGRADE 5
EMP_DETAILS 3
T_EMP 0
AUDIT_T 0
C_C 4
TRAIN_DET 2
TEST_COM_IMP 2
TIME_P 1

TABLE_NAME TOT_ROWS
------------------------------ ----------
PRI_UQ 4
TEST_CHK 0
ANSHUMANSAHAISGENIOUS 1
XEUROPE 2
D_D 8
PUBLICTRANSPORT 4
XASIA 2
TF1 0
TEST_HH 14
TEST_SWAP 4
XGMT 1

TABLE_NAME TOT_ROWS
------------------------------ ----------
CUSTOMER_DET 1
FOURWHEELER 2
SPOOL_LOG 13
CITYTRANSPORT 8
T1 2
T2 2
A_A 1
B_B 2
AUTOMOBILE 1
XDUAL 1
S_TEMP 0

33 rows selected.

Elapsed: 00:00:00.42
scott@ORCL>

However, this particular script has some limitations with index-organized tables. Overall, though, it works brilliantly without touching any of the DBA views.

Grouped information in comma-separated values.

scott@ORCL>with tt
2 as
3 (
4 select 1 id, 'saty' name from dual
5 union all
6 select 1, 'anup' from dual
7 union all
8 select 1, 'sas' from dual
9 union all
10 select 2, 'rajib' from dual
11 union all
12 select 2, 'amit' from dual
13 )
14 select id,
15 cast(wmsys.wm_concat(name) as varchar2(100)) src
16 from tt
17 group by id;

ID SRC
---------- ------------------------------------------------------
1 saty,anup,sas
2 rajib,amit

Elapsed: 00:00:01.62
scott@ORCL>
scott@ORCL>

The function wm_concat is undocumented, so you should not use it in a production environment. Even if you do, you won’t get any technical support from Oracle if you have a production issue because of it. So, it is better not to use this function. But it certainly reduces a lot of our effort & provides a solution using a single SQL. I’m still waiting to see it as a documented function by Oracle. Till then, you have to go for your custom-tailored solution.

The following SQLs convert comma-separated values into rows. However, this cannot be applied to a column of an existing table.

scott@ORCL>
scott@ORCL>SELECT cast(column_value as varchar2(40)) res
2 FROM TABLE(SYS.dbms_debug_vc2coll (24, 34, 25));

RES
----------------------------------------
24
34
25

Elapsed: 00:00:00.03
scott@ORCL>
scott@ORCL>
scott@ORCL>SELECT cast(column_value as varchar2(40)) res
2 FROM TABLE(SYS.dbms_debug_vc2coll ('A', 'B', 'C'));

RES
----------------------------------------
A
B
C

Elapsed: 00:00:00.05
scott@ORCL>
scott@ORCL>SELECT cast(column_value as varchar2(40)) res
2 FROM TABLE(SYS.dbms_debug_vc2coll (24, 'B', '@'));

RES
----------------------------------------
24
B
@

Elapsed: 00:00:00.14
scott@ORCL>

This can handle alphanumeric data without declaring any custom type for it.

The following SQL is the one I prefer most. It uses one of the brilliant features introduced in Oracle & can surprise many developers. Write a query to retrieve any column information without using the select clause. I know, it sounds crazy. But you really can do that.

This is what I consider the leading contender in my Magic SQL category.

scott@ORCL>
scott@ORCL>
scott@ORCL>xquery for $i in ora:view("emp")/ROW/ENAME return $i/text()
2 /

Result Sequence
----------------------------------------------------------------------------
SMITH
ALLEN
WARD
JONES
MARTIN
BLAKE
CLARK
SCOTT
KING
TURNER
ADAMS

Result Sequence
----------------------------------------------------------------------------
JAMES
FORD
MILLER

14 item(s) selected.

Elapsed: 00:00:00.14
scott@ORCL>
scott@ORCL>

Hope you liked this edition.

Regards.