Python-based Dash framework visualizing real-time COVID-19 trends.

Hi Team,

We’ll enhance our last post on COVID-19 prediction & capture the results in a real-time dashboard, where the plotted values update as soon as the source data changes. In short, this is a genuinely real-time visual dashboard whose graphs & trends refresh whenever the third-party API source data changes.

However, I would like to share a demo run before we dig deep into this.

Demo Run

Architecture:

Let us understand the architecture for this solution –

Streaming Architecture

From the above diagram, one can see that we’re maintaining a similar approach to our last initiative. However, we’ve used a different framework to display the data live.

To achieve this, we’ve used a compelling Python-based framework called Dash. Along with that, we’ve used the Ably, Plotly & Prophet APIs.

If you need to know more about our last post, please visit this link.


Package Installation:

Let us review the sample packages required for this task.

Step – 1:

Installing Packages

Step – 2:

Installing Packages – Continue

Step – 3:

Installing Packages – Continue

Step – 4:

Installing Packages – Final

And, here is the command to install those packages –

pip install pandas
pip install plotly
pip install prophet
pip install dash
pip install ably
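
Alternatively, you can capture the same list in a requirements.txt file & install everything in one shot –

pandas
plotly
prophet
dash
ably

pip install -r requirements.txt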

Code:

Since this is an extension of our previous post, we’re not going to discuss the scripts we’ve already covered there. Instead, we will talk about the enhanced scripts & the new scripts that this use case requires.

1. clsConfig.py ( This native Python script contains the configuration entries. )


################################################
#### Written By: SATYAKI DE                 ####
#### Written On: 15-May-2020                ####
#### Modified On: 09-Sep-2021               ####
####                                        ####
#### Objective: This script is a config     ####
#### file, contains all the keys for        ####
#### Machine-Learning & streaming dashboard.####
####                                        ####
################################################
import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'FILE_NAME': Curr_Path + sep + 'data' + sep + 'TradeIn.csv',
        'SRC_PATH': Curr_Path + sep + 'data' + sep,
        'APP_DESC_1': 'Dash Integration with Ably!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SUBDIR': 'data',
        'ABLY_ID': 'XXX2LL.93kdkiU2:Kdsldoeie737484E',
        "URL": "https://corona-api.com/countries/",
        "appType": "application/json",
        "conType": "keep-alive",
        "limRec": 10,
        "CACHE": "no-cache",
        "MAX_RETRY": 3,
        "coList": "DE, IN, US, CA, GB, ID, BR",
        "FNC": "NewConfirmed",
        "TMS": "ReportedDate",
        "FND": "NewDeaths",
        "FinData": "Cache.csv"
    }


A few new entries that are essential to this task are ABLY_ID & FinData.
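
For context, the downstream consumer reads these two entries like this (mirroring what clsStreamConsume.py below does) –

from clsConfig import clsConfig as cf

ably_id = str(cf.conf['ABLY_ID'])    # Ably key used for the pub-sub channel
fileName = str(cf.conf['FinData'])   # local cache file name (Cache.csv)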

2. clsPublishStream.py ( This script will publish the data transformed for Covid-19 predictions from the third-party sources. )


###############################################################
####                                                       ####
#### Written By: Satyaki De                                ####
#### Written Date: 26-Jul-2021                             ####
#### Modified Date: 08-Sep-2021                            ####
####                                                       ####
#### Objective: This script will publish real-time         ####
#### streaming data coming out from a hosted API           ####
#### source using another popular third-party service      ####
#### named Ably. Ably mimics the pub-sub streaming concept,####
#### which might be extremely useful for any start-ups.    ####
####                                                       ####
###############################################################
from ably import AblyRest
import logging
import json
import math
import random

from clsConfig import clsConfig as cf

# Global Section
logger = logging.getLogger('ably')
logger.addHandler(logging.StreamHandler())

ably_id = str(cf.conf['ABLY_ID'])
ably = AblyRest(ably_id)
channel = ably.channels.get('sd_channel')
# End Of Global Section

class clsPublishStream:
    def __init__(self):
        self.fnc = cf.conf['FNC']

    def pushEvents(self, srcDF, debugInd, varVa, flg):
        try:
            # JSON data
            # This is the default data for all the identified categories
            # we've prepared. You can extract this dynamically. Or, by
            # default, you can set their base trade details.
            json_data = [{'Year_Mon': '201911', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg},
                         {'Year_Mon': '201912', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg}]
            jdata = json.dumps(json_data)

            # Publish a message to the sd_channel channel
            channel.publish('event', jdata)

            # Capturing the inbound dataframe
            iDF = srcDF

            # Adding new selected points
            covid_dict = iDF.to_dict('records')
            jdata_fin = json.dumps(covid_dict)

            # Publish the rest of the messages to the sd_channel channel
            channel.publish('event', jdata_fin)

            jdata_fin = ''

            return 0
        except Exception as e:
            x = str(e)
            print(x)

            logging.info(x)
            return 1

We’ve already discussed this script. The only new line that appears here is –

json_data = [{'Year_Mon': '201911', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg},
            {'Year_Mon': '201912', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg}]

This statement is more like a dummy feed, which creates the basic structure of your graph.

3. clsStreamConsume.py ( This script will consume the stream from Ably Queue configuration entries. )


##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 26-Jul-2021              ####
#### Modified On 08-Sep-2021              ####
####                                      ####
#### Objective: Consuming Streaming data  ####
#### from Ably channels published by the  ####
#### callPredictCovidAnalysisRealtime.py  ####
####                                      ####
##############################################
import json
from clsConfig import clsConfig as cf
import requests
import logging
import time
import pandas as p
import clsL as cl
from ably import AblyRest

# Initiating Log class
l = cl.clsL()

class clsStreamConsume:
    def __init__(self):
        self.ably_id = str(cf.conf['ABLY_ID'])
        self.fileName = str(cf.conf['FinData'])

    def conStream(self, varVa, debugInd):
        try:
            ably_id = self.ably_id
            fileName = self.fileName

            var = varVa
            debug_ind = debugInd

            # Fetching the data
            client = AblyRest(ably_id)
            channel = client.channels.get('sd_channel')

            message_page = channel.history()

            # Counter Value
            cnt = 0

            # Declaring Global Data-Frame
            df_conv = p.DataFrame()

            for i in message_page.items:
                print('Last Msg: {}'.format(i.data))
                json_data = json.loads(i.data)

                # Converting JSON to Dataframe
                df = p.json_normalize(json_data)
                df.columns = df.columns.map(lambda x: x.split(".")[1])

                if cnt == 0:
                    df_conv = df
                else:
                    d_frames = [df_conv, df]
                    df_conv = p.concat(d_frames)

                cnt += 1

            # Resetting the Index Value
            df_conv.reset_index(drop=True, inplace=True)

            # This will check whether the current load is happening
            # or not. Based on that, it will capture the old events
            # from cache.
            if df_conv.empty:
                # Note: read_csv has no 'index' argument; index_col skips
                # the index column written by the logger
                df_conv = p.read_csv(fileName, index_col=0)
            else:
                l.logr(fileName, debug_ind, df_conv, 'log')

            return df_conv
        except Exception as e:
            x = str(e)
            print(x)

            logging.info(x)

            # This will handle the error scenario as well.
            # Based on that, it will capture the old events
            # from cache.
            try:
                df_conv = p.read_csv(fileName, index_col=0)
            except:
                df_conv = p.DataFrame()

            return df_conv

We’ve already discussed this script in one of my earlier posts, which you will get here.

So, I’m not going to discuss all the steps in detail.

The only addition is a temporary local caching mechanism.

if df_conv.empty:
    df_conv = p.read_csv(fileName, index_col=0)
else:
    l.logr(fileName, debug_ind, df_conv, 'log')

4. callPredictCovidAnalysisRealtime.py ( Main calling script to fetch the COVID-19 data from the third-party source & then publish it to the Ably message queue after transforming the data & adding the prediction using Facebook’s prophet API. )


##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 26-Jul-2021              ####
#### Modified On 26-Jul-2021              ####
####                                      ####
#### Objective: Calling multiple APIs,    ####
#### including the Prophet-API developed  ####
#### by Facebook, for future prediction   ####
#### of Covid-19 situations in upcoming   ####
#### days for the world's major hotspots. ####
##############################################
import json
import clsCovidAPI as ca
from clsConfig import clsConfig as cf
import datetime
import logging
import clsL as cl
import math as m
import clsPublishStream as cps
import clsForecast as f
from prophet import Prophet
from prophet.plot import plot_plotly, plot_components_plotly
import matplotlib.pyplot as plt
import pandas as p
import time

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Initiating Log class
l = cl.clsL()

# Helper function that maps country codes to full names
def countryDet(inputCD):
    try:
        countryCD = inputCD

        if str(countryCD) == 'DE':
            cntCD = 'Germany'
        elif str(countryCD) == 'BR':
            cntCD = 'Brazil'
        elif str(countryCD) == 'GB':
            cntCD = 'UnitedKingdom'
        elif str(countryCD) == 'US':
            cntCD = 'UnitedStates'
        elif str(countryCD) == 'IN':
            cntCD = 'India'
        elif str(countryCD) == 'CA':
            cntCD = 'Canada'
        elif str(countryCD) == 'ID':
            cntCD = 'Indonesia'
        else:
            cntCD = 'N/A'

        return cntCD
    except:
        cntCD = 'N/A'

        return cntCD

def lookupCountry(row):
    try:
        strCD = str(row['CountryCode'])
        retVal = countryDet(strCD)

        return retVal
    except:
        retVal = 'N/A'

        return retVal

def adjustTrend(row):
    try:
        flTrend = float(row['trend'])
        flTrendUpr = float(row['trend_upper'])
        flTrendLwr = float(row['trend_lower'])

        retVal = m.trunc((flTrend + flTrendUpr + flTrendLwr)/3)

        if retVal < 0:
            retVal = 0

        return retVal
    except:
        retVal = 0

        return retVal

def ceilTrend(row, colName):
    try:
        flTrend = str(row[colName])

        # Note: str.find() returns -1 (not 0) when nothing is found
        if flTrend.find('.') != -1:
            if float(flTrend) > 0:
                retVal = m.trunc(float(flTrend)) + 1
            else:
                retVal = m.trunc(float(flTrend))
        else:
            retVal = float(flTrend)

        if retVal < 0:
            retVal = 0

        return retVal
    except:
        retVal = 0

        return retVal

def plot_picture(inputDF, debug_ind, var, countryCD, stat):
    try:
        iDF = inputDF

        # Lowercase the column names
        iDF.columns = [c.lower() for c in iDF.columns]

        # Determine which is the Y axis
        y_col = [c for c in iDF.columns if c.startswith('y')][0]

        # Determine which is the X axis
        x_col = [c for c in iDF.columns if c.startswith('ds')][0]

        # Data Conversion
        iDF['y'] = iDF[y_col].astype('float')
        iDF['ds'] = iDF[x_col].astype('datetime64[ns]')

        # Forecast calculations
        # Decreasing the changepoint_prior_scale to 0.001 to make the trend less flexible
        m = Prophet(n_changepoints=20, yearly_seasonality=True, changepoint_prior_scale=0.001)
        #m = Prophet(n_changepoints=20, yearly_seasonality=True, changepoint_prior_scale=0.04525)
        #m = Prophet(n_changepoints=['2021-09-10'])

        m.fit(iDF)

        forecastDF = m.make_future_dataframe(periods=365)
        forecastDF = m.predict(forecastDF)

        l.logr('15.forecastDF_' + var + '_' + countryCD + '.csv', debug_ind, forecastDF, 'log')

        df_M = forecastDF[['ds', 'trend', 'trend_lower', 'trend_upper']].copy()

        l.logr('16.df_M_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')

        # Getting the full country name
        cntCD = countryDet(countryCD)

        # Draw forecast results
        df_M['Country'] = cntCD

        l.logr('17.df_M_C_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')

        df_M['AdjustTrend'] = df_M.apply(lambda row: adjustTrend(row), axis=1)

        l.logr('20.df_M_AdjustTrend_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')

        return df_M
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

def countrySpecificDF(counryDF, val):
    try:
        countryName = val
        df = counryDF

        df_lkpFile = df[(df['CountryCode'] == val)]

        return df_lkpFile
    except:
        df = p.DataFrame()

        return df

def toNum(row, colName):
    try:
        flTrend = str(row[colName])
        flTr, subpart = flTrend.split(' ')

        retVal = int(flTr.replace('-',''))

        return retVal
    except:
        retVal = 0

        return retVal

def extractPredictedDF(OrigDF, MergePredictedDF, colName):
    try:
        iDF_1 = OrigDF
        iDF_2 = MergePredictedDF
        dt_format = '%Y-%m-%d'

        iDF_1_max_group = iDF_1.groupby(["Country"], as_index=False)["ReportedDate"].max()
        iDF_2['ReportedDate'] = iDF_2.apply(lambda row: toNum(row, 'ds'), axis=1)

        col_one_list = iDF_1_max_group['Country'].tolist()
        col_two_list = iDF_1_max_group['ReportedDate'].tolist()

        print('col_one_list: ', str(col_one_list))
        print('col_two_list: ', str(col_two_list))

        cnt_1_x = 1
        cnt_1_y = 1
        cnt_x = 0

        df_M = p.DataFrame()

        for i in col_one_list:
            str_countryVal = str(i)
            cnt_1_y = 1

            for j in col_two_list:
                intReportDate = int(str(j).strip().replace('-',''))

                if cnt_1_x == cnt_1_y:
                    print('str_countryVal: ', str(str_countryVal))
                    print('intReportDate: ', str(intReportDate))

                    iDF_2_M = iDF_2[(iDF_2['Country'] == str_countryVal) & (iDF_2['ReportedDate'] > intReportDate)]

                    # Merging with the previous Country Code data
                    if cnt_x == 0:
                        df_M = iDF_2_M
                    else:
                        d_frames = [df_M, iDF_2_M]
                        df_M = p.concat(d_frames)

                    cnt_x += 1

                cnt_1_y += 1

            cnt_1_x += 1

        df_M.drop(columns=['ReportedDate'], axis=1, inplace=True)
        df_M.rename(columns={'ds':'ReportedDate'}, inplace=True)
        df_M.rename(columns={'AdjustTrend':colName}, inplace=True)

        return df_M
    except:
        df = p.DataFrame()

        return df

def toPivot(inDF, colName):
    try:
        iDF = inDF

        iDF_Piv = iDF.pivot_table(colName, ['ReportedDate'], 'Country')
        iDF_Piv.reset_index(drop=False, inplace=True)

        list1 = ['ReportedDate']

        iDF_Arr = iDF['Country'].unique()
        list2 = iDF_Arr.tolist()

        listV = list1 + list2

        # Assigning the result back; reindex is not an in-place operation
        iDF_Piv = iDF_Piv.reindex(listV, axis=1)

        return iDF_Piv
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

def toAgg(inDF, var, debugInd, flg):
    try:
        iDF = inDF
        colName = "ReportedDate"

        list1 = list(iDF.columns.values)
        list1.remove(colName)

        list1 = ["Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]

        iDF['Year_Mon'] = iDF[colName].apply(lambda x: x.strftime('%Y%m'))
        iDF.drop(columns=[colName], axis=1, inplace=True)

        ColNameGrp = "Year_Mon"
        print('List1 Aggregate:: ', str(list1))
        print('ColNameGrp :: ', str(ColNameGrp))

        iDF_T = iDF[["Year_Mon", "Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]].copy()
        iDF_T.fillna(0, inplace=True)
        print('iDF_T:: ')
        print(iDF_T)

        iDF_1_max_group = iDF_T.groupby(ColNameGrp, as_index=False)[list1].sum()
        iDF_1_max_group['Status'] = flg

        return iDF_1_max_group
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

def publishEvents(inDF1, inDF2, inDF3, inDF4, var, debugInd):
    try:
        # Original Covid Data from API
        iDF1 = inDF1
        iDF2 = inDF2

        NC = 'NewConfirmed'
        ND = 'NewDeaths'

        iDF1_PV = toPivot(iDF1, NC)
        iDF1_PV['ReportedDate'] = p.to_datetime(iDF1_PV['ReportedDate'])
        l.logr('57.iDF1_PV_' + var + '.csv', debugInd, iDF1_PV, 'log')

        iDF2_PV = toPivot(iDF2, ND)
        iDF2_PV['ReportedDate'] = p.to_datetime(iDF2_PV['ReportedDate'])
        l.logr('58.iDF2_PV_' + var + '.csv', debugInd, iDF2_PV, 'log')

        # Predicted Covid Data from Facebook API
        iDF3 = inDF3
        iDF4 = inDF4

        iDF3_PV = toPivot(iDF3, NC)
        l.logr('59.iDF3_PV_' + var + '.csv', debugInd, iDF3_PV, 'log')

        iDF4_PV = toPivot(iDF4, ND)
        l.logr('60.iDF4_PV_' + var + '.csv', debugInd, iDF4_PV, 'log')

        # Now aggregating data based on year-month only
        iDF1_Agg = toAgg(iDF1_PV, var, debugInd, NC)
        l.logr('61.iDF1_Agg_' + var + '.csv', debugInd, iDF1_Agg, 'log')

        iDF2_Agg = toAgg(iDF2_PV, var, debugInd, ND)
        l.logr('62.iDF2_Agg_' + var + '.csv', debugInd, iDF2_Agg, 'log')

        iDF3_Agg = toAgg(iDF3_PV, var, debugInd, NC)
        l.logr('63.iDF3_Agg_' + var + '.csv', debugInd, iDF3_Agg, 'log')

        iDF4_Agg = toAgg(iDF4_PV, var, debugInd, ND)
        l.logr('64.iDF4_Agg_' + var + '.csv', debugInd, iDF4_Agg, 'log')

        # Initiating Ably class to push events
        x1 = cps.clsPublishStream()

        # Pushing the Historical Confirmed Cases
        retVal_1 = x1.pushEvents(iDF1_Agg, debugInd, var, NC)

        if retVal_1 == 0:
            print('Successfully pushed historical events!')
        else:
            print('Failed to push historical events!')

        # Pushing the Historical Death Cases
        retVal_3 = x1.pushEvents(iDF2_Agg, debugInd, var, ND)

        if retVal_3 == 0:
            print('Successfully pushed historical events!')
        else:
            print('Failed to push historical events!')

        time.sleep(5)

        # Pushing the Predicted New Confirmed Cases
        retVal_2 = x1.pushEvents(iDF3_Agg, debugInd, var, NC)

        if retVal_2 == 0:
            print('Successfully pushed predicted events!')
        else:
            print('Failed to push predicted events!')

        # Pushing the Predicted New Death Cases
        retVal_4 = x1.pushEvents(iDF4_Agg, debugInd, var, ND)

        if retVal_4 == 0:
            print('Successfully pushed predicted events!')
        else:
            print('Failed to push predicted events!')

        return 0
    except Exception as e:
        x = str(e)
        print(x)

        return 1

def main():
    try:
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*' * 60)

        DInd = 'Y'
        NC = 'New Confirmed'
        ND = 'New Dead'

        SM = 'data process Successful!'
        FM = 'data process Failure!'

        print("Calling the custom Package for large file splitting..")
        print('Start Time: ' + str(var1))

        countryList = str(cf.conf['coList']).split(',')

        # Initiating Log Class
        general_log_path = str(cf.conf['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'CovidAPI.log', level=logging.INFO)

        # Create the instance of the Covid API Class
        x1 = ca.clsCovidAPI()

        # Let's pass this to our map section
        retDF = x1.searchQry(var1, DInd)
        retVal = int(retDF.shape[0])

        if retVal > 0:
            print('Successfully Covid Data Extracted from the API-source.')
        else:
            print('Something wrong with your API-source!')

        # Extracting Skeleton Data
        df = retDF[['data.code', 'date', 'deaths', 'confirmed', 'recovered', 'new_confirmed', 'new_recovered', 'new_deaths', 'active']].copy()
        df.columns = ['CountryCode', 'ReportedDate', 'TotalReportedDead', 'TotalConfirmedCase', 'TotalRecovered', 'NewConfirmed', 'NewRecovered', 'NewDeaths', 'ActiveCaases']
        # Assigning the result back so the NA-drop actually takes effect
        df = df.dropna()

        print('Returned Skeleton Data Frame: ')
        print(df)

        l.logr('5.df_' + var1 + '.csv', DInd, df, 'log')

        # Due to a source data issue, the application will perform an
        # avg of counts based on dates due to multiple entries
        g_df = df.groupby(["CountryCode", "ReportedDate"], as_index=False)[["TotalReportedDead", "TotalConfirmedCase", "TotalRecovered", "NewConfirmed", "NewRecovered", "NewDeaths", "ActiveCaases"]].mean()

        g_df['TotalReportedDead_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalReportedDead'), axis=1)
        g_df['TotalConfirmedCase_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalConfirmedCase'), axis=1)
        g_df['TotalRecovered_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalRecovered'), axis=1)
        g_df['NewConfirmed_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewConfirmed'), axis=1)
        g_df['NewRecovered_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewRecovered'), axis=1)
        g_df['NewDeaths_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewDeaths'), axis=1)
        g_df['ActiveCaases_M'] = g_df.apply(lambda row: ceilTrend(row, 'ActiveCaases'), axis=1)

        # Dropping old columns
        g_df.drop(columns=['TotalReportedDead', 'TotalConfirmedCase', 'TotalRecovered', 'NewConfirmed', 'NewRecovered', 'NewDeaths', 'ActiveCaases'], axis=1, inplace=True)

        # Renaming the new columns to the old columns
        g_df.rename(columns={'TotalReportedDead_M':'TotalReportedDead'}, inplace=True)
        g_df.rename(columns={'TotalConfirmedCase_M':'TotalConfirmedCase'}, inplace=True)
        g_df.rename(columns={'TotalRecovered_M':'TotalRecovered'}, inplace=True)
        g_df.rename(columns={'NewConfirmed_M':'NewConfirmed'}, inplace=True)
        g_df.rename(columns={'NewRecovered_M':'NewRecovered'}, inplace=True)
        g_df.rename(columns={'NewDeaths_M':'NewDeaths'}, inplace=True)
        g_df.rename(columns={'ActiveCaases_M':'ActiveCaases'}, inplace=True)

        l.logr('5.g_df_' + var1 + '.csv', DInd, g_df, 'log')

        # Working with forecast
        # Create the instance of the Forecast API Class
        x2 = f.clsForecast()

        # Fetching each country name & then get the details
        cnt = 6
        cnt_x = 0
        cnt_y = 0
        df_M_Confirmed = p.DataFrame()
        df_M_Deaths = p.DataFrame()

        for i in countryList:
            try:
                cntryIndiv = i.strip()
                cntryFullName = countryDet(cntryIndiv)
                print('Country Processing: ' + str(cntryFullName))

                # Creating a dataframe for each country
                dfCountry = countrySpecificDF(g_df, cntryIndiv)
                l.logr(str(cnt) + '.df_' + cntryIndiv + '_' + var1 + '.csv', DInd, dfCountry, 'log')

                # Let's pass this to our map section
                retDFGenNC = x2.forecastNewConfirmed(dfCountry, DInd, var1)

                statVal = str(NC)
                a1 = plot_picture(retDFGenNC, DInd, var1, cntryIndiv, statVal)

                # Merging with the previous Country Code data
                if cnt_x == 0:
                    df_M_Confirmed = a1
                else:
                    d_frames = [df_M_Confirmed, a1]
                    df_M_Confirmed = p.concat(d_frames)

                cnt_x += 1

                retDFGenNC_D = x2.forecastNewDead(dfCountry, DInd, var1)

                statVal = str(ND)
                a2 = plot_picture(retDFGenNC_D, DInd, var1, cntryIndiv, statVal)

                # Merging with the previous Country Code data
                if cnt_y == 0:
                    df_M_Deaths = a2
                else:
                    d_frames = [df_M_Deaths, a2]
                    df_M_Deaths = p.concat(d_frames)

                cnt_y += 1

                # Printing the proper message
                # (a1 & a2 are dataframes, so success is judged by
                # whether both forecasts returned any rows)
                if (not a1.empty) and (not a2.empty):
                    oprMsg = cntryFullName + ' ' + SM
                    print(oprMsg)
                else:
                    oprMsg = cntryFullName + ' ' + FM
                    print(oprMsg)

                # Resetting the values for the next iteration
                dfCountry = p.DataFrame()
                cntryIndiv = ''
                oprMsg = ''
                cntryFullName = ''
                a1 = p.DataFrame()
                a2 = p.DataFrame()
                statVal = ''
                cnt += 1
            except Exception as e:
                x = str(e)
                print(x)

        l.logr('49.df_M_Confirmed_' + var1 + '.csv', DInd, df_M_Confirmed, 'log')
        l.logr('50.df_M_Deaths_' + var1 + '.csv', DInd, df_M_Deaths, 'log')

        # Removing unwanted columns
        df_M_Confirmed.drop(columns=['trend', 'trend_lower', 'trend_upper'], axis=1, inplace=True)
        df_M_Deaths.drop(columns=['trend', 'trend_lower', 'trend_upper'], axis=1, inplace=True)

        l.logr('51.df_M_Confirmed_' + var1 + '.csv', DInd, df_M_Confirmed, 'log')
        l.logr('52.df_M_Deaths_' + var1 + '.csv', DInd, df_M_Deaths, 'log')

        # Creating the original dataframe from the source API
        df_M_Confirmed_Orig = g_df[['CountryCode', 'ReportedDate', 'NewConfirmed']].copy()
        df_M_Deaths_Orig = g_df[['CountryCode', 'ReportedDate', 'NewDeaths']].copy()

        # Transforming Country Code
        df_M_Confirmed_Orig['Country'] = df_M_Confirmed_Orig.apply(lambda row: lookupCountry(row), axis=1)
        df_M_Deaths_Orig['Country'] = df_M_Deaths_Orig.apply(lambda row: lookupCountry(row), axis=1)

        # Dropping the unwanted column
        df_M_Confirmed_Orig.drop(columns=['CountryCode'], axis=1, inplace=True)
        df_M_Deaths_Orig.drop(columns=['CountryCode'], axis=1, inplace=True)

        # Reordering columns
        df_M_Confirmed_Orig = df_M_Confirmed_Orig.reindex(['ReportedDate', 'Country', 'NewConfirmed'], axis=1)
        df_M_Deaths_Orig = df_M_Deaths_Orig.reindex(['ReportedDate', 'Country', 'NewDeaths'], axis=1)

        l.logr('53.df_M_Confirmed_Orig_' + var1 + '.csv', DInd, df_M_Confirmed_Orig, 'log')
        l.logr('54.df_M_Deaths_Orig_' + var1 + '.csv', DInd, df_M_Deaths_Orig, 'log')

        # Filtering out only the predicted data
        filterDF_1 = extractPredictedDF(df_M_Confirmed_Orig, df_M_Confirmed, 'NewConfirmed')
        l.logr('55.filterDF_1_' + var1 + '.csv', DInd, filterDF_1, 'log')

        # Using the death dataframes for the death filter
        filterDF_2 = extractPredictedDF(df_M_Deaths_Orig, df_M_Deaths, 'NewDeaths')
        l.logr('56.filterDF_2_' + var1 + '.csv', DInd, filterDF_2, 'log')

        # Calling the final publish events
        retVa = publishEvents(df_M_Confirmed_Orig, df_M_Deaths_Orig, filterDF_1, filterDF_2, var1, DInd)

        if retVa == 0:
            print('Successfully stream processed!')
        else:
            print('Failed to process stream!')

        var2 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var2))
        print('*' * 60)
    except Exception as e:
        x = str(e)
        print(x)

if __name__ == "__main__":
    main()

Let us understand the enhancement part of this script –

We’ve taken out the plotly part as we will use a separate dashboard script to visualize the data trend.

However, we need to understand the data initially consumed from the API & how we transform it, which will be helpful for the visualization.

The initial captured data should look like this after extracting only the relevant elements from the API response.

Initial Data from API

As you can see, based on the country & reported date, our application consumes attributes like Total-Reported-Death, Total-Recovered, New-Death, New-Confirmed & so on.

From this list, we’ve taken two attributes for our use cases & they are New-Death & New-Confirmed. Also, we’re predicting the Future-New-Death & Future-New-Confirmed based on the historical data using Facebook’s prophet API.

And we transpose them, extracting the countries & placing them as columns for better representation.

Transposed Data

Hence, here is the code that we should be exploring –

def toPivot(inDF, colName):
    try:
        iDF = inDF

        iDF_Piv = iDF.pivot_table(colName, ['ReportedDate'], 'Country')
        iDF_Piv.reset_index( drop=False, inplace=True )

        list1 = ['ReportedDate']

        iDF_Arr = iDF['Country'].unique()
        list2 = iDF_Arr.tolist()

        listV = list1 + list2

        # Assigning the result back; reindex is not an in-place operation
        iDF_Piv = iDF_Piv.reindex(listV, axis=1)

        return iDF_Piv
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

Now, using the pivot_table function, we’re transposing the row values into columns. Later, we realign the column headings into our desired format.
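
To see what this does, here is a tiny, illustrative input (the numbers are hypothetical, not from the actual feed) & the pivoted output –

import pandas as p

inDF = p.DataFrame({
    'ReportedDate': ['2021-09-01', '2021-09-01', '2021-09-02'],
    'Country': ['Canada', 'India', 'India'],
    'NewConfirmed': [20, 100, 110]
})

# One row per ReportedDate; each country becomes its own column
print(inDF.pivot_table('NewConfirmed', ['ReportedDate'], 'Country'))
#
# Country       Canada  India
# ReportedDate
# 2021-09-01      20.0  100.0
# 2021-09-02       NaN  110.0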

However, we still have the data at the individual daily-date level at this point. We eliminate that by removing the day part & then aggregating by month, as shown below –

Aggregated Data

And, here is the code for that –

def toAgg(inDF, var, debugInd, flg):
    try:
        iDF = inDF
        colName = "ReportedDate"

        list1 = list(iDF.columns.values)
        list1.remove(colName)

        list1 = ["Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]

        iDF['Year_Mon'] = iDF[colName].apply(lambda x:x.strftime('%Y%m'))
        iDF.drop(columns=[colName], axis=1, inplace=True)

        ColNameGrp = "Year_Mon"
        print('List1 Aggregate:: ', str(list1))
        print('ColNameGrp :: ', str(ColNameGrp))

        iDF_T = iDF[["Year_Mon", "Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]].copy()
        iDF_T.fillna(0, inplace = True)
        print('iDF_T:: ')
        print(iDF_T)

        iDF_1_max_group = iDF_T.groupby(ColNameGrp, as_index=False)[list1].sum()
        iDF_1_max_group['Status'] = flg

        return iDF_1_max_group
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

From the above snippet, we can conclude that the application removes the day part & then aggregates based on the Year_Mon attribute.
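
Here is a minimal, hypothetical illustration of that monthly roll-up (a single country & made-up counts) –

import pandas as p

iDF = p.DataFrame({'ReportedDate': p.to_datetime(['2021-09-01', '2021-09-15', '2021-10-01']),
                   'India': [100, 110, 90]})

# Strip the day part into a Year_Mon bucket, then sum per month
iDF['Year_Mon'] = iDF['ReportedDate'].apply(lambda x: x.strftime('%Y%m'))
print(iDF.groupby('Year_Mon', as_index=False)[['India']].sum())
#   Year_Mon  India
# 0   202109    210
# 1   202110     90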

The following snippet will push the final transformed data to Ably queue –

x1 = cps.clsPublishStream()

# Pushing the Historical Confirmed Cases
retVal_1 = x1.pushEvents(iDF1_Agg, debugInd, var, NC)

if retVal_1 == 0:
    print('Successfully pushed historical events!')
else:
    print('Failed to push historical events!')

5. dashboard_realtime.py ( Main calling script to consume the data from Ably queue & then visualize the trend. )


##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 08-Sep-2021              ####
#### Modified On 08-Sep-2021              ####
####                                      ####
#### Objective: This is the main script   ####
#### to invoke the dashboard after        ####
#### consuming streaming real-time        ####
#### predicted data using the Facebook    ####
#### API & the Ably message queue.        ####
####                                      ####
#### This script will show the trend      ####
#### comparison between major democracies ####
#### of the world.                        ####
####                                      ####
##############################################
import datetime
import dash
from dash import dcc
from dash import html
import plotly
from dash.dependencies import Input, Output
from ably import AblyRest
from clsConfig import clsConfig as cf
import pandas as p

# Main Class to consume streaming
import clsStreamConsume as ca

import numpy as np

# Create the instance of the Covid API Class
x1 = ca.clsStreamConsume()

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']
app = dash.Dash(__name__, external_stylesheets=external_stylesheets)

app.layout = html.Div(
    html.Div([
        html.H1("Covid-19 Trend Dashboard",
                className='text-center text-primary mb-4'),
        html.H5(children='''
            Dash: Covid-19 Trend - (Present Vs Future)
        '''),
        html.P("Covid-19: New Confirmed Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-1'),
        html.P("Covid-19: New Death Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-2'),
        dcc.Interval(
            id='interval-component',
            interval=5*1000, # in milliseconds
            n_intervals=0
        )
    ], className="row", style={'marginBottom': 10, 'marginTop': 10})
)

def to_OptimizeString(row):
    try:
        x_str = str(row['Year_Mon'])

        dt_format = '%Y%m%d'
        finStr = x_str + '01'

        strReportDate = datetime.datetime.strptime(finStr, dt_format)

        return strReportDate
    except Exception as e:
        x = str(e)
        print(x)

        dt_format = '%Y%m%d'
        var = '20990101'

        # Fully qualified call; the module is imported as datetime
        strReportDate = datetime.datetime.strptime(var, dt_format)

        return strReportDate

def fetchEvent(var1, DInd):
    try:
        # Let's pass this to our map section
        iDF_M = x1.conStream(var1, DInd)

        # Converting Year_Mon to dates
        iDF_M['Year_Mon_Mod'] = iDF_M.apply(lambda row: to_OptimizeString(row), axis=1)

        # Dropping old columns
        iDF_M.drop(columns=['Year_Mon'], axis=1, inplace=True)

        # Renaming the new column to the old column
        iDF_M.rename(columns={'Year_Mon_Mod':'Year_Mon'}, inplace=True)

        return iDF_M
    except Exception as e:
        x = str(e)
        print(x)

        iDF_M = p.DataFrame()

        return iDF_M

# Multiple components can update every time the interval gets fired.
@app.callback(Output('live-update-graph-1', 'figure'),
              Input('interval-component', 'n_intervals'))
def update_graph_live(n):
    try:
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*' * 60)
        DInd = 'Y'

        # Let's pass this to our map section
        retDF = fetchEvent(var1, DInd)

        # Create the graph with subplots
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.3, horizontal_spacing=0.2)

        # Routing data to a dedicated DataFrame
        retDFNC = retDF.loc[(retDF['Status'] == 'NewConfirmed')]

        # Adding different charts into one dashboard
        # First Use Case - New Confirmed
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Brazil,'type':'scatter','name':'Brazil'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Canada,'type':'scatter','name':'Canada'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Germany,'type':'scatter','name':'Germany'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.India,'type':'scatter','name':'India'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Indonesia,'type':'scatter','name':'Indonesia'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedKingdom,'type':'scatter','name':'United Kingdom'},1,1)
        fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedStates,'type':'scatter','name':'United States'},1,1)

        return fig
    except Exception as e:
        x = str(e)
        print(x)

        # Create the graph with subplots
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
        fig['layout']['margin'] = {
            'l': 30, 'r': 10, 'b': 30, 't': 10
        }
        fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}

        return fig

# Multiple components can update every time the interval gets fired.
@app.callback(Output('live-update-graph-2', 'figure'),
              Input('interval-component', 'n_intervals'))
def update_graph_live_2(n):
    # A distinct function name for the second callback
    try:
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*' * 60)
        DInd = 'Y'

        # Let's pass this to our map section
        retDF = fetchEvent(var1, DInd)

        # Create the graph with subplots
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.3, horizontal_spacing=0.2)

        # Routing data to a dedicated DataFrame
        retDFND = retDF.loc[(retDF['Status'] == 'NewDeaths')]

        # Adding different charts into one dashboard
        # Second Use Case - New Deaths
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Brazil,'type':'bar','name':'Brazil'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Canada,'type':'bar','name':'Canada'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Germany,'type':'bar','name':'Germany'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.India,'type':'bar','name':'India'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Indonesia,'type':'bar','name':'Indonesia'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.UnitedKingdom,'type':'bar','name':'United Kingdom'},1,1)
        fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.UnitedStates,'type':'bar','name':'United States'},1,1)

        return fig
    except Exception as e:
        x = str(e)
        print(x)

        # Create the graph with subplots
        fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
        fig['layout']['margin'] = {
            'l': 30, 'r': 10, 'b': 30, 't': 10
        }
        fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}

        return fig

if __name__ == '__main__':
    app.run_server(debug=True)

Let us explore the critical snippet as this is a brand new script –

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)

app.layout = html.Div(
    html.Div([
        html.H1("Covid-19 Trend Dashboard",
                        className='text-center text-primary mb-4'),
        html.H5(children='''
            Dash: Covid-19 Trend - (Present Vs Future)
        '''),
        html.P("Covid-19: New Confirmed Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-1'),
        html.P("Covid-19: New Death Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-2'),
        dcc.Interval(
            id='interval-component',
            interval=5*1000, # in milliseconds
            n_intervals=0
        )
    ], className="row", style={'marginBottom': 10, 'marginTop': 10})
)

You need to understand the basics of HTML as this framework works seamlessly with it. To know more about the supported HTML, one needs to visit the following link.

def to_OptimizeString(row):
    try:
        x_str = str(row['Year_Mon'])

        dt_format = '%Y%m%d'
        finStr = x_str + '01'

        strReportDate = datetime.datetime.strptime(finStr, dt_format)

        return strReportDate

    except Exception as e:
        x = str(e)
        print(x)

        dt_format = '%Y%m%d'
        var = '20990101'

        strReportDate = datetime.datetime.strptime(var, dt_format)

        return strReportDate

The application is converting Year-Month combinations from string to date for better projection.

Also, we’ve implemented a dashboard that refreshes every five seconds (the interval is set to 5 * 1000 milliseconds).

def fetchEvent(var1, DInd):
    try:
        # Let's pass this to our map section
        iDF_M = x1.conStream(var1, DInd)

        # Converting Year_Mon to dates
        iDF_M['Year_Mon_Mod']= iDF_M.apply(lambda row: to_OptimizeString(row), axis=1)

        # Dropping old columns
        iDF_M.drop(columns=['Year_Mon'], axis=1, inplace=True)

        #Renaming new column to old column
        iDF_M.rename(columns={'Year_Mon_Mod':'Year_Mon'}, inplace=True)

        return iDF_M

    except Exception as e:
        x = str(e)
        print(x)

        iDF_M = p.DataFrame()

        return iDF_M

The application will consume all the events from the Ably Queue using the above snippet.

@app.callback(Output('live-update-graph-1', 'figure'),
              Input('interval-component', 'n_intervals'))
def update_graph_live(n):

We’ve implemented a callback mechanism that pulls the latest data from the queue, updates the graph accordingly & returns the refreshed figure to the component that invoked it.

# Routing data to dedicated DataFrame
retDFNC = retDF.loc[(retDF['Status'] == 'NewConfirmed')]

Based on the flag, we’re pushing the data into our target dataframe, from where the application will consume the data into the charts.

fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Brazil,'type':'scatter','name':'Brazil'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Canada,'type':'scatter','name':'Canada'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Germany,'type':'scatter','name':'Germany'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.India,'type':'scatter','name':'India'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Indonesia,'type':'scatter','name':'Indonesia'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedKingdom,'type':'scatter','name':'United Kingdom'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedStates,'type':'scatter','name':'United States'},1,1)

Each country’s KPI elements are fetched & mapped onto their corresponding axes to render the graph with visual details.

The same approach applies to the other graph as well.


Run:

Let us run the application –

Run – Beginning
Run – Finishing Stage

Dashboard:

Dashboard Job Run
Dashboard Visualization

So, we’ve done it.

You will get the complete codebase in the following Github link.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀


Note: All the data & scenarios posted here are representational & available over the internet for educational purposes only.

One more thing you need to understand is that this prediction is based on limited data points. The actual event may happen differently. Ideally, countries take a cue from this kind of analysis & initiate appropriate measures to avoid the high curve. And that is one of the main objectives of time-series analysis.

There is always room for improvement in this kind of model & the solution associated with it. I’ve shown the basic ways to achieve the same for educational purposes only.

Calling Twilio Voice API to deliver custom voice calls to the subscriber

Hello Guys!

It’s time to share another installment of fun & exciting posts from the world of Python-verse.

Today, we’ll be leveraging the Twilio Voice API to deliver custom messages through phone calls directly. This service is beneficial on many occasions, from payment reminders for customers to pending product-delivery calls to warehouse managers.


Dependent Packages:

Let us explore what packages we need for this –

Dependent Package Installation

The commands for your reference –

pip install twilio
pip install pandas

Also, you need to subscribe/register with Twilio. I’ve already shown what to do about that; you can refer to my old post to know more. However, you need to reserve one phone number from which you will call your customers.

Buying phone numbers

As you can see, I’ve reserved one phone number to demonstrate this use case.


Let us explore the key codebase –

  1. clsVoiceAPI.py (Main class invoking the voice API)


##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 30-Mar-2021              ####
#### Modified On 30-Mar-2021              ####
####                                      ####
#### Objective: Calling Twilio Voice API  ####
##############################################
import json
from clsConfig import clsConfig as cf
import logging
import os
from twilio.rest import Client

class clsVoiceAPI:
    def __init__(self):
        self.account_sid = cf.conf['TWILIO_ACCOUNT_SID']
        self.auth_token = cf.conf['TWILIO_AUTH_TOKEN']
        self.from_phone = cf.conf['FROM_PHONE']
        self.to_phone = cf.conf['TO_PHONE']

    def sendCall(self, msg):
        try:
            account_sid = self.account_sid
            auth_token = self.auth_token
            from_phone = self.from_phone
            to_phone = self.to_phone

            client = Client(account_sid, auth_token)

            # 'to' takes the callee & 'from_' the reserved Twilio number
            call = client.calls.create(
                                twiml='<Response><Say>' + str(msg) + '</Say></Response>',
                                to=str(to_phone),
                                from_=str(from_phone)
                            )

            resTokenOutput = call.sid
            print('Final Response: ' + str(resTokenOutput))
            resToken = 0

            return resToken
        except Exception as e:
            x = str(e)
            resToken = 1

            print(x)
            logging.info(x)

            return resToken


Key snippets from the above codebase –

call = client.calls.create(
                            twiml='<Response><Say>' + str(msg) + '</Say></Response>',
                            to=str(to_phone),
                            from_=str(from_phone)
                        )

In the above block, we invoke the Twilio API with both the caller & callee numbers. The message itself comes from our primary calling program, & the IVR will read it out when calling the customer.
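
One optional hardening worth noting (my addition, not part of the original class): the message is embedded inside TwiML, which is XML, so characters like & or < in a dynamic message would break the payload. Escaping the text first avoids that –

from xml.sax.saxutils import escape

# Same call as above, with the message XML-escaped
call = client.calls.create(
                            twiml='<Response><Say>' + escape(str(msg)) + '</Say></Response>',
                            to=str(to_phone),
                            from_=str(from_phone)
                        )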

2. callTwilioVoice.py (Main calling script)


#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 06-Mar-2021                         ####
#### Modified On 07-Mar-2021                         ####
####                                                 ####
#### Objective: Main calling script -                ####
#### This Python script will invoke the              ####
#### Twilio Voice API & deliver a custom             ####
#### voice message to the subscriber.                ####
#########################################################
from clsConfig import clsConfig as cf
import clsL as cl
import logging
import datetime
import clsVoiceAPI as ca

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        # Declared Variables
        ret_1 = 0
        debug_ind = 'Y'
        res_2 = ''

        # Defining Generic Log File
        general_log_path = str(cf.conf['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'TwillioAPICall.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to the archive directory
        log_dir = cf.conf['LOG_PATH']
        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print()
        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        print('Welcome to the Twilio Voice Calling Program: ')
        print('*' * 160)
        print()

        # Provide a short input text for calls
        voiceCallText = 'Voice From Satyaki, Welcome to the Python World!'

        # Create the instance of the Twilio Voice API Class
        x1 = ca.clsVoiceAPI()

        # Let's pass this to our map section
        resSID = x1.sendCall(voiceCallText)

        if resSID == 0:
            print('Successfully sent the Audio Message!')
        else:
            print('Failed to send the Audio Message!')

        print()
        print('Finished Sending Automated Calls..')
        print("*" * 160)

        logging.info('Finished Sending Automated Calls..')
        logging.info(tmpR0)

        tmpR10 = 'End Time: ' + str(var)
        logging.info(tmpR10)
        logging.info(tmpR0)
    except ValueError as e:
        print(str(e))
        print("Invalid option!")
        logging.info("Invalid option!")
    except Exception as e:
        # Exception objects don't expose .message in Python 3
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

Key snippets from the above codebase –

        # Create the instance of the Twilio Voice API Class
        x1 = ca.clsVoiceAPI()

        # Let's pass this to our map section
        resSID = x1.sendCall(voiceCallText)

As you can see, we’re first instantiating the class & then calling its method with the appropriate message, which will eventually be delivered to our customer. You can configure dynamic content & pass it to this class.
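
For instance, a minimal sketch of such dynamic content might look like this (the DataFrame & its column names are purely hypothetical) –

import pandas as p
import clsVoiceAPI as ca

# Hypothetical pending-payment data
pendingDF = p.DataFrame({'custName': ['John'], 'dueAmount': [125.50]})

x1 = ca.clsVoiceAPI()

for _, row in pendingDF.iterrows():
    # Build a per-customer message & hand it to the same sendCall method
    msg = 'Hello ' + row['custName'] + ', your payment of ' \
          + str(row['dueAmount']) + ' dollars is due this week.'
    resSID = x1.sendCall(msg)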


Let us explore the directory structure –

Directory Structures

Let us see how it runs –

Running Applications

Make sure that you check the balance of your Twilio account diligently.

Checking Balance

And here is a sneak peek of how it looks in a video –

Actual execution

For more information on IVR, please check the following link.


Please find the git details in this link.

So, finally, we have done it.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational & available over the internet for educational purposes only.

Another marvelous performance-tuning trick in Python

Hi Guys!

Today, I’ll be showing another post on how one can drastically improve the performance of Python code. Last time, we took advantage of vector computing by using GPU-based computation. This time, we’ll explore PyPy (a runtime with a just-in-time compiler, whereas standard CPython is an interpreter).


What is PyPy?

According to the standard description available over the net ->

PyPy is a very compliant Python interpreter that is a worthy alternative to CPython. By installing and running your application with it, you can gain noticeable speed improvements. How much of an improvement you’ll see depends on the application you’re running.

What is a JIT (Just-In-Time) compiler?

A compiled programming language is always faster in execution, as it generates machine code specific to the CPU architecture & OS. However, such binaries are challenging to port to another system. Examples: C, C++, etc.

Interpreted languages are easy to port to a new system. However, they lack performance. Examples: Perl, MATLAB, etc.

Python falls between the two. Hence, it performs better than purely interpreted languages, but certainly not as well as a compiler-driven language.

A just-in-time compiler takes advantage of both worlds. It identifies the repeatable (hot) chunks of code & compiles them into machine code at runtime for optimum performance.


To prepare the environment, you need to install the following on macOS (I’m using a MacBook) –

brew install pypy3

Let’s revisit our code.

Step 1: largeCompute.py (The main script, which will participate in the performance comparison for both interpreters):


##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 06-May-2021              ####
####                                      ####
#### Objective: Main calling script for   ####
#### normal execution.                    ####
##############################################
from timeit import default_timer as timer

def vecCompute(sizeNum):
    try:
        total = 0

        for i in range(1, sizeNum):
            for j in range(1, sizeNum):
                total += i + j

        return total
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 0

def main():
    start = timer()
    totalM = 0

    totalM = vecCompute(100000)
    print('The result is : ' + str(totalM))

    duration = timer() - start
    print('It took ' + str(duration) + ' seconds to compute')

if __name__ == '__main__':
    main()


Key snippets from the above script –

for i in range(1, sizeNum):
            for j in range(1, sizeNum):
                total += i + j

The vecCompute function loops 100000 × 100000 times (or whatever number is supplied) & accumulates the value (i + j) at each iteration.
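
As a quick sanity check (my addition, not from the original post), the same total has a closed form: with n as the supplied size, the sum of (i + j) for i, j from 1 to n-1 equals (n-1)² × n –

# Closed-form check for vecCompute(n):
# sum_{i=1}^{n-1} sum_{j=1}^{n-1} (i + j) = (n - 1)**2 * n
n = 100000
print((n - 1) ** 2 * n)   # should match vecCompute(100000)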


Let’s see how it performs.

To run the commands in pypy you need to use the following command –

pypy largeCompute.py

or, You have to mention the specific path as follows –

/Users/satyaki_de/Desktop/pypy3.7-v7.3.4-osx64/bin/pypy largeCompute.py
Performance Comparison between two interpreters

As you can see, there is a significant performance improvement, i.e., (352.079 / 14.503) = 24.276. So, I can clearly say it runs about 24 times faster than the standard Python interpreter. This is as good as C++ code.


Where not to use?

PyPy works best with pure Python-driven applications. It can’t work with C extensions in Python, so you won’t get those benefits there. However, I strongly believe that one day we may use this for most of our use cases.

For more information, please visit this link. So, this is another short yet effective post. 🙂


So, finally, we have done it.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational & available over the internet for educational purposes only.

Azure-API calls from a Python-based OCI function through the Oracle API Gateway.

Today, I’ll be discussing Oracle Cloud Function interaction with an Azure-API through the Oracle API Gateway using native Python. Again, I want to touch on this subject as I didn’t find much relevant material using Python over the net.

Let’s explore our use case. For this use case, I’ll use an old Azure-API that I’ve developed in early 2019 & shared here during that time.

Now, we need to prepare our environment in Oracle-cloud.

Step 1:

We need to configure the virtual network as shown in the below collage picture, which will depict the step-by-step process to create it. For security reasons, I’ve masked sensitive information. It would help if you captured them from your cloud portal.

VCN creation process

Make sure you choose the correct options & validate at the end, as shown in the below picture.

VCN Creation – Final Step

If all the information provided is correct, then you should see the following screen.

VCN Creation – Final Screen

Step 2:

Now, we need to create an application. As per OCI guidelines, one cannot generate any function or group of functions without a container, known as an application.

Creation of Application

From the above collage pic, you can see how we create the application by providing all the necessary inputs.

Step 3:

Now, you need to create the registry as shown below –

Creation of Registry

Your function-container will stay inside it after deployment. To know more about this, click the following link.

Step 4:

If you haven’t generated the auth-token already, then this is the time to render it as shown below –

Generation of Auth-Token

Step 5:

This next piece of information is highly crucial; you will need it on many occasions.

Object storage namespace

Just keep this information handy. I’ll refer to this step whenever we need it. You can get the details here.

Step 6:

Let’s create the gateway now. Please refer to the following collage pics, showing the step-by-step process.

Creation of Gateway

Make sure you have validated it before you proceed to the next step.

Step 7:

Let’s create the function under the application. I find this GUI option is relatively easier than configuring locally & then push it to the OCI. Let’s follow the process shown in the collage of pics mentioned here –

Creation of Function

So, you need to execute the series of commands shown above. And the good thing is that the majority of the critical commands are automatically generated for you, so you don’t need to spend lots of time figuring them out.

Here, we’ll be executing a series of commands as shown below –

Creation of function – continue

A few necessary commands that I want to discuss here –

fn init --runtime python <function-name>

This command will create a template of scripts based on your supplied language. You need to modify the main script (func.py) later with your appropriate logic. You can add other scripts as classes & refer to them inside your func.py as well.
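
For reference, the generated skeleton typically looks like this (the function name is the one used later in this post) –

getcoviddata/
├── func.py            # handler stub that you replace with your logic
├── func.yaml          # function metadata (runtime, entrypoint, memory)
└── requirements.txt   # Python dependencies bundled at deploy time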

For a better deployment & control environment, it is always wise to create a virtual env.

Just like the Azure function, you need to update your requirements.txt file before your deployment command.

pip freeze>requirements.txt
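
A typical flow might look like this (the fdk & requests packages are assumptions based on what this function imports) –

python3 -m venv venv
source venv/bin/activate
pip install fdk requests
pip freeze > requirements.txt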

Once we are satisfied with our development, we’ll deploy the application as shown below –

Deployment of function

Again, a few relevant commands that I want to discuss here –

fn -v deploy --app <Application-Name>

This command will deploy any changed Oracle functions & push them to OCI. During this time, it will check all the dependent packages that you are using & try to install them one by one.

If you have already deployed & you want to upgrade your logic, then the deployment option will show something like this –

Deployment of function – continue

All the commands are pretty standard & marked with a red-square box. Few necessary commands to discuss –

fn invoke <Application-Name> <Function-Name>

If you are not using any external API, the above command should ideally return the default output. But in our case, we have used an Azure-API, which lives outside OCI. Hence, we need to update a few more settings before it works.

Unlike an Azure Function, you won’t get the invocation link by default when running locally using the Visual Studio Code editor.

Here, you need to execute the following commands as shown in the above picture –

fn inspect function <Application-Name> <Function-Name>

If your deployment is successful, you will see your function docker-image inside your registry as shown below –

Deployment image of functions

To know more about fn-commands, click the following link.

Step 8:

Now, you need to update some policies, which will help API-Gateway to work.

Update of policy & logging feature

Also, you need to configure your default log for your function, as shown above.

Apart from that, we need to whitelist the port 443 as shown below –

Port whitelisting in VCN

Finally, we need to deploy our existing function into the Oracle Gateway. You need to prepare a deployment JSON object, which creates a route through which the function can be reached via the API-Gateway deployment.

Deployment of function inside API-Gateway

The deployment JSON file should look something like this –

spec.json


{
    "routes": [
        {
            "path": "/getdata",
            "methods": [
                "GET", "POST"
            ],
            "backend": {
                "type": "ORACLE_FUNCTIONS_BACKEND",
                "functionId": "ocid1.fnfunc.oc1.us-sanjose-1.aaaaxxxxxxxjdjfjfjfjfjfjfjfjfjfjfjfjfjfjfjdsssssss2dfjdfjdjd33376dq"
            }
        }
    ]
}


You will get more on this from this link.

Make sure that your path prefix is unique, as shown in the above picture. And if you want to know the complete steps to prepare your Oracle function, you need to go through this master link.

Now, we’re ready to test the application. But, before that, we want to explore the code-base.


Let us explore the codebase now.

1. clsConfig.py ( This is the configuration file for this demo-application)


###############################################
#### Written By: SATYAKI DE                ####
#### Written On: 04-Apr-2020               ####
####                                       ####
#### Objective: This script is a config    ####
#### file, contains all the keys for       ####
#### Azure 2 OCI API. Application will     ####
#### process these information & perform   ####
#### the call to our newly developed Azure ####
#### API in OCI.                           ####
###############################################
import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        "comp": "ocid1.compartment.oc1..xxxxxxxxxxxxxxxxxxxxxxxxyyyyyyyyyyyyyyyyyyxxxxxx",
        "URL": "https://xxxxxxxxxx.yyyyyyyyyyyyyyyy.net/api/getDynamicCovidStats",
        "appType": "application/json",
        "conType": "keep-alive",
        "limRec": 10,
        "CACHE": "no-cache",
        "colList": "date, state, positive, negative",
        "typSel": "Cols",
        "LOG_PATH": Curr_Path + sep + 'log' + sep,
        "STREAM_NAME": "Covid19-Stream",
        "PARTITIONS": 1
    }


2. clsAzureAPI.py ( This is the modified version of the old AzureAPI class. We’ve added a new logger, which works inside OCI. There are no other changes to the main logic. )


##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 07-Mar-2021              ####
#### Modified On 07-Mar-2021              ####
####                                      ####
#### Objective: Calling Azure dynamic API ####
##############################################
import json
from clsConfig import clsConfig as cf
import requests
import logging

class clsAzureAPI:
    def __init__(self):
        self.url = cf.conf['URL']
        self.azure_cache = cf.conf['CACHE']
        self.azure_con = cf.conf['conType']
        self.type = cf.conf['appType']
        self.typSel = cf.conf['typSel']
        self.typVal = cf.conf['colList']

    def searchQry(self):
        try:
            url = self.url
            api_cache = self.azure_cache
            api_con = self.azure_con
            type = self.type
            typSel = self.typSel
            typVal = self.typVal

            querystring = {"typeSel": typSel, "typeVal": typVal}

            strMsg = 'Input JSON: ' + str(querystring)
            logging.getLogger().info(strMsg)

            headers = {
                'content-type': type,
                'Cache-Control': api_cache,
                'Connection': api_con
            }

            response = requests.request("GET", url, headers=headers, params=querystring)

            ResJson = response.text

            jdata = json.dumps(ResJson)
            ResJson = json.loads(jdata)

            return ResJson
        except Exception as e:
            ResJson = ''
            x = str(e)
            print(x)

            logging.info(x)
            ResJson = {'errorDetails': x}

            return ResJson


3. func.py ( Main calling script. This one is auto-generated by OCI while creating the function. We’ve modified it as per our logic. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 20-Mar-2021 ####
#### Modified On 20-Mar-2021 ####
#### ####
#### Objective: Calling Azure dynamic API ####
##############################################

import io
import json
import logging

from fdk import response
import clsAzureAPI as ca

# Disabling warnings
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

def handler(ctx, data: io.BytesIO = None):
    try:
        email = "default@gmail.com"

        # Checking individual elements
        try:
            body = json.loads(data.getvalue())
            email = body.get("email")
        except (Exception, ValueError) as ex:
            logging.getLogger().info('error parsing json payload: ' + str(ex))

        logging.getLogger().info("Calling Oracle Python getCovidData function!")

        # Create the instance of the Mock Mulesoft API Class
        x1 = ca.clsAzureAPI()

        # Let's pass this to our map section
        retJson = x1.searchQry()

        # Converting JSon to Pandas Dataframe for better readability
        # Capturing the JSON Payload
        resJson = json.loads(retJson)

        return response.Response(
            ctx, response_data=json.dumps(
                {"status": "Success", "message": resJson}),
            headers={"Content-Type": "application/json"}
        )
    except Exception as e:
        x = str(e)
        return response.Response(
            ctx, response_data=json.dumps(
                {"status": "Failed", "message": x}),
            headers={"Content-Type": "application/json"}
        )


Key snippet that we want to discuss here –

        # Checking individual elements
        try:
            body = json.loads(data.getvalue())
            email = body.get("email")
        except (Exception, ValueError) as ex:
            logging.getLogger().info('error parsing json payload: ' + str(ex))

Checking the individual element in the input payload.

        # Create the instance of the Mock Mulesoft API Class
        x1 = ca.clsAzureAPI()

        # Let's pass this to our map section
        retJson = x1.searchQry()

        # Converting JSon to Pandas Dataframe for better readability
        # Capturing the JSON Payload
        resJson = json.loads(retJson)

Now, we’re calling the Azure-API class & receiving the response into a JSON variable.

return response.Response(
            ctx, response_data=json.dumps(
                {"status":"Success", "message": resJson}),
            headers={"Content-Type": "application/json"}
        )

Sending final response to the client.
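
Once the deployment is active, any HTTP client can reach the function through the gateway. Here is a minimal sketch, assuming a hypothetical gateway hostname & the /getdata path prefix from our spec.json –

import json
import requests

# Hypothetical gateway endpoint - replace with your own deployment URL
url = 'https://<your-gateway-host>/getdata'

# The handler optionally reads an email attribute from the payload
payload = json.dumps({"email": "default@gmail.com"})
headers = {'Content-Type': 'application/json'}

response = requests.request("POST", url, headers=headers, data=payload)
print(json.loads(response.text)['status'])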

4. func.yaml ( Main configuration script. This one is auto-generated by OCI while creating the function. )


schema_version: 20180708
name: getcoviddata
version: 0.0.1
runtime: python
entrypoint: /python/bin/fdk /function/func.py handler
memory: 256



Let’s run it from Postman –

Invoking OCI-Function from Postman

During this demo, I’ve realized that Oracle functions are yet to mature compared to AWS Lambda or Azure Functions using Python. I faced challenges similar to those I encountered nearly two years back, when I first implemented an Azure function using Python. However, I’m optimistic that Oracle Cloud Functions will mature & offer an integrated GUI environment to deploy Python-based components straight from the IDE, rather than a CLI-driven approach. Correct me in case I missed an IDE that already supports this feature.


You can explore my Git associated with this project & download the code from here.

So, finally, we’ve done it. 😀

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only. Also, I’ve used the template SDK provided by Oracle & customized it to satisfy our business cases.

Streaming Data from Azure to Oracle Clouds

Hello guys,

Today, I’m going to discuss a new notion – living between multi-cloud environments. As you might be aware, I shared a dynamic API built inside Azure in my last post. Today, I want to use that Covid-19 API, originally sourced from a third party, & publish it as a stream inside the Oracle cloud. This is an ideal case for integration software like Tibco or Mulesoft. However, the approach shown here is more useful for a start-up kind of environment, or for anyone who wants to build their own API-based ecosystem.

First, you need to register in Oracle cloud, as they give a $300 trial to everyone who registers on their platform for the first time.

Step 1:

You will land on the main dashboard after successfully registering in the portal by providing the essential information.

Registration to Oracle Cloud

Step 2:

By default, it will show the following screen –

Display of root compartment

You need to create the compartment as shown below.

Creation of sub-compartment

Step 3:

Now, you can create the stream, as shown in the next step. After selecting the desired compartment, click the blue “Create Stream” button at the top of the page.

Creation of stream – Initiation
Creation of Stream – Final Steps

Also, you can test the stream by manually uploading a few sample JSON messages, as shown in the below step.

The lower picture shows the sample JSON used to test the newly created stream; the upper picture shows the final stream with some previously tested JSON

Step 4:

Now, we need to add API-key as follows –

Adding the API-Key

This screen will prompt you to download the private key. We’ll use this in the later configuration of our python environment.

Step 5:

Now, you need to capture the content of the configuration file shown in the below figures –

Copying content of the Configuration file

You’ll get these important details under Identity, marked in the red box.

Step 6:

Now, you’ll place the previously acquired private key & the content from Step-5 under the following location from where you are going to trigger the application –

Configuration File & Private Key addition
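
For reference, the configuration file generally follows the layout below (the values are placeholders; yours will come from the Identity section captured in Step-5, & our scripts will read it from ~/.oci/config.poc) –

[DEFAULT]
user=ocid1.user.oc1..xxxxxxxxxxxxxxxxxxxxxxxx
fingerprint=aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99
key_file=~/.oci/oci_api_key.pem
tenancy=ocid1.tenancy.oc1..xxxxxxxxxxxxxxxxxxxxxxxx
region=us-sanjose-1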

You will get more details on this from these links –

Now, we’ve finished with the basic Oracle cloud setup.


Let’s check the Azure API one more time using Postman –

Testing Azure-API from Postman

Let us install some of the critical python packages –

Installing Key Python-packages

Now, we’re ready with our environment.


Let us explore the codebase now.

1. clsConfig.py ( This is the configuration file for this demo-application)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 04-Apr-2020 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Azure 2 OCI API. Application will ####
#### process this information & perform ####
#### the call to our newly developed Azure ####
#### API in OCI. ####
###############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        "comp": "ocid1.compartment.oc1..xxxxxxxxxxxxxxxxxxxxxxxxyyyyyyyyyyyyyyyyyyxxxxxx",
        "URL": "https://xxxxxxxxxx.yyyyyyyyyyyyyyyy.net/api/getDynamicCovidStats",
        "appType": "application/json",
        "conType": "keep-alive",
        "limRec": 10,
        "CACHE": "no-cache",
        "colList": "date, state, positive, negative",
        "typSel": "Cols",
        "LOG_PATH": Curr_Path + sep + 'log' + sep,
        "STREAM_NAME": "Covid19-Stream",
        "PARTITIONS": 1
    }


2. clsOCIConsume.py (This will consume from the designated stream created in Oracle-cloud)


##############################################
#### Enhancement By: SATYAKI DE ####
#### Enhancement On: 07-Mar-2021 ####
#### Modified On 08-Mar-2021 ####
#### ####
#### Objective: Consuming stream from OCI ####
##############################################

import oci
import sys
import time
import os
import logging
from base64 import b64encode, b64decode
import json

from clsConfig import clsConfig as cf
from oci.config import from_file
import pandas as p

class clsOCIConsume:
    def __init__(self):
        self.comp = str(cf.conf['comp'])
        self.STREAM_NAME = str(cf.conf['STREAM_NAME'])
        self.PARTITIONS = int(cf.conf['PARTITIONS'])
        self.limRec = int(cf.conf['limRec'])

    def get_cursor_by_partition(self, client, stream_id, partition):
        print("Creating a cursor for partition {}".format(partition))

        cursor_details = oci.streaming.models.CreateCursorDetails(
            partition=partition,
            type=oci.streaming.models.CreateCursorDetails.TYPE_TRIM_HORIZON)
        response = client.create_cursor(stream_id, cursor_details)
        cursor = response.data.value

        return cursor

    def simple_message_loop(self, client, stream_id, initial_cursor):
        try:
            cursor = initial_cursor
            while True:
                get_response = client.get_messages(stream_id, cursor, limit=10)

                # No messages to process. return.
                if not get_response.data:
                    return

                # Process the messages
                print(" Read {} messages".format(len(get_response.data)))
                for message in get_response.data:
                    print("{}: {}".format(b64decode(message.key.encode()).decode(),
                                          b64decode(message.value.encode()).decode()))

                # get_messages is a throttled method; clients should retrieve sufficiently large message
                # batches, as to avoid too many http requests.
                time.sleep(1)

                # use the next-cursor for iteration
                cursor = get_response.headers["opc-next-cursor"]

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

    def get_stream(self, admin_client, stream_id):
        return admin_client.get_stream(stream_id)

    def get_or_create_stream(self, client, compartment_id, stream_name, partition, sac_composite):
        try:
            list_streams = client.list_streams(compartment_id=compartment_id, name=stream_name,
                                               lifecycle_state=oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE)

            if list_streams.data:
                # If we find an active stream with the correct name, we'll use it.
                print("An active stream {} has been found".format(stream_name))
                sid = list_streams.data[0].id

                return self.get_stream(sac_composite.client, sid)

            print(" No Active stream {} has been found; Creating it now. ".format(stream_name))
            print(" Creating stream {} with {} partitions.".format(stream_name, partition))

            # Create stream_details object that need to be passed while creating stream.
            stream_details = oci.streaming.models.CreateStreamDetails(name=stream_name, partitions=partition,
                                                                      compartment_id=compartment_id, retention_in_hours=24)

            # Since stream creation is asynchronous; we need to wait for the stream to become active.
            response = sac_composite.create_stream_and_wait_for_state(stream_details, wait_for_states=[oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE])

            return response
        except Exception as e:
            print(str(e))

    def consumeStream(self):
        try:
            STREAM_NAME = self.STREAM_NAME
            PARTITIONS = self.PARTITIONS
            compartment = self.comp

            print('Consuming stream from Oracle Cloud!')

            # Load the default configuration
            config = from_file(file_location="~/.oci/config.poc")

            # Create a StreamAdminClientCompositeOperations for composite operations.
            stream_admin_client = oci.streaming.StreamAdminClient(config)
            stream_admin_client_composite = oci.streaming.StreamAdminClientCompositeOperations(stream_admin_client)

            # We will reuse a stream if its already created.
            # This will utilize list_streams() to determine if a stream exists and return it, or create a new one.
            stream = self.get_or_create_stream(stream_admin_client, compartment, STREAM_NAME,
                                               PARTITIONS, stream_admin_client_composite).data

            print(" Created Stream {} with id : {}".format(stream.name, stream.id))

            # Streams are assigned a specific endpoint url based on where they are provisioned.
            # Create a stream client using the provided message endpoint.
            stream_client = oci.streaming.StreamClient(config, service_endpoint=stream.messages_endpoint)
            s_id = stream.id

            # Use a cursor for getting messages; each get_messages call will return a next-cursor for iteration.
            # There are a couple kinds of cursors.
            # A cursor can be created at a given partition/offset.
            # This gives explicit offset management control to the consumer.
            print("Starting a simple message loop with a partition cursor")
            partition_cursor = self.get_cursor_by_partition(stream_client, s_id, partition="0")
            self.simple_message_loop(stream_client, s_id, partition_cursor)

            return 0
        except Exception as e:
            x = str(e)
            print(x)
            logging.info(x)

            return 1

Let’s explore the key snippet from the above code –

    def get_or_create_stream(self, client, compartment_id, stream_name, partition, sac_composite):
        try:

            list_streams = client.list_streams(compartment_id=compartment_id, name=stream_name,
                                               lifecycle_state=oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE)

            if list_streams.data:
                # If we find an active stream with the correct name, we'll use it.
                print("An active stream {} has been found".format(stream_name))
                sid = list_streams.data[0].id
                return self.get_stream(sac_composite.client, sid)

            print(" No Active stream  {} has been found; Creating it now. ".format(stream_name))
            print(" Creating stream {} with {} partitions.".format(stream_name, partition))

            # Create stream_details object that need to be passed while creating stream.
            stream_details = oci.streaming.models.CreateStreamDetails(name=stream_name, partitions=partition,
                                                                      compartment_id=compartment_id, retention_in_hours=24)

            # Since stream creation is asynchronous; we need to wait for the stream to become active.
            response = sac_composite.create_stream_and_wait_for_state(stream_details, wait_for_states=[oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE])
            return response
        except Exception as e:
            print(str(e))

The above function checks whether an active stream with the given name already exists. If not, it creates one & waits for it to become active.

    def get_cursor_by_partition(self, client, stream_id, partition):
        print("Creating a cursor for partition {}".format(partition))
        cursor_details = oci.streaming.models.CreateCursorDetails(
            partition=partition,
            type=oci.streaming.models.CreateCursorDetails.TYPE_TRIM_HORIZON)
        response = client.create_cursor(stream_id, cursor_details)
        cursor = response.data.value
        return cursor

In Oracle cloud, you need to create a cursor to consume the streaming messages. Please refer to the following link for more details.

    def simple_message_loop(self, client, stream_id, initial_cursor):
        try:
            cursor = initial_cursor
            while True:
                get_response = client.get_messages(stream_id, cursor, limit=10)
                # No messages to process. return.
                if not get_response.data:
                    return

                # Process the messages
                print(" Read {} messages".format(len(get_response.data)))
                for message in get_response.data:
                    print("{}: {}".format(b64decode(message.key.encode()).decode(),
                                          b64decode(message.value.encode()).decode()))

                # get_messages is a throttled method; clients should retrieve sufficiently large message
                # batches, as to avoid too many http requests.
                time.sleep(1)
                # use the next-cursor for iteration
                cursor = get_response.headers["opc-next-cursor"]

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

In this case, we’re looping through the stream & consuming the messages, keeping the number of HTTP requests low by batching reads & sleeping between calls.
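
As a side note, a TRIM_HORIZON partition cursor always replays from the oldest retained message. If you want committed offsets across restarts, the OCI Python SDK also ships group cursors; a minimal sketch, assuming the same stream_client & s_id as in consumeStream (please verify the details against the current SDK documentation) –

import oci

# A group cursor lets OCI track the consumer group's offset for us.
# commit_on_get=True advances the group offset automatically on each read.
group_details = oci.streaming.models.CreateGroupCursorDetails(
    group_name="covid-consumer-group",    # hypothetical group name
    instance_name="consumer-1",           # hypothetical instance name
    type=oci.streaming.models.CreateGroupCursorDetails.TYPE_TRIM_HORIZON,
    commit_on_get=True)

group_cursor = stream_client.create_group_cursor(s_id, group_details).data.value
# group_cursor can now be fed to the same simple_message_loop()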

3. clsOCIPublish.py (This will publish messages from the source Azure-API to the designated stream created in Oracle-cloud)


##############################################
#### Enhancement By: SATYAKI DE ####
#### Enhancement On: 07-Mar-2021 ####
#### Modified On 07-Mar-2021 ####
#### ####
#### Objective: Publishing stream at OCI ####
##############################################

import oci
import sys
import time
import os
import logging
from base64 import b64encode, b64decode
import json

from clsConfig import clsConfig as cf
from oci.config import from_file
import pandas as p

class clsOCIPublish:
    def __init__(self):
        self.comp = str(cf.conf['comp'])
        self.STREAM_NAME = str(cf.conf['STREAM_NAME'])
        self.PARTITIONS = int(cf.conf['PARTITIONS'])
        self.limRec = int(cf.conf['limRec'])

    def get_stream(self, admin_client, stream_id):
        return admin_client.get_stream(stream_id)

    def publish_messages(self, client, stream_id, inputDF):
        try:
            limRec = self.limRec
            # Converting dataframe to json
            df = inputDF

            # Calculating total number of records
            cntRow = df.shape[0]
            print('Actual Record counts: ', str(cntRow))
            print('Supplied Record counts: ', str(limRec))

            # Build up a PutMessagesDetails and publish some messages to the stream
            message_list = []
            start_pos = 0
            end_pos = 0
            interval = 1

            for i in range(limRec):
                split_df = p.DataFrame()
                rJson = ''
                # Preparing Data

                # Getting Individual Element & convert them to Series
                if ((start_pos + interval) <= cntRow):
                    end_pos = start_pos + interval
                else:
                    end_pos = start_pos + (cntRow - start_pos)

                split_df = df.iloc[start_pos:end_pos]
                rJson = split_df.to_json(orient='records')

                if ((start_pos > cntRow) | (start_pos == cntRow)):
                    pass
                else:
                    start_pos = start_pos + interval

                key = "key" + str(i)
                value = "value" + str(rJson)

                # End of data preparation

                encoded_key = b64encode(key.encode()).decode()
                encoded_value = b64encode(value.encode()).decode()
                message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value))

            print("Publishing {} messages to the stream {} ".format(len(message_list), stream_id))
            messages = oci.streaming.models.PutMessagesDetails(messages=message_list)
            put_message_result = client.put_messages(stream_id, messages)

            # The put_message_result can contain some useful metadata for handling failures
            for entry in put_message_result.data.entries:
                if entry.error:
                    print("Error ({}) : {}".format(entry.error, entry.error_message))
                else:
                    print("Published message to partition {} , offset {}".format(entry.partition, entry.offset))

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)
            return 1

    def get_or_create_stream(self, client, compartment_id, stream_name, partition, sac_composite):
        try:
            list_streams = client.list_streams(compartment_id=compartment_id, name=stream_name,
                                               lifecycle_state=oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE)

            if list_streams.data:
                # If we find an active stream with the correct name, we'll use it.
                print("An active stream {} has been found".format(stream_name))
                sid = list_streams.data[0].id
                return self.get_stream(sac_composite.client, sid)

            print(" No Active stream {} has been found; Creating it now. ".format(stream_name))
            print(" Creating stream {} with {} partitions.".format(stream_name, partition))

            # Create stream_details object that need to be passed while creating stream.
            stream_details = oci.streaming.models.CreateStreamDetails(name=stream_name, partitions=partition,
                                                                      compartment_id=compartment_id, retention_in_hours=24)

            # Since stream creation is asynchronous; we need to wait for the stream to become active.
            response = sac_composite.create_stream_and_wait_for_state(stream_details, wait_for_states=[oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE])
            return response
        except Exception as e:
            print(str(e))

    def publishStream(self, inputDf):
        try:
            STREAM_NAME = self.STREAM_NAME
            PARTITIONS = self.PARTITIONS
            compartment = self.comp

            print('Publishing stream to Oracle Cloud!')

            # Load the default configuration
            config = from_file(file_location="~/.oci/config.poc")

            # Create a StreamAdminClientCompositeOperations for composite operations.
            stream_admin_client = oci.streaming.StreamAdminClient(config)
            stream_admin_client_composite = oci.streaming.StreamAdminClientCompositeOperations(stream_admin_client)

            # We will reuse a stream if its already created.
            # This will utilize list_streams() to determine if a stream exists and return it, or create a new one.
            stream = self.get_or_create_stream(stream_admin_client, compartment, STREAM_NAME,
                                               PARTITIONS, stream_admin_client_composite).data

            print(" Created Stream {} with id : {}".format(stream.name, stream.id))

            # Streams are assigned a specific endpoint url based on where they are provisioned.
            # Create a stream client using the provided message endpoint.
            stream_client = oci.streaming.StreamClient(config, service_endpoint=stream.messages_endpoint)
            s_id = stream.id

            # Publish some messages to the stream
            self.publish_messages(stream_client, s_id, inputDf)

            return 0
        except Exception as e:
            x = str(e)
            print(x)
            logging.info(x)
            return 1

Let’s explore the key snippet from the above script –

    def publish_messages(self, client, stream_id, inputDF):
        try:
            limRec = self.limRec
            # Converting dataframe to json
            df = inputDF

            # Calculating total number of records
            cntRow = df.shape[0]
            print('Actual Record counts: ', str(cntRow))
            print('Supplied Record counts: ', str(limRec))

            # Build up a PutMessagesDetails and publish some messages to the stream
            message_list = []
            start_pos = 0
            end_pos = 0
            interval = 1

            for i in range(limRec):
                split_df = p.DataFrame()
                rJson = ''
                # Preparing Data

                # Getting Individual Element & convert them to Series
                if ((start_pos + interval) <= cntRow):
                    end_pos = start_pos + interval
                else:
                    end_pos = start_pos + (cntRow - start_pos)

                split_df = df.iloc[start_pos:end_pos]
                rJson = split_df.to_json(orient ='records')

                if ((start_pos > cntRow) | (start_pos == cntRow)):
                    pass
                else:
                    start_pos = start_pos + interval

                key = "key" + str(i)
                value = "value" + str(rJson)

                # End of data preparation

                encoded_key = b64encode(key.encode()).decode()
                encoded_value = b64encode(value.encode()).decode()
                message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value))

            print("Publishing {} messages to the stream {} ".format(len(message_list), stream_id))
            messages = oci.streaming.models.PutMessagesDetails(messages=message_list)
            put_message_result = client.put_messages(stream_id, messages)

            # The put_message_result can contain some useful metadata for handling failures
            for entry in put_message_result.data.entries:
                if entry.error:
                    print("Error ({}) : {}".format(entry.error, entry.error_message))
                else:
                    print("Published message to partition {} , offset {}".format(entry.partition, entry.offset))

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)
            return 1

In the above snippet, we’re fetching the data captured from our Azure-API call & then sending it chunk-by-chunk to the Oracle stream for publishing.
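
The framing itself is easy to verify in isolation – the stream accepts base64-encoded key/value pairs, so a one-row chunk round-trips like this (a standalone sketch using only pandas & the standard library) –

from base64 import b64encode, b64decode
import pandas as p

# One-row chunk, mirroring what publish_messages() sends per message
df = p.DataFrame([{"date": 20210207, "state": "AK", "positive": 53279.0}])
rJson = df.iloc[0:1].to_json(orient='records')

encoded_value = b64encode(('value' + rJson).encode()).decode()

# A consumer decodes it back to the original JSON chunk
print(b64decode(encoded_value.encode()).decode())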

4. clsAzureAPI.py (This will fetch messages from the source Azure-API)


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 07-Mar-2021 ####
#### Modified On 07-Mar-2021 ####
#### ####
#### Objective: Calling Azure dynamic API ####
##############################################

import json
from clsConfig import clsConfig as cf
import requests
import logging

class clsAzureAPI:
    def __init__(self):
        self.url = cf.conf['URL']
        self.azure_cache = cf.conf['CACHE']
        self.azure_con = cf.conf['conType']
        self.type = cf.conf['appType']
        self.typSel = cf.conf['typSel']
        self.typVal = cf.conf['colList']

    def searchQry(self):
        try:
            url = self.url
            api_cache = self.azure_cache
            api_con = self.azure_con
            type = self.type
            typSel = self.typSel
            typVal = self.typVal

            querystring = {"typeSel": typSel, "typeVal": typVal}
            print('Input JSON: ', str(querystring))

            headers = {
                'content-type': type,
                'Cache-Control': api_cache,
                'Connection': api_con
            }

            response = requests.request("GET", url, headers=headers, params=querystring)

            ResJson = response.text
            jdata = json.dumps(ResJson)
            ResJson = json.loads(jdata)

            return ResJson
        except Exception as e:
            ResJson = ''
            x = str(e)
            print(x)
            logging.info(x)
            ResJson = {'errorDetails': x}

            return ResJson


I think this one is pretty straightforward, as we’re simply invoking the Azure-API & capturing its response.
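
For a quick standalone check, this class can be exercised on its own before wiring it into the stream publisher; a minimal sketch, assuming clsConfig carries a valid URL –

import json
import pandas as p
import clsAzureAPI as ca

# Fetch the raw JSON text from the Azure API
x1 = ca.clsAzureAPI()
retJson = x1.searchQry()

# Normalize into a dataframe, exactly as the main script does later
res = json.loads(retJson)
df = p.io.json.json_normalize(res)
print(df.head())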

5. callAzure2OracleStreaming.py (Main calling script to invoke all the classes for an end-to-end test)


#########################################################
#### Written By: SATYAKI DE ####
#### Written On: 06-Mar-2021 ####
#### Modified On 07-Mar-2021 ####
#### ####
#### Objective: Main calling scripts – ####
#### This Python script will consume the ####
#### source API data from Azure-Cloud & publish the ####
#### data into an Oracle Streaming platform, ####
#### which is compatible with Kafka. Later, another ####
#### consumer app will read the data from the stream.####
#########################################################

from clsConfig import clsConfig as cf
import clsL as cl
import logging
import datetime
import clsAzureAPI as ca
import clsOCIPublish as co
import clsOCIConsume as cc
import pandas as p
import json

# Disabling warnings
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        # Declared Variable
        ret_1 = 0
        debug_ind = 'Y'
        res_2 = ''

        # Defining Generic Log File
        general_log_path = str(cf.conf['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'Azure2OCIStream.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        log_dir = cf.conf['LOG_PATH']
        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print()
        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        print('Welcome to Azure to Oracle Cloud Streaming(OCI) Calling Program: ')
        print('*' * 160)
        print('Reading dynamic Covid data from Azure API: ')
        print('https://xxxxxx.yyyyyyyyyy.net/api/getDynamicCovidStats')
        print()
        print('Selected Columns for this -> date, state, positive, negative')
        print()
        print('This will take a few seconds depending upon the volume & network!')
        print('-' * 160)
        print()

        # Create the instance of the Azure API class
        x1 = ca.clsAzureAPI()

        # Let's pass this to our map section
        retJson = x1.searchQry()

        # Converting JSon to Pandas Dataframe for better readability
        # Capturing the JSON Payload
        #res_1 = json.dumps(retJson)
        #res = json.loads(res_1)
        res = json.loads(retJson)

        # Converting dictionary to Pandas Dataframe
        # df_ret = p.read_json(ret_2, orient='records')
        df_ret = p.io.json.json_normalize(res)
        df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[1])

        # Removing any duplicate columns
        df_ret = df_ret.loc[:, ~df_ret.columns.duplicated()]

        print()
        print()
        print("-" * 160)
        print('Publishing Azure sample result: ')
        print(df_ret.head())

        # Logging Final Output
        l.logr('1.df_ret' + var + '.csv', debug_ind, df_ret, 'log')

        print("-" * 160)
        print()
        print('*' * 160)
        print('Calling Oracle Cloud Infrastructure Publisher Program!')
        print('Pushing Azure API to Oracle Kafka-Streaming using OCI!')
        print('-' * 160)

        # Create the instance of the OCI publisher class
        x2 = co.clsOCIPublish()
        retVal = x2.publishStream(df_ret)

        if retVal == 0:
            print('Successfully streamed to Oracle Cloud!')
        else:
            print('Failed to stream!')

        print()
        print('*' * 160)
        print('Calling Oracle Cloud Infrastructure Consumer Program!')
        print('Getting Oracle Streaming captured in OCI!')
        print('-' * 160)

        # Create the instance of the OCI consumer class
        x3 = cc.clsOCIConsume()
        retVal2 = x3.consumeStream()

        if retVal2 == 0:
            print('Successfully captured stream from Oracle Cloud!')
        else:
            print('Failed to retrieve stream from OCI!')

        print('Finished Analysis points..')
        print("*" * 160)
        logging.info('Finished Analysis points..')
        logging.info(tmpR0)

        tmpR10 = 'End Time: ' + str(var)
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError as e:
        print(str(e))
        print("Invalid option!")
        logging.info("Invalid option!")

    except Exception as e:
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

This one is the primary calling script, invoking all the Python classes one-by-one to run our test cases together.


Let us run our application –

Running end to end application

And, you can see the streaming data inside Oracle cloud as shown below –

Streaming data

You can explore my Git associated with this project & download the code from here.

So, finally, we’ve done it. 😀


I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only. Also, I’ve used the template SDK provided by Oracle & customized it to satisfy our business cases.

Creating a dynamic response of an API/Microservice

Hello Guys!

Today, I’m going to discuss a potential use case, where, on many occasions, different teams need almost similar kinds of data through an API. However, the needs are not identical. Creating a fresh API/Microservice after following up with many processes takes significant time.

What if we could create an API in such a way that we get the response dynamically, without needing to build another one? In this post, we’ll demonstrate such an approach.

I’ll be using open-source Covid-API, which will be useful for several posts starting from this one.

You will get plenty of useful data from here.

We’ve chosen the following one for our use case –

API-Reference

Let’s explore the sample data first.

[
   {
      "date":20210207,
      "state":"AK",
      "positive":53279.0,
      "probableCases":null,
      "negative":null,
      "pending":null,
      "totalTestResultsSource":"totalTestsViral",
      "totalTestResults":1536911.0,
      "hospitalizedCurrently":44.0,
      "hospitalizedCumulative":1219.0,
      "inIcuCurrently":null,
      "inIcuCumulative":null,
      "onVentilatorCurrently":11.0,
      "onVentilatorCumulative":null,
      "recovered":null,
      "dataQualityGrade":"A",
      "lastUpdateEt":"2\/5\/2021 03:59",
      "dateModified":"2021-02-05T03:59:00Z",
      "checkTimeEt":"02\/04 22:59",
      "death":279.0,
      "hospitalized":1219.0,
      "dateChecked":"2021-02-05T03:59:00Z",
      "totalTestsViral":1536911.0,
      "positiveTestsViral":64404.0,
      "negativeTestsViral":1470760.0,
      "positiveCasesViral":null,
      "deathConfirmed":null,
      "deathProbable":null,
      "totalTestEncountersViral":null,
      "totalTestsPeopleViral":null,
      "totalTestsAntibody":null,
      "positiveTestsAntibody":null,
      "negativeTestsAntibody":null,
      "totalTestsPeopleAntibody":null,
      "positiveTestsPeopleAntibody":null,
      "negativeTestsPeopleAntibody":null,
      "totalTestsPeopleAntigen":null,
      "positiveTestsPeopleAntigen":null,
      "totalTestsAntigen":null,
      "positiveTestsAntigen":null,
      "fips":"02",
      "positiveIncrease":0,
      "negativeIncrease":0,
      "total":53279,
      "totalTestResultsIncrease":0,
      "posNeg":53279,
      "deathIncrease":0,
      "hospitalizedIncrease":0,
      "hash":"07a5d43f958541e9cdabb5ea34c8fb481835e130",
      "commercialScore":0,
      "negativeRegularScore":0,
      "negativeScore":0,
      "positiveScore":0,
      "score":0,
      "grade":""
   }
]

Let’s take two cases: one where a service needs to access all the elements, & another where some other service requires only specific details.

Let’s explore the code base first –

  1. __init__.py ( This native Python-based Azure function will consume the source data & serve the dynamic API response. )
###########################################
#### Written By: SATYAKI DE            ####
#### Written On: 06-Feb-2021           ####
#### Package Flask package needs to    ####
#### install in order to run this      ####
#### script.                           ####
####                                   ####
#### Objective: Main Calling scripts.  ####
####                                   ####
#### However, to meet the functionality####
#### we've enhanced as per our logic.  ####
###########################################

import logging
import json
import requests
import os
import pandas as p
import numpy as np

import azure.functions as func


def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Dynamic-Covid-Status HTTP trigger function processed a request.')

    try:

        # Application Variable
        url = os.environ['URL']
        appType = os.environ['appType']
        conType = os.environ['conType']

        # API-Configuration
        payload={}
        headers = {
            "Connection": conType,
            "Content-Type": appType
        }

        # Validating input parameters
        typeSel = req.params.get('typeSel')
        if not typeSel:
            try:
                req_body = req.get_json()
            except ValueError:
                pass
            else:
                typeSel = req_body.get('typeSel')
        
        typeVal = req.params.get('typeVal')
        if not typeVal:
            try:
                req_body = req.get_json()
            except ValueError:
                pass
            else:
                typeVal = req_body.get('typeVal')

        # Printing Key-Element Values
        str1 = 'typeSel: ' + str(typeSel)
        logging.info(str1)

        str2 = 'typeVal: ' + str(typeVal)
        logging.info(str2)

        # End of API-Inputs

        # Getting Covid data from the REST-API
        response = requests.request("GET", url, headers=headers, data=payload)
        ResJson  = response.text

        if typeSel == '*':
            if typeVal != '':
                # Converting it to Json
                jdata = json.loads(ResJson)

                df_ret = p.io.json.json_normalize(jdata)
                df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

                rJson = df_ret.to_json(orient ='records') 

                return func.HttpResponse(rJson, status_code=200)
            else:
                x_stat = 'Failed'
                x_msg = 'Important information is missing for all values!'

                rJson = {
                    "status": x_stat,
                    "details": x_msg
                }

                xval = json.dumps(rJson)
                return func.HttpResponse(xval, status_code=200)
        elif typeSel == 'Cols':
            if typeVal != '':
                # Converting it to Json
                jdata = json.loads(ResJson)

                df_ret = p.io.json.json_normalize(jdata)
                df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

                # Fetching for the selected columns
                # Extracting the columns from the list
                lstHead = []

                listX = typeVal.split (",")

                for i in listX:
                    lstHead.append(str(i).strip())

                str3 = 'Main List: ' + str(lstHead)
                logging.info(str3)

                slice_df = df_ret[np.intersect1d(df_ret.columns, lstHead)]
                rJson = slice_df.to_json(orient ='records') 
                
                return func.HttpResponse(rJson, status_code=200)
            else:
                x_stat = 'Failed'
                x_msg = 'Important information is missing for selected values!'

                rJson = {
                    "status": x_stat,
                    "details": x_msg
                }

                xval = json.dumps(rJson)
                return func.HttpResponse(xval, status_code=200)
        else:
            x_stat = 'Failed'
            x_msg = 'Important information is missing for typeSel!'

            rJson = {
                "status": x_stat,
                "details": x_msg
            }

            xval = json.dumps(rJson)
            return func.HttpResponse(xval, status_code=200)
    except Exception as e:
        x_msg = str(e)
        x_stat = 'Failed'

        rJson = {
                    "status": x_stat,
                    "details": x_msg
                }

        xval = json.dumps(rJson)
        return func.HttpResponse(xval, status_code=200)

And, inside the Azure portal, it looks like –

Dynamic Function inside the Azure portal

Let’s explain the key snippet –

jdata = json.loads(ResJson)

df_ret = p.io.json.json_normalize(jdata)
df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

rJson = df_ret.to_json(orient ='records') 

return func.HttpResponse(rJson, status_code=200)

In the above lines, we’re parsing the response & organizing it into a pandas dataframe before converting it back to JSON.
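
To see why the column rename with split(".")[-1] is needed, consider what json_normalize does to nested keys; a tiny illustration –

import pandas as p

# Nested attributes become dotted column names after normalization
jdata = [{"state": "AK", "meta": {"grade": "A"}}]
df = p.io.json.json_normalize(jdata)
print(list(df.columns))    # ['state', 'meta.grade']

# Keeping only the last segment gives clean, flat column names
df.columns = df.columns.map(lambda x: x.split(".")[-1])
print(list(df.columns))    # ['state', 'grade']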

# Fetching for the selected columns
# Extracting the columns from the list
lstHead = []

listX = typeVal.split (",")

for i in listX:
    lstHead.append(str(i).strip())

str3 = 'Main List: ' + str(lstHead)
logging.info(str3)

#slice_df = df_ret[df_ret.columns.intersection(lstHead)]
slice_df = df_ret[np.intersect1d(df_ret.columns, lstHead)]

For the second case, the above additional logic plays a significant part. Based on the input supplied in the typeVal attribute, the response will contain only the requested columns.
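
From the consumer’s side, the two cases differ only in the query parameters; a minimal sketch, assuming a hypothetical function URL –

import requests

# Hypothetical Azure function endpoint - replace with your own
url = 'https://<your-app>.azurewebsites.net/api/<your-function>'

# Case 1: every column
r1 = requests.get(url, params={'typeSel': '*', 'typeVal': '*'})

# Case 2: only the selected columns
r2 = requests.get(url, params={'typeSel': 'Cols', 'typeVal': 'date, state, positive, negative'})

print(r1.status_code, r2.status_code)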

Let’s see how it looks –

Azure function in Visual Studio Code
Let’s test it using Postman –

Case 1 (For all the columns):

For all elements

And, the formatted output is as follows –

Formatted output for all elements

Case 2 (For selected columns):

For selected elements
And, the formatted output is as follows –

Formatted output of Selected element case

You can find the code in GitHub using the following link.


So, finally, we have done it.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only.

Developing native iOS online check-in App & integrating with API using Python

Hi Guys!

Today, I’ll be referring to two of my old posts & enhancing one of them to create a basic online check-in app integrated with Python. The best thing is – all the components in use are already built using native Python code.

And, probably, this will be the perfect tribute to the growing Python fan base, who made this a reality.

We’ll be using the following API, which I shared in my previous post.

However, note that to make any API environment better, one should use an event-hub-based design & publish & consume events from there, instead of directly calling one microservice/API from another.

We need to use the following packages –

pip install briefcase

pip install requests

Apart from these, you need the json library as well, which ships with Python.

The following steps are required to create this online check-in application –

STEP -1:

mkdir online_checkin

cd online_checkin

STEP -2:

python3 -m venv env

source env/bin/activate

STEP -3:

You need to install the desired packages mentioned above.

It should look something like –

Setting-up the environment

STEP -4:

Now, we have to execute the following command to initiate the project –

briefcase new

This will prompt you to fill in a set of inputs to configure the project, as shown in the below screenshots –

Creation of a Mobile-App Project
Creation of Mobile-App Project – Continuation

To check whether all the settings were captured correctly, one can issue the following command –

briefcase dev
If all the settings are correct, a blank canvas iOS app will launch using the native Python code, which should look like this –
Empty iOS App launch

The above command generates a series of directories & template Python scripts.
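
For orientation, the generated layout typically looks something like the sketch below (exact names depend on the inputs you supplied to briefcase new; treat this as indicative) –

online_checkin/
├── pyproject.toml            # project metadata & briefcase configuration
└── src/
    └── online_checkin/
        ├── __init__.py
        ├── __main__.py
        └── app.py            # the script we are about to modify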

Now, we are going to modify the app.py generated as part of the initial project creation.

  1. app.py ( This iOS app will invoke the online check-in API, send the passenger’s inputs & display the confirmed status. )
################################################
####                                        ####
#### Written By: SATYAKI DE                 ####
#### Written On: 24-Nov-2020                ####
#### Briefcase, Toga, json, requests needs  ####
#### to install to run this package.        ####
####                                        ####
#### Objective: This script will create a   ####
#### native I/OS App using native Python.   ####
####                                        ####
################################################

"""
Calling Azure Microservice from Native Mobile App
"""
import toga
from toga.style import Pack
from toga.style.pack import COLUMN, ROW
import requests
import json

class online_checkin(toga.App):

    def startup(self):
        """
        Construct and show the Toga application.

        Usually, you would add your application to a main content box.
        We then create a main window (with a name matching the app), and
        show the main window.
        """
        main_box = toga.Box(style=Pack(direction=COLUMN))

        # Adding Individual Layout details
        name_label = toga.Label("Full Name", style=Pack(padding=(0, 5)))
        self.name_input = toga.TextInput(style=Pack(flex=1))

        name_box = toga.Box(style=Pack(direction=ROW, padding=5))
        name_box.add(name_label)
        name_box.add(self.name_input)

        mobile_label = toga.Label("Mobile", style=Pack(padding=(0, 5)))
        self.mobile_input = toga.TextInput(style=Pack(flex=1))

        mobile_box = toga.Box(style=Pack(direction=ROW, padding=5))
        mobile_box.add(mobile_label)
        mobile_box.add(self.mobile_input)

        email_label = toga.Label("Email", style=Pack(padding=(0, 5)))
        self.email_input = toga.TextInput(style=Pack(flex=1))

        email_box = toga.Box(style=Pack(direction=ROW, padding=5))
        email_box.add(email_label)
        email_box.add(self.email_input)

        source_label = toga.Label("Source Airport", style=Pack(padding=(0, 5)))
        self.source_input = toga.TextInput(style=Pack(flex=1))

        source_box = toga.Box(style=Pack(direction=ROW, padding=5))
        source_box.add(source_label)
        source_box.add(self.source_input)

        destination_label = toga.Label("Destination Airport", style=Pack(padding=(0, 5)))
        self.destination_input = toga.TextInput(style=Pack(flex=1))

        destination_box = toga.Box(style=Pack(direction=ROW, padding=5))
        destination_box.add(destination_label)
        destination_box.add(self.destination_input)

        boardingclass_label = toga.Label("Boarding Class", style=Pack(padding=(0, 5)))
        self.boardingclass_input = toga.TextInput(style=Pack(flex=1))

        boardingclass_box = toga.Box(style=Pack(direction=ROW, padding=5))
        boardingclass_box.add(boardingclass_label)
        boardingclass_box.add(self.boardingclass_input)

        preferredSeatNo_label = toga.Label("Preferred Seat", style=Pack(padding=(0, 5)))
        self.preferredSeatNo_input = toga.TextInput(style=Pack(flex=1))

        preferredSeatNo_box = toga.Box(style=Pack(direction=ROW, padding=5))
        preferredSeatNo_box.add(preferredSeatNo_label)
        preferredSeatNo_box.add(self.preferredSeatNo_input)

        mealBreakfast_label = toga.Label("Breakfast Choice", style=Pack(padding=(0, 5)))
        self.mealBreakfast_input = toga.TextInput(style=Pack(flex=1))

        mealBreakfast_box = toga.Box(style=Pack(direction=ROW, padding=5))
        mealBreakfast_box.add(mealBreakfast_label)
        mealBreakfast_box.add(self.mealBreakfast_input)

        mealLunch_label = toga.Label("Lunch Choice", style=Pack(padding=(0, 5)))
        self.mealLunch_input = toga.TextInput(style=Pack(flex=1))

        mealLunch_box = toga.Box(style=Pack(direction=ROW, padding=5))
        mealLunch_box.add(mealLunch_label)
        mealLunch_box.add(self.mealLunch_input)

        mealDinner_label = toga.Label("Dinner Choice", style=Pack(padding=(0, 5)))
        self.mealDinner_input = toga.TextInput(style=Pack(flex=1))

        mealDinner_box = toga.Box(style=Pack(direction=ROW, padding=5))
        mealDinner_box.add(mealDinner_label)
        mealDinner_box.add(self.mealDinner_input)

        passPort_label = toga.Label("Passport Details", style=Pack(padding=(0, 5)))
        self.passPort_input = toga.TextInput(style=Pack(flex=1))

        passPort_box = toga.Box(style=Pack(direction=ROW, padding=5))
        passPort_box.add(passPort_label)
        passPort_box.add(self.passPort_input)

        localAddress_label = toga.Label("Local Address", style=Pack(padding=(0, 5)))
        self.localAddress_input = toga.TextInput(style=Pack(flex=1))

        localAddress_box = toga.Box(style=Pack(direction=ROW, padding=5))
        localAddress_box.add(localAddress_label)
        localAddress_box.add(self.localAddress_input)

        bookingNo_label = toga.Label("Confirmed Booking No", style=Pack(padding=(0, 5)))
        self.bookingNo_input = toga.TextInput(style=Pack(flex=1))

        bookingNo_box = toga.Box(style=Pack(direction=ROW, padding=5))
        bookingNo_box.add(bookingNo_label)
        bookingNo_box.add(self.bookingNo_input)

        # End Of Layout details

        boardingStatus_box_label = toga.Label("Boarding Status", style=Pack(padding=(0, 5)))
        self.boardingStatus_box_input = toga.MultilineTextInput(readonly=True, style=Pack(flex=1))
        boardingStatus_box = toga.Box(style=Pack(direction=ROW, padding=5))
        boardingStatus_box.add(boardingStatus_box_label)
        boardingStatus_box.add(self.boardingStatus_box_input)

        button = toga.Button("On-Board Now", on_press=self.onBoarding, style=Pack(padding=5))

        # Adding all the visual layout in the main frame

        main_box.add(name_box)
        main_box.add(mobile_box)
        main_box.add(email_box)
        main_box.add(source_box)
        main_box.add(destination_box)
        main_box.add(boardingclass_box)
        main_box.add(preferredSeatNo_box)
        main_box.add(mealBreakfast_box)
        main_box.add(mealLunch_box)
        main_box.add(mealDinner_box)
        main_box.add(passPort_box)
        main_box.add(localAddress_box)
        main_box.add(bookingNo_box)
        main_box.add(button)
        main_box.add(boardingStatus_box)

        # End Of Main Frame

        self.main_window = toga.MainWindow(title=self.formal_name)
        self.main_window.content = main_box
        self.main_window.show()

    def onBoarding(self, widget):

        BASE_URL = "https://xxxxxxxxxx.yyyyyyyy.net/api/getOnBoarding"
        openmapapi_cache = "no-cache"
        openmapapi_con = "keep-alive"
        type = "application/json"

        querystring = { "sourceLeg":  self.source_input.value, "destinationLeg":  self.destination_input.value,
                        "boardingClass":  self.boardingclass_input.value, "preferredSeatNo":  self.preferredSeatNo_input.value,
                        "travelPassport":  self.passPort_input.value , "bookingNo":  self.bookingNo_input.value,
                        "travelerEmail":  self.email_input.value, "travelerMobile": self.mobile_input.value,
                        "mealBreakFast":  self.mealBreakfast_input.value , "mealLunch":  self.mealLunch_input.value,
                        "mealDinner":  self.mealDinner_input.value, "localAddress":  self.localAddress_input.value }

        payload = json.dumps(querystring)

        print('Input Payload: ')
        print(payload)

        headers = {
            'content-type': type,
            'Cache-Control': openmapapi_cache,
            'Connection': openmapapi_con
        }

        response = requests.request("POST", BASE_URL, headers=headers, data=payload)
        ResJson = response.text

        #jdata = json.dumps(ResJson)
        resp = json.loads(ResJson)

        print('Response JSON:')
        print(resp)

        details = resp["description"].strip()
        status = str(resp["status"]).strip()
        sourceAirport = str(resp["sourceLeg"]).strip()
        destinationAirport = str(resp["destinationLeg"]).strip()
        boardingClass = str(resp["boardingClass"]).strip()
        confirmedSeat = str(resp["confirmedSeatNo"]).strip()

        try:
            self.boardingStatus_box_input.value = f'Please find the update on your itinerary -> Source - {sourceAirport} to Destination - {destinationAirport} - ' \
                                                  f'Boarding Class - {boardingClass} - Confirmed Seat - {confirmedSeat} and ' \
                                                  f'the summary is as follows  - {details} - Status - {status}'
        except ValueError:
            self.boardingStatus_box_input.value = "Some technical issue occurred. We are working on it."

def main():
    return online_checkin()

Let us explore the key snippet –

        name_label = toga.Label("Full Name", style=Pack(padding=(0, 5)))
        self.name_input = toga.TextInput(style=Pack(flex=1))

        name_box = toga.Box(style=Pack(direction=ROW, padding=5))
        name_box.add(name_label)
        name_box.add(self.name_input)

In the above snippet, the program builds the input textbox on the iOS frame for the user input. This is purely a design concern. Other languages have sophisticated UI builders that can generate similar code in the background. Unfortunately, Python still needs to grow in that aspect. However, if you think about the journey – it has progressed a lot.

        # Adding all the visual layout in the main frame

        main_box.add(name_box)

The above line finally stitches the widgets into the visual panel of the iOS app.

        boardingStatus_box_label = toga.Label("Boarding Status", style=Pack(padding=(0, 5)))
        self.boardingStatus_box_input = toga.MultilineTextInput(readonly=True, style=Pack(flex=1))
        boardingStatus_box = toga.Box(style=Pack(direction=ROW, padding=5))
        boardingStatus_box.add(boardingStatus_box_label)
        boardingStatus_box.add(self.boardingStatus_box_input)

        button = toga.Button("On-Board Now", on_press=self.onBoarding, style=Pack(padding=5))

The above code first designs the button on the app & then ties it to the “onBoarding” method, which invokes the API that we’ve already developed.

    def onBoarding(self, widget):

        BASE_URL = "https://xxxxxxxxxx.yyyyyyyy.net/api/getOnBoarding"
        openmapapi_cache = "no-cache"
        openmapapi_con = "keep-alive"
        type = "application/json"

        querystring = { "sourceLeg":  self.source_input.value, "destinationLeg":  self.destination_input.value,
                        "boardingClass":  self.boardingclass_input.value, "preferredSeatNo":  self.preferredSeatNo_input.value,
                        "travelPassport":  self.passPort_input.value , "bookingNo":  self.bookingNo_input.value,
                        "travelerEmail":  self.email_input.value, "travelerMobile": self.mobile_input.value,
                        "mealBreakFast":  self.mealBreakfast_input.value , "mealLunch":  self.mealLunch_input.value,
                        "mealDinner":  self.mealDinner_input.value, "localAddress":  self.localAddress_input.value }

        payload = json.dumps(querystring)

The above few lines frame an input JSON for our API, invoke it & finally receive & parse the JSON response to display the appropriate message on the iOS app.
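
You can also dry-run the same payload outside the app to confirm the API contract before touching the UI; a minimal sketch with hard-coded sample values (the URL stays a placeholder, as in the app, & we assume the API echoes the fields the app reads) –

import json
import requests

BASE_URL = "https://xxxxxxxxxx.yyyyyyyy.net/api/getOnBoarding"

# Hard-coded sample values standing in for the Toga text inputs
payload = json.dumps({
    "sourceLeg": "JFK", "destinationLeg": "LHR",
    "boardingClass": "Economy", "preferredSeatNo": "12A",
    "travelPassport": "X1234567", "bookingNo": "BK0001",
    "travelerEmail": "traveler@example.com", "travelerMobile": "0000000000",
    "mealBreakFast": "Veg", "mealLunch": "Veg", "mealDinner": "Veg",
    "localAddress": "N/A"
})

headers = {'content-type': 'application/json'}
resp = json.loads(requests.post(BASE_URL, headers=headers, data=payload).text)
print(resp["status"], '-', resp["description"])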

The following image will show you the directory structure & also how the code should look. This will help you to map & understand the overall project.

Project Details

Let’s test our API –

Testing API through Postman

Now, we’re ready to test our iOS app.

You need to run the application locally as shown in the below image –

Running local iOS app using Native Python

As you can see, it successfully fetches the API response, parses it & populates the Boarding Status box marked in “Blue“.

Let us create the package for distribution by using the following command –

briefcase create

This will run through a series of standard execution steps, as shown below –

Building the package

Finally, you need to execute the following command to build the app –

briefcase build

Building the executable for the target OS

To find out whether your application ran successfully or not, one can use the following command –

briefcase run

This time it will run the application from this package & not from your local environment.

Now, the following commands help you to create & upload the package to the iOS App Store.

briefcase create iOS

briefcase build iOS

You can refer to the official site for more information on creating the iOS package using briefcase.

Also, I find this link extremely useful for troubleshooting any issues.

So, finally, we have done it.

You will get the entire codebase from the following Github link.


I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only.

Creating a mock API using Mulesoft RAML & testing it using Python

Hi Guys,

Today, I’ll be using a popular tool known as Mulesoft to generate a mock API & then we’ll test it using Python. Mulesoft is an excellent tool to rapidly develop APIs & can also integrate multiple cloud environments as an integration platform. You can use their Anypoint platform to quickly design such APIs for your organization. You can find the details in the following link. However, considering the cost, many organizations have to devise their own product or tool to do the same. That’s where developing your own solution in Python, Node.js, or C# becomes a viable alternative, considering the cloud platform.

Before we start, let us quickly review what a mock API is.

A mock API server imitates a real API server by providing realistic responses to requests. They can be on your local machine or the public Internet. Responses can be static or dynamic, and simulate the data the real API would return, matching the schema with data types, objects, and arrays.

And why do we need that?

A mock API server is useful during development and testing when live data is either unavailable or unreliable. While designing an API, you can use mock APIs to work concurrently on the front and back-end, as well as to gather feedback from developers. A mock API server guide for testing covers how you can use a mock API server so the absence of a real API doesn’t hold you back.

Often with internal projects, the API consumer (such as a front end developer through REST APIs) moves faster than the backend team building the API. This API mocking guide shows how a mock API server allows developers to consume a working API with the same interface as the eventual production API. As an added benefit, the backend team can discover where the mock API doesn’t meet the developer’s needs without spending developer time on features that may be removed or changed. This fast feedback loop can make engineering teams much more efficient.
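
That fast feedback loop is exactly what we’ll build toward – once the mock endpoint is live, a few lines of Python are enough to exercise it; a sketch, assuming a hypothetical mock URL –

import requests

# Hypothetical mock endpoint - Mulesoft will provide the real URL later
MOCK_URL = 'https://<your-mock-service>/api/covidstats'

response = requests.get(MOCK_URL)
print(response.status_code)
print(response.json())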

If you need more information on this topic, you can refer to the following link.

Great! Now that we have some background on mock APIs, let’s explore how Mulesoft can help us here.

Mulesoft uses the “RESTful API Modeling Language (RAML)”. We’ll be using this language to develop our mock API. To know more about it, you can view the following link.
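
To give a flavour of the language before we dive in, a minimal RAML 1.0 specification for a single GET resource might look like this (illustrative only; we’ll build the real one inside the design center) –

#%RAML 1.0
title: Covid Mock API
version: v1

/covidstats:
  get:
    responses:
      200:
        body:
          application/json:
            example: |
              {"state": "AK", "positive": 53279}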

Under the developer section, you can find Tutorials as shown in the screenshot given below –

18. Type Of RAML

You can select any of the categories & learn basic scripting from it.

Now, let’s take a look at the process of creating a Mulesoft free account to test our theories.

Step 1:

Click the following link, and you will see the page as shown below –

0.1. Mulesoft Landing Page

Step 2:

Now, click the login shown in the RED square. You will see the following page –

0.2. Mulesoft Sign-Up Option

Step 3:

Please provide your credentials if you already have an account. Otherwise, click “Sign-Up” & provide a few details as shown below –

1. Mulesoft Registration

Step 4:

Once you successfully create the account, you will see the following page –

2. Mulesoft Interface

So, now we are set. To design an API, you will need to click the design center as marked within the white square.

Once you click the “Start designing” button, it will land you on the next screen.

21. Creating a Projects

As shown above, you need to click “Create new” for a fresh API design.

This will bring up the next screen –

22. Creating a Projects - Continue

Now, you need to click “Create API specification”, as marked in the RED square box. And, that will bring up the following screen –

23. Creating a Projects - Continue

You have to provide a meaningful name for your API & you can choose either the Text or Visual editor. For this task, we’ll select the Text editor, & RAML 1.0 as our preferred language. Once we provide all the relevant information, the “Create Specification” button marked in Green will be activated. Click it, & it will lead you to the next screen –