Python-based dash framework visualizing real-time covid-19 trend.

Hi Team,

We’ll enhance our last post on Covid-19 prediction & try to capture them in a real-time dashboard, where the values in the visual display points will be affected as soon as the source data changes. In short, this is genuinely a real-time visual dashboard displaying all the graphs, trends depending upon the third-party API source data change.

However, I would like to share the run before we dig deep into this.

Demo Run

Architecture:

Let us understand the architecture for this solution –

Streaming Architecture

From the above diagram, one can see that we’re maintaining a similar approach compared to our last initiative. However, we’ve used a different framework to display the data live.

To achieve this, we’ve used a compelling python-based framework called Dash. Other than that, we’ve used Ably, Plotly & Prophet API.

If you need to know more about our last post, please visit this link.


Package Installation:

Let us understand the sample packages that require for this task.

Step – 1:

Installing Packages

Step – 2:

Installing Packages – Continue

Step – 3:

Installing Packages – Continue

Step – 4:

Installing Packages – Final

And, here is the command to install those packages –

pip install pandas
pip install plotly
pip install prophet
pip install dash
pip install pandas
pip install ably

Code:

Since this is an extension to our previous post, we’re not going to discuss other scripts, which we’ve already discussed over there. Instead, we will talk about the enhanced scripts & the new scripts that require for this use case.

1. clsConfig.py ( This native Python script contains the configuration entries. )


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 09-Sep-2021 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning & streaming dashboard.####
#### ####
################################################
import os
import platform as pl
class clsConfig(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'REPORT_PATH': Curr_Path + sep + 'report',
'FILE_NAME': Curr_Path + sep + 'data' + sep + 'TradeIn.csv',
'SRC_PATH': Curr_Path + sep + 'data' + sep,
'APP_DESC_1': 'Dash Integration with Ably!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'SUBDIR' : 'data',
'ABLY_ID': 'XXX2LL.93kdkiU2:Kdsldoeie737484E',
"URL":"https://corona-api.com/countries/",
"appType":"application/json",
"conType":"keep-alive",
"limRec": 10,
"CACHE":"no-cache",
"MAX_RETRY": 3,
"coList": "DE, IN, US, CA, GB, ID, BR",
"FNC": "NewConfirmed",
"TMS": "ReportedDate",
"FND": "NewDeaths",
"FinData": "Cache.csv"
}

view raw

clsConfig.py

hosted with ❤ by GitHub

A few of the new entries, which are essential to this task are -> ABLY_ID & FinData.

2. clsPublishStream.py ( This script will publish the data transformed for Covid-19 predictions from the third-party sources. )


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 26-Jul-2021 ####
#### Modified Date: 08-Sep-2021 ####
#### ####
#### Objective: This script will publish real-time ####
#### streaming data coming out from a hosted API ####
#### sources using another popular third-party service ####
#### named Ably. Ably mimics pubsub Streaming concept, ####
#### which might be extremely useful for any start-ups. ####
#### ####
###############################################################
from ably import AblyRest
import logging
import json
from random import seed
from random import random
import json
import math
import random
from clsConfig import clsConfig as cf
# Global Section
logger = logging.getLogger('ably')
logger.addHandler(logging.StreamHandler())
ably_id = str(cf.conf['ABLY_ID'])
ably = AblyRest(ably_id)
channel = ably.channels.get('sd_channel')
# End Of Global Section
class clsPublishStream:
def __init__(self):
self.fnc = cf.conf['FNC']
def pushEvents(self, srcDF, debugInd, varVa, flg):
try:
# JSON data
# This is the default data for all the identified category
# we've prepared. You can extract this dynamically. Or, By
# default you can set their base trade details.
json_data = [{'Year_Mon': '201911', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg},
{'Year_Mon': '201912', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg}]
jdata = json.dumps(json_data)
# Publish a message to the sd_channel channel
channel.publish('event', jdata)
# Capturing the inbound dataframe
iDF = srcDF
# Adding new selected points
covid_dict = iDF.to_dict('records')
jdata_fin = json.dumps(covid_dict)
# Publish rest of the messages to the sd_channel channel
channel.publish('event', jdata_fin)
jdata_fin = ''
return 0
except Exception as e:
x = str(e)
print(x)
logging.info(x)
return 1

We’ve already discussed this script. The only new line that appears here is –

json_data = [{'Year_Mon': '201911', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg},
            {'Year_Mon': '201912', 'Brazil': 0.0, 'Canada': 0.0, 'Germany': 0.0, 'India': 0.0, 'Indonesia': 0.0, 'UnitedKingdom': 0.0, 'UnitedStates': 0.0, 'Status': flg}]

This statement is more like a dummy feed, which creates the basic structure of your graph.

3. clsStreamConsume.py ( This script will consume the stream from Ably Queue configuration entries. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### Modified On 08-Sep-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels published by the ####
#### callPredictCovidAnalysisRealtime.py ####
#### ####
##############################################
import json
from clsConfig import clsConfig as cf
import requests
import logging
import time
import pandas as p
import clsL as cl
from ably import AblyRest
# Initiating Log class
l = cl.clsL()
class clsStreamConsume:
def __init__(self):
self.ably_id = str(cf.conf['ABLY_ID'])
self.fileName = str(cf.conf['FinData'])
def conStream(self, varVa, debugInd):
try:
ably_id = self.ably_id
fileName = self.fileName
var = varVa
debug_ind = debugInd
# Fetching the data
client = AblyRest(ably_id)
channel = client.channels.get('sd_channel')
message_page = channel.history()
# Counter Value
cnt = 0
# Declaring Global Data-Frame
df_conv = p.DataFrame()
for i in message_page.items:
print('Last Msg: {}'.format(i.data))
json_data = json.loads(i.data)
# Converting JSON to Dataframe
df = p.json_normalize(json_data)
df.columns = df.columns.map(lambda x: x.split(".")[1])
if cnt == 0:
df_conv = df
else:
d_frames = [df_conv, df]
df_conv = p.concat(d_frames)
cnt += 1
# Resetting the Index Value
df_conv.reset_index(drop=True, inplace=True)
# This will check whether the current load is happening
# or not. Based on that, it will capture the old events
# from cache.
if df_conv.empty:
df_conv = p.read_csv(fileName, index = True)
else:
l.logr(fileName, debug_ind, df_conv, 'log')
return df_conv
except Exception as e:
x = str(e)
print(x)
logging.info(x)
# This will handle the error scenaio as well.
# Based on that, it will capture the old events
# from cache.
try:
df_conv = p.read_csv(fileName, index = True)
except:
df = p.DataFrame()
return df

We’ve already discussed this script in one of my earlier posts, which you will get here.

So, I’m not going to discuss all the steps in detail.

The only added part was to introduce some temporary local caching mechanism.

if df_conv.empty:
    df_conv = p.read_csv(fileName, index = True)
else:
    l.logr(fileName, debug_ind, df_conv, 'log')

4. callPredictCovidAnalysisRealtime.py ( Main calling script to fetch the COVID-19 data from the third-party source & then publish it to the Ably message queue after transforming the data & adding the prediction using Facebook’s prophet API. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### Modified On 26-Jul-2021 ####
#### ####
#### Objective: Calling multiple API's ####
#### that including Prophet-API developed ####
#### by Facebook for future prediction of ####
#### Covid-19 situations in upcoming days ####
#### for world's major hotspots. ####
##############################################
import json
import clsCovidAPI as ca
from clsConfig import clsConfig as cf
import datetime
import logging
import clsL as cl
import math as m
import clsPublishStream as cps
import clsForecast as f
from prophet import Prophet
from prophet.plot import plot_plotly, plot_components_plotly
import matplotlib.pyplot as plt
import pandas as p
import datetime as dt
import time
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
# Initiating Log class
l = cl.clsL()
# Helper Function that removes underscores
def countryDet(inputCD):
try:
countryCD = inputCD
if str(countryCD) == 'DE':
cntCD = 'Germany'
elif str(countryCD) == 'BR':
cntCD = 'Brazil'
elif str(countryCD) == 'GB':
cntCD = 'UnitedKingdom'
elif str(countryCD) == 'US':
cntCD = 'UnitedStates'
elif str(countryCD) == 'IN':
cntCD = 'India'
elif str(countryCD) == 'CA':
cntCD = 'Canada'
elif str(countryCD) == 'ID':
cntCD = 'Indonesia'
else:
cntCD = 'N/A'
return cntCD
except:
cntCD = 'N/A'
return cntCD
def lookupCountry(row):
try:
strCD = str(row['CountryCode'])
retVal = countryDet(strCD)
return retVal
except:
retVal = 'N/A'
return retVal
def adjustTrend(row):
try:
flTrend = float(row['trend'])
flTrendUpr = float(row['trend_upper'])
flTrendLwr = float(row['trend_lower'])
retVal = m.trunc((flTrend + flTrendUpr + flTrendLwr)/3)
if retVal < 0:
retVal = 0
return retVal
except:
retVal = 0
return retVal
def ceilTrend(row, colName):
try:
flTrend = str(row[colName])
if flTrend.find('.'):
if float(flTrend) > 0:
retVal = m.trunc(float(flTrend)) + 1
else:
retVal = m.trunc(float(flTrend))
else:
retVal = float(flTrend)
if retVal < 0:
retVal = 0
return retVal
except:
retVal = 0
return retVal
def plot_picture(inputDF, debug_ind, var, countryCD, stat):
try:
iDF = inputDF
# Lowercase the column names
iDF.columns = [c.lower() for c in iDF.columns]
# Determine which is Y axis
y_col = [c for c in iDF.columns if c.startswith('y')][0]
# Determine which is X axis
x_col = [c for c in iDF.columns if c.startswith('ds')][0]
# Data Conversion
iDF['y'] = iDF[y_col].astype('float')
iDF['ds'] = iDF[x_col].astype('datetime64[ns]')
# Forecast calculations
# Decreasing the changepoint_prior_scale to 0.001 to make the trend less flexible
m = Prophet(n_changepoints=20, yearly_seasonality=True, changepoint_prior_scale=0.001)
#m = Prophet(n_changepoints=20, yearly_seasonality=True, changepoint_prior_scale=0.04525)
#m = Prophet(n_changepoints=['2021-09-10'])
m.fit(iDF)
forecastDF = m.make_future_dataframe(periods=365)
forecastDF = m.predict(forecastDF)
l.logr('15.forecastDF_' + var + '_' + countryCD + '.csv', debug_ind, forecastDF, 'log')
df_M = forecastDF[['ds', 'trend', 'trend_lower', 'trend_upper']]
l.logr('16.df_M_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')
# Getting Full Country Name
cntCD = countryDet(countryCD)
# Draw forecast results
df_M['Country'] = cntCD
l.logr('17.df_M_C_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')
df_M['AdjustTrend'] = df_M.apply(lambda row: adjustTrend(row), axis=1)
l.logr('20.df_M_AdjustTrend_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')
return df_M
except Exception as e:
x = str(e)
print(x)
df = p.DataFrame()
return df
def countrySpecificDF(counryDF, val):
try:
countryName = val
df = counryDF
df_lkpFile = df[(df['CountryCode'] == val)]
return df_lkpFile
except:
df = p.DataFrame()
return df
def toNum(row, colName):
try:
flTrend = str(row[colName])
flTr, subpart = flTrend.split(' ')
retVal = int(flTr.replace('-',''))
return retVal
except:
retVal = 0
return retVal
def extractPredictedDF(OrigDF, MergePredictedDF, colName):
try:
iDF_1 = OrigDF
iDF_2 = MergePredictedDF
dt_format = '%Y-%m-%d'
iDF_1_max_group = iDF_1.groupby(["Country"] , as_index=False)["ReportedDate"].max()
iDF_2['ReportedDate'] = iDF_2.apply(lambda row: toNum(row, 'ds'), axis=1)
col_one_list = iDF_1_max_group['Country'].tolist()
col_two_list = iDF_1_max_group['ReportedDate'].tolist()
print('col_one_list: ', str(col_one_list))
print('col_two_list: ', str(col_two_list))
cnt_1_x = 1
cnt_1_y = 1
cnt_x = 0
df_M = p.DataFrame()
for i in col_one_list:
str_countryVal = str(i)
cnt_1_y = 1
for j in col_two_list:
intReportDate = int(str(j).strip().replace('-',''))
if cnt_1_x == cnt_1_y:
print('str_countryVal: ', str(str_countryVal))
print('intReportDate: ', str(intReportDate))
iDF_2_M = iDF_2[(iDF_2['Country'] == str_countryVal) & (iDF_2['ReportedDate'] > intReportDate)]
# Merging with the previous Country Code data
if cnt_x == 0:
df_M = iDF_2_M
else:
d_frames = [df_M, iDF_2_M]
df_M = p.concat(d_frames)
cnt_x += 1
cnt_1_y += 1
cnt_1_x += 1
df_M.drop(columns=['ReportedDate'], axis=1, inplace=True)
df_M.rename(columns={'ds':'ReportedDate'}, inplace=True)
df_M.rename(columns={'AdjustTrend':colName}, inplace=True)
return df_M
except:
df = p.DataFrame()
return df
def toPivot(inDF, colName):
try:
iDF = inDF
iDF_Piv = iDF.pivot_table(colName, ['ReportedDate'], 'Country')
iDF_Piv.reset_index( drop=False, inplace=True )
list1 = ['ReportedDate']
iDF_Arr = iDF['Country'].unique()
list2 = iDF_Arr.tolist()
listV = list1 + list2
iDF_Piv.reindex([listV], axis=1)
return iDF_Piv
except Exception as e:
x = str(e)
print(x)
df = p.DataFrame()
return df
def toAgg(inDF, var, debugInd, flg):
try:
iDF = inDF
colName = "ReportedDate"
list1 = list(iDF.columns.values)
list1.remove(colName)
list1 = ["Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]
iDF['Year_Mon'] = iDF[colName].apply(lambda x:x.strftime('%Y%m'))
iDF.drop(columns=[colName], axis=1, inplace=True)
ColNameGrp = "Year_Mon"
print('List1 Aggregate:: ', str(list1))
print('ColNameGrp :: ', str(ColNameGrp))
iDF_T = iDF[["Year_Mon", "Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]]
iDF_T.fillna(0, inplace = True)
print('iDF_T:: ')
print(iDF_T)
iDF_1_max_group = iDF_T.groupby(ColNameGrp, as_index=False)[list1].sum()
iDF_1_max_group['Status'] = flg
return iDF_1_max_group
except Exception as e:
x = str(e)
print(x)
df = p.DataFrame()
return df
def publishEvents(inDF1, inDF2, inDF3, inDF4, var, debugInd):
try:
# Original Covid Data from API
iDF1 = inDF1
iDF2 = inDF2
NC = 'NewConfirmed'
ND = 'NewDeaths'
iDF1_PV = toPivot(iDF1, NC)
iDF1_PV['ReportedDate'] = p.to_datetime(iDF1_PV['ReportedDate'])
l.logr('57.iDF1_PV_' + var + '.csv', debugInd, iDF1_PV, 'log')
iDF2_PV = toPivot(iDF2, ND)
iDF2_PV['ReportedDate'] = p.to_datetime(iDF2_PV['ReportedDate'])
l.logr('58.iDF2_PV_' + var + '.csv', debugInd, iDF2_PV, 'log')
# Predicted Covid Data from Facebook API
iDF3 = inDF3
iDF4 = inDF4
iDF3_PV = toPivot(iDF3, NC)
l.logr('59.iDF3_PV_' + var + '.csv', debugInd, iDF3_PV, 'log')
iDF4_PV = toPivot(iDF4, ND)
l.logr('60.iDF4_PV_' + var + '.csv', debugInd, iDF4_PV, 'log')
# Now aggregating data based on year-month only
iDF1_Agg = toAgg(iDF1_PV, var, debugInd, NC)
l.logr('61.iDF1_Agg_' + var + '.csv', debugInd, iDF1_Agg, 'log')
iDF2_Agg = toAgg(iDF2_PV, var, debugInd, ND)
l.logr('62.iDF2_Agg_' + var + '.csv', debugInd, iDF2_Agg, 'log')
iDF3_Agg = toAgg(iDF3_PV, var, debugInd, NC)
l.logr('63.iDF3_Agg_' + var + '.csv', debugInd, iDF3_Agg, 'log')
iDF4_Agg = toAgg(iDF4_PV, var, debugInd, ND)
l.logr('64.iDF4_Agg_' + var + '.csv', debugInd, iDF4_Agg, 'log')
# Initiating Ably class to push events
x1 = cps.clsPublishStream()
# Pushing both the Historical Confirmed Cases
retVal_1 = x1.pushEvents(iDF1_Agg, debugInd, var, NC)
if retVal_1 == 0:
print('Successfully historical event pushed!')
else:
print('Failed to push historical events!')
# Pushing both the Historical Death Cases
retVal_3 = x1.pushEvents(iDF2_Agg, debugInd, var, ND)
if retVal_3 == 0:
print('Successfully historical event pushed!')
else:
print('Failed to push historical events!')
time.sleep(5)
# Pushing both the New Confirmed Cases
retVal_2 = x1.pushEvents(iDF3_Agg, debugInd, var, NC)
if retVal_2 == 0:
print('Successfully predicted event pushed!')
else:
print('Failed to push predicted events!')
# Pushing both the New Death Cases
retVal_4 = x1.pushEvents(iDF4_Agg, debugInd, var, ND)
if retVal_4 == 0:
print('Successfully predicted event pushed!')
else:
print('Failed to push predicted events!')
return 0
except Exception as e:
x = str(e)
print(x)
return 1
def main():
try:
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' *60)
DInd = 'Y'
NC = 'New Confirmed'
ND = 'New Dead'
SM = 'data process Successful!'
FM = 'data process Failure!'
print("Calling the custom Package for large file splitting..")
print('Start Time: ' + str(var1))
countryList = str(cf.conf['coList']).split(',')
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'CovidAPI.log', level=logging.INFO)
# Create the instance of the Covid API Class
x1 = ca.clsCovidAPI()
# Let's pass this to our map section
retDF = x1.searchQry(var1, DInd)
retVal = int(retDF.shape[0])
if retVal > 0:
print('Successfully Covid Data Extracted from the API-source.')
else:
print('Something wrong with your API-source!')
# Extracting Skeleton Data
df = retDF[['data.code', 'date', 'deaths', 'confirmed', 'recovered', 'new_confirmed', 'new_recovered', 'new_deaths', 'active']]
df.columns = ['CountryCode', 'ReportedDate', 'TotalReportedDead', 'TotalConfirmedCase', 'TotalRecovered', 'NewConfirmed', 'NewRecovered', 'NewDeaths', 'ActiveCaases']
df.dropna()
print('Returned Skeleton Data Frame: ')
print(df)
l.logr('5.df_' + var1 + '.csv', DInd, df, 'log')
# Due to source data issue, application will perform of
# avg of counts based on dates due to multiple entries
g_df = df.groupby(["CountryCode", "ReportedDate"] , as_index=False)["TotalReportedDead","TotalConfirmedCase","TotalRecovered","NewConfirmed","NewRecovered","NewDeaths","ActiveCaases"].mean()
g_df['TotalReportedDead_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalReportedDead'), axis=1)
g_df['TotalConfirmedCase_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalConfirmedCase'), axis=1)
g_df['TotalRecovered_M'] = g_df.apply(lambda row: ceilTrend(row, 'TotalRecovered'), axis=1)
g_df['NewConfirmed_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewConfirmed'), axis=1)
g_df['NewRecovered_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewRecovered'), axis=1)
g_df['NewDeaths_M'] = g_df.apply(lambda row: ceilTrend(row, 'NewDeaths'), axis=1)
g_df['ActiveCaases_M'] = g_df.apply(lambda row: ceilTrend(row, 'ActiveCaases'), axis=1)
# Dropping old columns
g_df.drop(columns=['TotalReportedDead', 'TotalConfirmedCase', 'TotalRecovered', 'NewConfirmed', 'NewRecovered', 'NewDeaths', 'ActiveCaases'], axis=1, inplace=True)
# Renaming the new columns to old columns
g_df.rename(columns={'TotalReportedDead_M':'TotalReportedDead'}, inplace=True)
g_df.rename(columns={'TotalConfirmedCase_M':'TotalConfirmedCase'}, inplace=True)
g_df.rename(columns={'TotalRecovered_M':'TotalRecovered'}, inplace=True)
g_df.rename(columns={'NewConfirmed_M':'NewConfirmed'}, inplace=True)
g_df.rename(columns={'NewRecovered_M':'NewRecovered'}, inplace=True)
g_df.rename(columns={'NewDeaths_M':'NewDeaths'}, inplace=True)
g_df.rename(columns={'ActiveCaases_M':'ActiveCaases'}, inplace=True)
l.logr('5.g_df_' + var1 + '.csv', DInd, g_df, 'log')
# Working with forecast
# Create the instance of the Forecast API Class
x2 = f.clsForecast()
# Fetching each country name & then get the details
cnt = 6
cnt_x = 0
cnt_y = 0
df_M_Confirmed = p.DataFrame()
df_M_Deaths = p.DataFrame()
for i in countryList:
try:
cntryIndiv = i.strip()
cntryFullName = countryDet(cntryIndiv)
print('Country Porcessing: ' + str(cntryFullName))
# Creating dataframe for each country
# Germany Main DataFrame
dfCountry = countrySpecificDF(g_df, cntryIndiv)
l.logr(str(cnt) + '.df_' + cntryIndiv + '_' + var1 + '.csv', DInd, dfCountry, 'log')
# Let's pass this to our map section
retDFGenNC = x2.forecastNewConfirmed(dfCountry, DInd, var1)
statVal = str(NC)
a1 = plot_picture(retDFGenNC, DInd, var1, cntryIndiv, statVal)
# Merging with the previous Country Code data
if cnt_x == 0:
df_M_Confirmed = a1
else:
d_frames = [df_M_Confirmed, a1]
df_M_Confirmed = p.concat(d_frames)
cnt_x += 1
retDFGenNC_D = x2.forecastNewDead(dfCountry, DInd, var1)
statVal = str(ND)
a2 = plot_picture(retDFGenNC_D, DInd, var1, cntryIndiv, statVal)
# Merging with the previous Country Code data
if cnt_y == 0:
df_M_Deaths = a2
else:
d_frames = [df_M_Deaths, a2]
df_M_Deaths = p.concat(d_frames)
cnt_y += 1
# Printing Proper message
if (a1 + a2) == 0:
oprMsg = cntryFullName + ' ' + SM
print(oprMsg)
else:
oprMsg = cntryFullName + ' ' + FM
print(oprMsg)
# Resetting the dataframe value for the next iteration
dfCountry = p.DataFrame()
cntryIndiv = ''
oprMsg = ''
cntryFullName = ''
a1 = 0
a2 = 0
statVal = ''
cnt += 1
except Exception as e:
x = str(e)
print(x)
l.logr('49.df_M_Confirmed_' + var1 + '.csv', DInd, df_M_Confirmed, 'log')
l.logr('50.df_M_Deaths_' + var1 + '.csv', DInd, df_M_Deaths, 'log')
# Removing unwanted columns
df_M_Confirmed.drop(columns=['trend', 'trend_lower', 'trend_upper'], axis=1, inplace=True)
df_M_Deaths.drop(columns=['trend', 'trend_lower', 'trend_upper'], axis=1, inplace=True)
l.logr('51.df_M_Confirmed_' + var1 + '.csv', DInd, df_M_Confirmed, 'log')
l.logr('52.df_M_Deaths_' + var1 + '.csv', DInd, df_M_Deaths, 'log')
# Creating original dataframe from the source API
df_M_Confirmed_Orig = g_df[['CountryCode', 'ReportedDate','NewConfirmed']]
df_M_Deaths_Orig = g_df[['CountryCode', 'ReportedDate','NewDeaths']]
# Transforming Country Code
df_M_Confirmed_Orig['Country'] = df_M_Confirmed_Orig.apply(lambda row: lookupCountry(row), axis=1)
df_M_Deaths_Orig['Country'] = df_M_Deaths_Orig.apply(lambda row: lookupCountry(row), axis=1)
# Dropping unwanted column
df_M_Confirmed_Orig.drop(columns=['CountryCode'], axis=1, inplace=True)
df_M_Deaths_Orig.drop(columns=['CountryCode'], axis=1, inplace=True)
# Reordering columns
df_M_Confirmed_Orig = df_M_Confirmed_Orig.reindex(['ReportedDate','Country','NewConfirmed'], axis=1)
df_M_Deaths_Orig = df_M_Deaths_Orig.reindex(['ReportedDate','Country','NewDeaths'], axis=1)
l.logr('53.df_M_Confirmed_Orig_' + var1 + '.csv', DInd, df_M_Confirmed_Orig, 'log')
l.logr('54.df_M_Deaths_Orig_' + var1 + '.csv', DInd, df_M_Deaths_Orig, 'log')
# Filter out only the predicted data
filterDF_1 = extractPredictedDF(df_M_Confirmed_Orig, df_M_Confirmed, 'NewConfirmed')
l.logr('55.filterDF_1_' + var1 + '.csv', DInd, filterDF_1, 'log')
filterDF_2 = extractPredictedDF(df_M_Confirmed_Orig, df_M_Confirmed, 'NewDeaths')
l.logr('56.filterDF_2_' + var1 + '.csv', DInd, filterDF_2, 'log')
# Calling the final publish events
retVa = publishEvents(df_M_Confirmed_Orig, df_M_Deaths_Orig, filterDF_1, filterDF_2, var1, DInd)
if retVa == 0:
print('Successfully stream processed!')
else:
print('Failed to process stream!')
var2 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var2))
print('*' *60)
except Exception as e:
x = str(e)
print(x)
if __name__ == "__main__":
main()

Let us understand the enhancement part of this script –

We’ve taken out the plotly part as we will use a separate dashboard script to visualize the data trend.

However, we need to understand the initial consumed data from API & how we transform the data, which will be helpful for visualization.

The initial captured data should look like this after extracting only the relevant elements from the API response.

Initial Data from API

As you can see that based on the country & reported date, our application is consuming attributes like Total-Reported-Death, Total-Recovered, New-death, New-Confirmed & so on.

From this list, we’ve taken two attributes for our use cases & they are New-Death & New-Confirmed. Also, we’re predicting the Future-New-Death & Future-New-Confirmed based on the historical data using Facebook’s prophet API.

And, we would be transposing them & extract the countries & put them as columns for better representations.

Transposed Data

Hence, here is the code that we should be exploring –

def toPivot(inDF, colName):
    try:
        iDF = inDF

        iDF_Piv = iDF.pivot_table(colName, ['ReportedDate'], 'Country')
        iDF_Piv.reset_index( drop=False, inplace=True )

        list1 = ['ReportedDate']

        iDF_Arr = iDF['Country'].unique()
        list2 = iDF_Arr.tolist()

        listV = list1 + list2

        iDF_Piv.reindex([listV], axis=1)

        return iDF_Piv
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

Now, using the pivot_table function, we’re transposing the row values into the columns. And, later we’ve realigned the column heading as per our desired format.

However, we still have the data as per individual daily dates in this case. We want to eliminate that by removing the daypart & then aggregate them by month as shown below –

Aggregated Data

And, here is the code for that –

def toAgg(inDF, var, debugInd, flg):
    try:
        iDF = inDF
        colName = "ReportedDate"

        list1 = list(iDF.columns.values)
        list1.remove(colName)

        list1 = ["Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]

        iDF['Year_Mon'] = iDF[colName].apply(lambda x:x.strftime('%Y%m'))
        iDF.drop(columns=[colName], axis=1, inplace=True)

        ColNameGrp = "Year_Mon"
        print('List1 Aggregate:: ', str(list1))
        print('ColNameGrp :: ', str(ColNameGrp))

        iDF_T = iDF[["Year_Mon", "Brazil", "Canada", "Germany", "India", "Indonesia", "UnitedKingdom", "UnitedStates"]]
        iDF_T.fillna(0, inplace = True)
        print('iDF_T:: ')
        print(iDF_T)

        iDF_1_max_group = iDF_T.groupby(ColNameGrp, as_index=False)[list1].sum()
        iDF_1_max_group['Status'] = flg

        return iDF_1_max_group
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

From the above snippet we can conclude that the application is taking out the daypart & then aggregate it based on the Year_Mon attribute.

The following snippet will push the final transformed data to Ably queue –

x1 = cps.clsPublishStream()

# Pushing both the Historical Confirmed Cases
retVal_1 = x1.pushEvents(iDF1_Agg, debugInd, var, NC)

if retVal_1 == 0:
    print('Successfully historical event pushed!')
else:
    print('Failed to push historical events!')

5. dashboard_realtime.py ( Main calling script to consume the data from Ably queue & then visualize the trend. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 08-Sep-2021 ####
#### Modified On 08-Sep-2021 ####
#### ####
#### Objective: This is the main script ####
#### to invoke dashboard after consuming ####
#### streaming real-time predicted data ####
#### using Facebook API & Ably message Q. ####
#### ####
#### This script will show the trend ####
#### comparison between major democracies ####
#### of the world. ####
#### ####
##############################################
import datetime
import dash
from dash import dcc
from dash import html
import plotly
from dash.dependencies import Input, Output
from ably import AblyRest
from clsConfig import clsConfig as cf
import pandas as p
# Main Class to consume streaming
import clsStreamConsume as ca
import numpy as np
# Create the instance of the Covid API Class
x1 = ca.clsStreamConsume()
external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css&#39;]
app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(
html.Div([
html.H1("Covid-19 Trend Dashboard",
className='text-center text-primary mb-4'),
html.H5(children='''
Dash: Covid-19 Trend – (Present Vs Future)
'''),
html.P("Covid-19: New Confirmed Cases:",
style={"textDecoration": "underline"}),
dcc.Graph(id='live-update-graph-1'),
html.P("Covid-19: New Death Cases:",
style={"textDecoration": "underline"}),
dcc.Graph(id='live-update-graph-2'),
dcc.Interval(
id='interval-component',
interval=5*1000, # in milliseconds
n_intervals=0
)
], className="row", style={'marginBottom': 10, 'marginTop': 10})
)
def to_OptimizeString(row):
try:
x_str = str(row['Year_Mon'])
dt_format = '%Y%m%d'
finStr = x_str + '01'
strReportDate = datetime.datetime.strptime(finStr, dt_format)
return strReportDate
except Exception as e:
x = str(e)
print(x)
dt_format = '%Y%m%d'
var = '20990101'
strReportDate = datetime.strptime(var, dt_format)
return strReportDate
def fetchEvent(var1, DInd):
try:
# Let's pass this to our map section
iDF_M = x1.conStream(var1, DInd)
# Converting Year_Mon to dates
iDF_M['Year_Mon_Mod']= iDF_M.apply(lambda row: to_OptimizeString(row), axis=1)
# Dropping old columns
iDF_M.drop(columns=['Year_Mon'], axis=1, inplace=True)
#Renaming new column to old column
iDF_M.rename(columns={'Year_Mon_Mod':'Year_Mon'}, inplace=True)
return iDF_M
except Exception as e:
x = str(e)
print(x)
iDF_M = p.DataFrame()
return iDF_M
# Multiple components can update everytime interval gets fired.
@app.callback(Output('live-update-graph-1', 'figure'),
Input('interval-component', 'n_intervals'))
def update_graph_live(n):
try:
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' *60)
DInd = 'Y'
# Let's pass this to our map section
retDF = fetchEvent(var1, DInd)
# Create the graph with subplots
#fig = plotly.tools.make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.3, horizontal_spacing=0.2)
fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.3, horizontal_spacing=0.2)
# Routing data to dedicated DataFrame
retDFNC = retDF.loc[(retDF['Status'] == 'NewConfirmed')]
# Adding different chart into one dashboard
# First Use Case – New Confirmed
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Brazil,'type':'scatter','name':'Brazil'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Canada,'type':'scatter','name':'Canada'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Germany,'type':'scatter','name':'Germany'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.India,'type':'scatter','name':'India'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Indonesia,'type':'scatter','name':'Indonesia'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedKingdom,'type':'scatter','name':'United Kingdom'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedStates,'type':'scatter','name':'United States'},1,1)
return fig
except Exception as e:
x = str(e)
print(x)
# Create the graph with subplots
fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
fig['layout']['margin'] = {
'l': 30, 'r': 10, 'b': 30, 't': 10
}
fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}
return fig
# Multiple components can update everytime interval gets fired.
@app.callback(Output('live-update-graph-2', 'figure'),
Input('interval-component', 'n_intervals'))
def update_graph_live(n):
try:
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' *60)
DInd = 'Y'
# Let's pass this to our map section
retDF = fetchEvent(var1, DInd)
# Create the graph with subplots
#fig = plotly.tools.make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.3, horizontal_spacing=0.2)
fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.3, horizontal_spacing=0.2)
# Routing data to dedicated DataFrame
retDFND = retDF.loc[(retDF['Status'] == 'NewDeaths')]
# Adding different chart into one dashboard
# Second Use Case – New Confirmed
fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Brazil,'type':'bar','name':'Brazil'},1,1)
fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Canada,'type':'bar','name':'Canada'},1,1)
fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Germany,'type':'bar','name':'Germany'},1,1)
fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.India,'type':'bar','name':'India'},1,1)
fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.Indonesia,'type':'bar','name':'Indonesia'},1,1)
fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.UnitedKingdom,'type':'bar','name':'United Kingdom'},1,1)
fig.append_trace({'x':retDFND.Year_Mon,'y':retDFND.UnitedStates,'type':'bar','name':'United States'},1,1)
return fig
except Exception as e:
x = str(e)
print(x)
# Create the graph with subplots
fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
fig['layout']['margin'] = {
'l': 30, 'r': 10, 'b': 30, 't': 10
}
fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}
return fig
if __name__ == '__main__':
app.run_server(debug=True)

Let us explore the critical snippet as this is a brand new script –

external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

app = dash.Dash(__name__, external_stylesheets=external_stylesheets)

app.layout = html.Div(
    html.Div([
        html.H1("Covid-19 Trend Dashboard",
                        className='text-center text-primary mb-4'),
        html.H5(children='''
            Dash: Covid-19 Trend - (Present Vs Future)
        '''),
        html.P("Covid-19: New Confirmed Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-1'),
        html.P("Covid-19: New Death Cases:",
               style={"textDecoration": "underline"}),
        dcc.Graph(id='live-update-graph-2'),
        dcc.Interval(
            id='interval-component',
            interval=5*1000, # in milliseconds
            n_intervals=0
        )
    ], className="row", style={'marginBottom': 10, 'marginTop': 10})
)

You need to understand the basics of HTML as this framework works seamlessly with it. To know more about the supported HTML, one needs to visit the following link.

def to_OptimizeString(row):
    try:
        x_str = str(row['Year_Mon'])

        dt_format = '%Y%m%d'
        finStr = x_str + '01'

        strReportDate = datetime.datetime.strptime(finStr, dt_format)

        return strReportDate

    except Exception as e:
        x = str(e)
        print(x)

        dt_format = '%Y%m%d'
        var = '20990101'

        strReportDate = datetime.strptime(var, dt_format)

        return strReportDate

The application is converting Year-Month combinations from string to date for better projection.

Also, we’ve implemented a dashboard that will refresh every five milliseconds.

def fetchEvent(var1, DInd):
    try:
        # Let's pass this to our map section
        iDF_M = x1.conStream(var1, DInd)

        # Converting Year_Mon to dates
        iDF_M['Year_Mon_Mod']= iDF_M.apply(lambda row: to_OptimizeString(row), axis=1)

        # Dropping old columns
        iDF_M.drop(columns=['Year_Mon'], axis=1, inplace=True)

        #Renaming new column to old column
        iDF_M.rename(columns={'Year_Mon_Mod':'Year_Mon'}, inplace=True)

        return iDF_M

    except Exception as e:
        x = str(e)
        print(x)

        iDF_M = p.DataFrame()

        return iDF_M

The application will consume all the events from the Ably Queue using the above snippet.

@app.callback(Output('live-update-graph-1', 'figure'),
              Input('interval-component', 'n_intervals'))
def update_graph_live(n):

We’ve implemented the callback mechanism to get the latest data from the Queue & then update the graph accordingly & finally share the updated chart & return that to our method, which is calling it.

# Routing data to dedicated DataFrame
retDFNC = retDF.loc[(retDF['Status'] == 'NewConfirmed')]

Based on the flag, we’re pushing the data into our target dataframe, from where the application will consume the data into the charts.

fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Brazil,'type':'scatter','name':'Brazil'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Canada,'type':'scatter','name':'Canada'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Germany,'type':'scatter','name':'Germany'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.India,'type':'scatter','name':'India'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.Indonesia,'type':'scatter','name':'Indonesia'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedKingdom,'type':'scatter','name':'United Kingdom'},1,1)
fig.append_trace({'x':retDFNC.Year_Mon,'y':retDFNC.UnitedStates,'type':'scatter','name':'United States'},1,1)

Different country’s KPI elements are fetched & mapped into their corresponding axis to project the graph with visual details.

Same approach goes for the other graph as well.


Run:

Let us run the application –

Run – Beginning
Run – Finishing Stage

Dashboard:

Dashboard Job Run
Dashboard Visualization

So, we’ve done it.

You will get the complete codebase in the following Github link.

I’ll bring some more exciting topic in the coming days from the Python verse.

Till then, Happy Avenging! 😀


Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only.

One more thing you need to understand is that this prediction based on limited data points. The actual event may happen differently. Ideally, countries are taking a cue from this kind of analysis & are initiating appropriate measures to avoid the high-curve. And, that is one of the main objective of time series analysis.

There is always a room for improvement of this kind of models & the solution associated with it. I’ve shown the basic ways to achieve the same for the education purpose only.

Analyzing Language using IBM Watson using Python

Hi Guys,

Today, I’ll be discussing the following topic – “How to analyze text using IBM Watson implementing through Python.”

IBM has significantly improved in the field of Visual Image Analysis or Text language analysis using its IBM Watson cloud platform. In this particular topic, we’ll be exploring the natural languages only.

To access IBM API, we need to first create an IBM Cloud account from this site.

Let us quickly go through the steps to create the IBM Language Understanding service. Click the Catalog on top of your browser menu as shown in the below picture –

6. Creating an Instance for Watson

After that, click the AI option on your left-hand side of the panel marked in RED.

Click the Watson-Studio & later choose the plan. In our case, We’ll select the “Lite” option as IBM provided this platform for all the developers to explore their cloud for free.

7. Choosing AI
8. Choosing Plan

Clicking the create option will lead to a blank page of Watson Studio as shown below –

9. Choosing Watson Studio

And, now, we need to click the Get Started button to launch it. This will lead to Create Project page, which can be done using the following steps –

10. Create Project Initial Screen

Now, clicking the create a project will lead you to the next screen –

11. Create Project - Continue

You can choose either an empty project, or you can create it from a sample file. In this case, we’ll be selecting the first option & this will lead us to the below page –

12. Creating a Project

And, then you will click the “Create” option, which will lead you to the next screen –

13. Adding to project

Now, you need to click “Add to Project.” This will give you a variety of services that you want to explore/use from the list. If you want to create your own natural language classifier, which you can do that as follows –

14. Adding Natural Language Components from IBM Cloud

Once, you click it – you need to select the associate service –

15. Adding Associte Service - Sound

Here, you need to click the hyperlink, which prompts to the next screen –

16. Choosing Associate Service - Sound

You need to check the price for both the Visual & Natural Language Classifier. They are pretty expensive. The visual classifier has the Lite plan. However, it has limitations of output.

Clicking the “Create” will prompt to the next screen –

18. Selecting Region - Sound

After successful creation, you will be redirected to the following page –

19. Landing Page - Sound

Now, We’ll be adding our “Natural Language Understand” for our test –

29. Choosing Natural Language Understanding

This will prompt the next screen –

7. Choosing AI - Natural Language Understanding

Once, it is successful. You will see the service registered as shown below –

3. Watson Services - Sound

If you click the service marked in RED, it will lead you to another page, where you will get the API Key & Url. You need both of this information in Python application to access this API as shown below –

4. Watson API Details - Sound

Now, we’re ready with the necessary cloud set-up. After this, we need to install the Python package for IBM Cloud as shown below –

1. Installing_Packages

We’ve noticed that, recently, IBM has launched one upgraded package. Hence, we installed that one as well. I would recommend you to install this second package directly instead of the first one shown above –

2. Installing Latest IBM_Watson Package

Now, we’re done with our set-up.

Let’s see the directory structure –

31. Directory Structure

We’ll be discussing only the main calling script & class script. However, we’ll be posting the parameters without discussing it. And, we won’t discuss clsL.py as we’ve already discussed that in our previous post.

1. clsConfig.py (This script contains all the parameter details.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 04-Apr-2020              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys for      ####
#### IBM Cloud API.   Application will    ####
#### process these information & perform  ####
#### various analysis on IBM Watson cloud.####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'SERVICE_URL': "https://api.eu-gb.natural-language-understanding.watson.cloud.ibm.com/instances/xxxxxxxxxxxxxxXXXXXXXXXXxxxxxxxxxxxxxxxx",
        'API_KEY': "Xxxxxxxxxxxxxkdkdfifd984djddkkdkdkdsSSdkdkdd",
        'API_TYPE': "application/json",
        'CACHE': "no-cache",
        'CON': "keep-alive",
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'SRC_PATH': Curr_Path + sep + 'Src_File' + sep,
        'APP_DESC_1': 'IBM Watson Language Understand!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path
    }

Note that you will be placing your API_KEY & URL here, as shown in the configuration file.

2. clsIBMWatson.py (This is the main script, which will invoke the IBM Watson API based on the input from the user & return 0 if successful.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 04-Apr-2020              ####
#### Modified On 04-Apr-2020              ####
####                                      ####
#### Objective: Main scripts to invoke    ####
#### IBM Watson Language Understand API.  ####
##############################################

import logging
from clsConfig import clsConfig as cf
import clsL as cl
import json
from ibm_watson import NaturalLanguageUnderstandingV1
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
from ibm_watson.natural_language_understanding_v1 import Features, EntitiesOptions, KeywordsOptions, SentimentOptions, CategoriesOptions, ConceptsOptions
from ibm_watson import ApiException

class clsIBMWatson:
    def __init__(self):
        self.api_key =  cf.config['API_KEY']
        self.service_url = cf.config['SERVICE_URL']

    def calculateExpressionFromUrl(self, inputUrl, inputVersion):
        try:
            api_key = self.api_key
            service_url = self.service_url
            print('-' * 60)
            print('Beginning of the IBM Watson for Input Url.')
            print('-' * 60)

            authenticator = IAMAuthenticator(api_key)

            # Authentication via service credentials provided in our config files
            service = NaturalLanguageUnderstandingV1(version=inputVersion, authenticator=authenticator)
            service.set_service_url(service_url)

            response = service.analyze(
                url=inputUrl,
                features=Features(entities=EntitiesOptions(),
                                  sentiment=SentimentOptions(),
                                  concepts=ConceptsOptions())).get_result()

            print(json.dumps(response, indent=2))

            return 0

        except ApiException as ex:
            print('-' * 60)
            print("Method failed for Url with status code " + str(ex.code) + ": " + ex.message)
            print('-' * 60)

            return 1

    def calculateExpressionFromText(self, inputText, inputVersion):
        try:
            api_key = self.api_key
            service_url = self.service_url
            print('-' * 60)
            print('Beginning of the IBM Watson for Input Url.')
            print('-' * 60)

            authenticator = IAMAuthenticator(api_key)

            # Authentication via service credentials provided in our config files
            service = NaturalLanguageUnderstandingV1(version=inputVersion, authenticator=authenticator)
            service.set_service_url(service_url)

            response = service.analyze(
                text=inputText,
                features=Features(entities=EntitiesOptions(),
                                  sentiment=SentimentOptions(),
                                  concepts=ConceptsOptions())).get_result()

            print(json.dumps(response, indent=2))

            return 0

        except ApiException as ex:
            print('-' * 60)
            print("Method failed for Url with status code " + str(ex.code) + ": " + ex.message)
            print('-' * 60)

            return 1

Some of the key lines from the above snippet –

authenticator = IAMAuthenticator(api_key)

# Authentication via service credentials provided in our config files
service = NaturalLanguageUnderstandingV1(version=inputVersion, authenticator=authenticator)
service.set_service_url(service_url)

By providing the API Key & Url, the application is initiating the service for Watson.

response = service.analyze(
    url=inputUrl,
    features=Features(entities=EntitiesOptions(),
                      sentiment=SentimentOptions(),
                      concepts=ConceptsOptions())).get_result()

Based on your type of input, it will bring the features of entities, sentiment & concepts here. Apart from that, you can additionally check the following features as well – Keywords & Categories.

3. callIBMWatsonAPI.py (This is the first calling script. Based on user choice, it will receive input either as Url or as the plain text & then analyze it.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 04-Apr-2020              ####
#### Modified On 04-Apr-2020              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from clsConfig import clsConfig as cf
import clsL as cl
import logging
import datetime
import clsIBMWatson as cw

# Disbling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        ret_1 = 0
        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'IBMWatson_NaturalLanguageAnalysis.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        log_dir = cf.config['LOG_PATH']
        curr_ver =datetime.datetime.now().strftime("%Y-%m-%d")

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        print('Welcome to IBM Wantson Language Understanding Calling Program: ')
        print('-' * 60)
        print('Please Press 1 for Understand the language from Url.')
        print('Please Press 2 for Understand the language from your input-text.')
        input_choice = int(input('Please provide your choice:'))

        # Create the instance of the IBM Watson Class
        x2 = cw.clsIBMWatson()

        # Let's pass this to our map section
        if input_choice == 1:
            textUrl = str(input('Please provide the complete input url:'))
            ret_1 = x2.calculateExpressionFromUrl(textUrl, curr_ver)
        elif input_choice == 2:
            inputText = str(input('Please provide the input text:'))
            ret_1 = x2.calculateExpressionFromText(inputText, curr_ver)
        else:
            print('Invalid options!')

        if ret_1 == 0:
            print('Successful IBM Watson Language Understanding Generated!')
        else:
            print('Failed to generate IBM Watson Language Understanding!')

        print("-" * 60)
        print()

        print('Finding Analysis points..')
        print("*" * 157)
        logging.info('Finding Analysis points..')
        logging.info(tmpR0)


        tmpR10 = 'End Time: ' + str(var)
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError as e:
        print(str(e))
        print("Invalid option!")
        logging.info("Invalid option!")

    except Exception as e:
        print("Top level Error: args:{0}, message{1}".format(e.args, e.message))

if __name__ == "__main__":
    main()

This script is pretty straight forward as it is first creating an instance of the main class & then based on the user input, it is calling the respective functions here.

As of now, IBM Watson can work on a list of languages, which are available here.

If you want to start from scratch, please refer to the following link.

Please find the screenshot of our application run –

Case 1 (With Url): 

21. Win_Run_1_Url
23. Win_Run_3_Url

Case 2 (With Plain text):

25. Win_Run_1_InputText
26. Win_Run_2_InputText
27. Win_Run_3_InputText

Now, Don’t forget to delete all the services from your IBM Cloud.

32. Delete Service

As you can see, from the service, you need to delete all the services one-by-one as shown in the figure.

So, we’ve done it.

To explore my photography, you can visit the following link.

I’ll be posting another new post in the coming days. Till then, Happy Avenging! 😀

Note: All the data posted here are representational data & available over the internet & for educational purpose only.

Creating a Cross-platform GUI based application using native Python using PyQt5

Hi Guys!

Today, We’ll be discussing one more graphical package in Python, which is also known as PyQt. To faster design the GUI, we’ll be exploring another tool called Qt Designer, which is available for multiple OS platforms.

Please find the QT Designer here.

This is similar to any other GUI based IDE like Microsoft Visual Studio, where you can quickly generate your GUI template.

The majority of the internet post talks about using PyQt5 or PyQt4 packages. But, when speaking about using the .ui file inside your Python code – they either demonstrate fundamental options without any event or, they convert & generate the .ui file into .py file & then they use it. This certainly not making it very useful for many of the developers who are trying to use it for the first time. Hence, My main goal is to use the .ui file inside my Python script as it is & use all the components out of it & assign various working events.

In this post, we’ll discuss only with one script & then we’ll showcase the output in the form of video (No audio). You can verify the output for both MAC & Windows.

Before we start, let us check the directory structure between Windows & MAC –

2. MAC & Win Directory Structure

Let us explore how the GUI should look like ->

3. GUI Design

So, as you can see that this tool is like any other GUI based tool, basically you can create anything by simply drag & drop method.

Before we start discussing our code, here is the sample basicAdv.ui file for your reference.

You need to install the following framework –

pip install PyQt5

1. GUIPyQt5.py (This script contains all the GUI details & it will invoke the instance along with the logic.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 12-Mar-2020              ####
#### Modified On 12-Mar-2020              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from PyQt5 import QtWidgets, uic, QtGui, QtCore
from PyQt5.QtWidgets import *
import sys

class Ui(QtWidgets.QMainWindow):
    def __init__(self):
        # Instantiating the main class
        super(Ui, self).__init__()

        # Loading the Graphical Design without
        # converting it to any kind of Python code
        uic.loadUi('basicAdv.ui', self)

        # Adding all the essential buttons
        self.prtBtn = self.findChild(QtWidgets.QPushButton, 'prtBtn') # Find the button
        self.prtBtn.clicked.connect(self.printButtonClick) # Remember to pass the definition/method, not the return value!

        self.clrBtn = self.findChild(QtWidgets.QPushButton, 'clrBtn')  # Find the button
        self.clrBtn.clicked.connect(self.clearButtonClick)  # Remember to pass the definition/method, not the return value!

        self.addBtn = self.findChild(QtWidgets.QPushButton, 'addBtn')  # Find the button
        self.addBtn.clicked.connect(self.addItem)  # Remember to pass the definition/method, not the return value!

        self.selectImgBtn = self.findChild(QtWidgets.QPushButton, 'selectImgBtn')  # Find the button
        self.selectImgBtn.clicked.connect(self.setImage)  # Remember to pass the definition/method, not the return value!

        self.cnfBtn = self.findChild(QtWidgets.QPushButton, 'cnfBtn')  # Find the button
        self.cnfBtn.clicked.connect(self.showDialog)  # Remember to pass the definition/method, not the return value!

        # Adding other static input/output elements
        self.input = self.findChild(QtWidgets.QLineEdit, 'input')
        self.qlabel = self.findChild(QtWidgets.QLabel, 'qlabel')
        self.lineEdit = self.findChild(QtWidgets.QLineEdit, 'lineEdit')
        self.listWidget = self.findChild(QtWidgets.QListWidget, 'listWidget')
        self.imageLbl = self.findChild(QtWidgets.QLabel, 'imageLbl')

        # Adding Combobox
        self.combo = self.findChild(QtWidgets.QComboBox, 'sComboBox')  # Find the ComboBox

        # Adding static element to it
        self.combo.addItem("Sourav Ganguly")
        self.combo.addItem("Kapil Dev")
        self.combo.addItem("Sunil Gavaskar")
        self.combo.addItem("M. S. Dhoni")

        # Click Event
        self.combo.activated[str].connect(self.onChanged)  # Remember to pass the definition/method, not the return value!

        # Adding list Box
        self.listwidget2 = self.findChild(QtWidgets.QListWidget, 'listwidget2')  # Find the List

        # Adding static element to it
        self.listwidget2.insertItem(0, "Aamir Khan")
        self.listwidget2.insertItem(1, "Shahruk Khan")
        self.listwidget2.insertItem(2, "Salman Khan")
        self.listwidget2.insertItem(3, "Hrittik Roshon")
        self.listwidget2.insertItem(4, "Amitabh Bachhan")

        # Click Event
        self.listwidget2.clicked.connect(self.showIndividualElement)

        # Adding Group Box
        self.groupBox = self.findChild(QtWidgets.QGroupBox, 'groupBox')  # Find the ComboBox
        self.groupBox.setCheckable(True)

        # Adding Individual Radio Button
        self.rdButton1 = self.findChild(QtWidgets.QRadioButton, 'rdButton1')  # Find the button
        self.rdButton1.setChecked(True)
        self.rdButton1.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton1))  # Remember to pass the definition/method, not the return value!

        self.rdButton2 = self.findChild(QtWidgets.QRadioButton, 'rdButton2')  # Find the button
        self.rdButton2.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton2))  # Remember to pass the definition/method, not the return value!

        self.rdButton3 = self.findChild(QtWidgets.QRadioButton, 'rdButton3')  # Find the button
        self.rdButton3.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton3))  # Remember to pass the definition/method, not the return value!

        self.rdButton4 = self.findChild(QtWidgets.QRadioButton, 'rdButton4')  # Find the button
        self.rdButton4.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton4))  # Remember to pass the definition/method, not the return value!

        self.show()

    def printRadioButtonClick(self, radioOption):

        if radioOption.text() == 'China':
            if radioOption.isChecked() == True:
                print(radioOption.text() + ' is selected')
            else:
                print(radioOption.text() + ' is deselected')

        if radioOption.text() == 'India':
            if radioOption.isChecked() == True:
                print(radioOption.text() + ' is selected')
            else:
                print(radioOption.text() + ' is deselected')

        if radioOption.text() == 'Japan':
            if radioOption.isChecked() == True:
                print(radioOption.text() + ' is selected')
            else:
                print(radioOption.text() + ' is deselected')

        if radioOption.text() == 'France':
            if radioOption.isChecked() == True:
                print(radioOption.text() + ' is selected')
            else:
                print(radioOption.text() + ' is deselected')

    def printButtonClick(self):
        # This is executed when the button is pressed
        print('Input text:' + self.input.text())

    def clearButtonClick(self):
        # This is executed when the button is pressed
        self.input.clear()

    def onChanged(self, text):
        self.qlabel.setText(text)
        self.qlabel.adjustSize()
        self.lineEdit.clear()  # Clear the text

    def addItem(self):
        value = self.lineEdit.text() # Get the value of the lineEdit
        self.lineEdit.clear() # Clear the text
        self.listWidget.addItem(value) # Add the value we got to the list

    def setImage(self):
        fileName, _ = QtWidgets.QFileDialog.getOpenFileName(None, "Select Image", "", "Image Files (*.png *.jpg *jpeg *.bmp);;All Files (*)") # Ask for file
        if fileName: # If the user gives a file
            pixmap = QtGui.QPixmap(fileName) # Setup pixmap with the provided image
            pixmap = pixmap.scaled(self.imageLbl.width(), self.imageLbl.height(), QtCore.Qt.KeepAspectRatio) # Scale pixmap
            self.imageLbl.setPixmap(pixmap) # Set the pixmap onto the label
            self.imageLbl.setAlignment(QtCore.Qt.AlignCenter) # Align the label to center

    def showDialog(self):
        msgBox = QMessageBox()
        msgBox.setIcon(QMessageBox.Information)
        msgBox.setText("Message box pop up window")
        msgBox.setWindowTitle("MessageBox Example")
        msgBox.setStandardButtons(QMessageBox.Ok | QMessageBox.Cancel)
        msgBox.buttonClicked.connect(self.msgButtonClick)

        returnValue = msgBox.exec()
        if returnValue == QMessageBox.Ok:
            print('OK clicked')

    def msgButtonClick(self, i):
        print("Button clicked is:", i.text())

    def showIndividualElement(self, qmodelindex):
        item = self.listwidget2.currentItem()
        print(item.text())

if __name__ == "__main__":

    import sys
    app = QtWidgets.QApplication(sys.argv)
    window = Ui()
    window.show()
    sys.exit(app.exec_())

Let us explore a few key lines from this script. Rests are almost identical.

# Loading the Graphical Design without
# converting it to any kind of Python code
uic.loadUi('basicAdv.ui', self)

Loading the GUI created using Qt Designer into the Python environment.

# Adding all the essential buttons
self.prtBtn = self.findChild(QtWidgets.QPushButton, 'prtBtn') # Find the button
self.prtBtn.clicked.connect(self.printButtonClick) # Remember to pass the definition/method, not the return value!

In this case, we’re dynamically binding the component from the GUI by using the findChild method & then on the next line, we’re invoking the appropriate event associated with that. In this case, it is – self.printButtonClick.

The printButtonClick as mentioned earlier is a method & that contains the following snippet –

def printButtonClick(self):
    # This is executed when the button is pressed
    print('Input text:' + self.input.text())

As you can see, this event will capture the text from the input textbox & print it on our terminal.

Here is the snippet for those widgets, which is part of only input/output & they generally don’t have an event of their own. But, we need to bind them with our Python application.

# Adding other static input/output elements
self.input = self.findChild(QtWidgets.QLineEdit, 'input')
self.qlabel = self.findChild(QtWidgets.QLabel, 'qlabel')
self.lineEdit = self.findChild(QtWidgets.QLineEdit, 'lineEdit')
self.listWidget = self.findChild(QtWidgets.QListWidget, 'listWidget')

This application has drop-down list & hence, we’ve added some static value during our load of this application & that can be seen here –

# Adding list Box
self.listwidget2 = self.findChild(QtWidgets.QListWidget, 'listwidget2')  # Find the List

# Adding static element to it
self.listwidget2.insertItem(0, "Aamir Khan")
self.listwidget2.insertItem(1, "Shahruk Khan")
self.listwidget2.insertItem(2, "Salman Khan")
self.listwidget2.insertItem(3, "Hrittik Roshon")
self.listwidget2.insertItem(4, "Amitabh Bachhan")

Once, the user will select a specific value from this list, the app will execute the following event as shown below –

# Click Event
self.listwidget2.clicked.connect(self.showIndividualElement)

Again, to explore the method, you need to view the given logic –

def showIndividualElement(self, qmodelindex):
    item = self.listwidget2.currentItem()
    print(item.text())

Group Box, along with the radio button, works slightly different than our drop-down list.

For each radio button, we’ll have a dedicated text value that represents a different country in this context.

And, our application will bind all the radio button & then they will use one standard method for all of these four options as shown below –

# Adding Individual Radio Button
self.rdButton1 = self.findChild(QtWidgets.QRadioButton, 'rdButton1')  # Find the button
self.rdButton1.setChecked(True)
self.rdButton1.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton1))  # Remember to pass the definition/method, not the return value!

self.rdButton2 = self.findChild(QtWidgets.QRadioButton, 'rdButton2')  # Find the button
self.rdButton2.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton2))  # Remember to pass the definition/method, not the return value!

self.rdButton3 = self.findChild(QtWidgets.QRadioButton, 'rdButton3')  # Find the button
self.rdButton3.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton3))  # Remember to pass the definition/method, not the return value!

self.rdButton4 = self.findChild(QtWidgets.QRadioButton, 'rdButton4')  # Find the button
self.rdButton4.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton4))  # Remember to pass the definition/method, not the return value!

Also, note that, by default, rdButton1 is set to True i.e., it will be selected when the form load initially.

Let’s explore the printRadioButtonClick event.

def printRadioButtonClick(self, radioOption):

    if radioOption.text() == 'China':
        if radioOption.isChecked() == True:
            print(radioOption.text() + ' is selected')
        else:
            print(radioOption.text() + ' is deselected')

    if radioOption.text() == 'India':
        if radioOption.isChecked() == True:
            print(radioOption.text() + ' is selected')
        else:
            print(radioOption.text() + ' is deselected')

    if radioOption.text() == 'Japan':
        if radioOption.isChecked() == True:
            print(radioOption.text() + ' is selected')
        else:
            print(radioOption.text() + ' is deselected')

    if radioOption.text() == 'France':
        if radioOption.isChecked() == True:
            print(radioOption.text() + ' is selected')
        else:
            print(radioOption.text() + ' is deselected')

This will capture the radio button option & based on the currently clicked button, it will fetch the text out of it. Finally, that will match with the logic here & based on that, our application will display the output.

Finally, the Image process is slightly different.

Initially, our application will load the component from the .ui file & bind them with the Python environment –

self.imageLbl = self.findChild(QtWidgets.QLabel, 'imageLbl')

Image load option will only work when the user clicks the button that triggers the following sets of actions –

self.selectImgBtn = self.findChild(QtWidgets.QPushButton, 'selectImgBtn')  # Find the button
self.selectImgBtn.clicked.connect(self.setImage)  # Remember to pass the definition/method, not the return value!

Let’s explore the setImage method –

def setImage(self):
    fileName, _ = QtWidgets.QFileDialog.getOpenFileName(None, "Select Image", "", "Image Files (*.png *.jpg *jpeg *.bmp);;All Files (*)") # Ask for file
    if fileName: # If the user gives a file
        pixmap = QtGui.QPixmap(fileName) # Setup pixmap with the provided image
        pixmap = pixmap.scaled(self.imageLbl.width(), self.imageLbl.height(), QtCore.Qt.KeepAspectRatio) # Scale pixmap
        self.imageLbl.setPixmap(pixmap) # Set the pixmap onto the label
        self.imageLbl.setAlignment(QtCore.Qt.AlignCenter) # Align the label to center

This will prompt the corresponding dialogue box for choosing the right images out of the respective O/S.

Last but not least, the use of MsgBox, which can be extremely useful for many GUI based programming.

This msgbox doesn’t exist in the form. However, we’re creating it on the event of the “Confirm Button” as shown below –

self.cnfBtn = self.findChild(QtWidgets.QPushButton, 'cnfBtn')  # Find the button
self.cnfBtn.clicked.connect(self.showDialog)  # Remember to pass the definition/method, not the return value!

This will prompt the showDialog method to trigger –

def showDialog(self):
    msgBox = QMessageBox()
    msgBox.setIcon(QMessageBox.Information)
    msgBox.setText("Message box pop up window")
    msgBox.setWindowTitle("MessageBox Example")
    msgBox.setStandardButtons(QMessageBox.Ok | QMessageBox.Cancel)
    msgBox.buttonClicked.connect(self.msgButtonClick)

    returnValue = msgBox.exec()
    if returnValue == QMessageBox.Ok:
        print('OK clicked')

And, based on your options (“OK”/”Cancel”), it will prompt the final captured message in your console.

Let’s explore the videos of output from Windows O/S –

Let’s explore the video output from MAC VM –

For more information on this package – please check the following link.

So, as you can see, finally we’ve achieved it. We’ve demonstrated cross-platform GUI applications using native Python. And, here we didn’t even convert the ui design file to python script either.

Please share your feedback.

I’ll be posting another new post in the coming days. Till then, Happy Avenging! 😀

Note: All the data posted here are representational data & available over the internet & for educational purpose only.

Creating a native mobile application using beeware framework using native Python

Hi Guys,

Today, I’ll explain a relatively new GUI framework with which you can create native mobile applications across all the different platforms. Note that this framework is still in the preview phase on many fronts. And you also can contribute here in many ways.

Let’s jump into making a chat application using this. It is relatively easy to build.

You need to install the following framework –

pip install beeware==0.3.0.dev3

As I’ve told you that this package is in the preview stage. So, you need to wait for few more days to get this package available for production use.

However, the following diagram presented in Pycon explains that all –

8. All Umbrella

Let’s jump into our objective.

To create the project, the following are steps that need to perform –

Functions that you need to use –

briefcase new

This will lead to several options that you need to fill as shown in the next couple of slides –

1. Project_Init_Win
2. Project_Init_Win

Note that, this one I’ve created in the Windows environment. So, you need to provide all these details before creating the app.

This will create a template of code as shown below –

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
"""
Chat with my friends
"""
import toga
from toga.style import Pack
from toga.style.pack import COLUMN, ROW


class SDApp(toga.App):

    def startup(self):
        """
        Construct and show the Toga application.

        Usually, you would add your application to a main content box.
        We then create a main window (with a name matching the app), and
        show the main window.
        """
        main_box = toga.Box()

        self.main_window = toga.MainWindow(title=self.formal_name)
        self.main_window.content = main_box
        self.main_window.show()


def main():
    return SDApp()

And, you can run this template to see the default template output by using the following command –

briefcase dev

3. Run_Basic_In_Windows

Now, I want to take this & add some lines of codes to create a chat-based application in MAC & see how it behaves.

But, before that, the directory structure will look like this –

9. Dir_Structure

As you can see, SDChat is my application name. And, based on that, the following directories.

And, inside the final SDChat directory, the following files are created –

__init__.py

__main__.py

app.py

Let’s take this & modify it for MAC. To do that, we need to change app.py script & layout our all the essential GUI ingredients.

Note that I’m not going to discuss the custom bot that I created. Only, I’ll be referring to it.

1. app.py (This script contains all the GUI details & it will invoke the custom bot.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 17-Feb-2020              ####
#### Modified On 17-Feb-2020              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################
"""
This is a Chat Application with custom made bot
"""
import toga
from toga.style import Pack
from toga.style.pack import COLUMN, ROW

# My Custom Built Bot
from SDChatbots import clsTalk2Bot as e

#-- New
class SDChat(toga.App):

    def startup(self):
        """
        Construct and show the Toga application.

        Usually, you would add your application to a main content box.
        We then create a main window (with a name matching the app), and
        show the main window.
        """

        self.chat = toga.DetailedList(data=[], style=Pack(flex=1))

        self.chat.data.append(icon = toga.Icon('resources/brutus.png'), title='SDChat', subtitle='Hi! How are you doing today?')

        self.text_input = toga.TextInput(style=Pack(flex=1))
        #send_button = toga.Button('Send')

        send_button = toga.Button(
            'Send',
            on_press=self.greet,
            style=Pack(padding_right=5)
        )

        input_box = toga.Box(
            children=[self.text_input, send_button],
            style=Pack(direction=ROW, alignment='center', padding=5)
        )

        main_box = toga.Box(children=[self.chat, input_box], style=Pack(direction=COLUMN))
        # main_box.add(send_button)

        self.main_window = toga.MainWindow(title=self.formal_name)

        self.main_window.content = main_box
        self.main_window.show()

    def greet(self, widget):
        print('Value: ', self.text_input.value)
        input_text = self.text_input.value

        self.chat.data.append(icon=toga.Icon('resources/user.png'), title='You', subtitle = input_text)

        # Chatbot
        y = e.clsTalk2Bot()
        ret_val = y.TalkNow(input_text)

        self.chat.data.append(icon=toga.Icon('resources/brutus.png'), title='SDChat', subtitle=ret_val)

        self.text_input.value = ''
        self.chat.scroll_to_bottom()


def main():
    return SDChat()

Let’s discuss a couple of essential lines from the above snippet –

self.chat = toga.DetailedList(data=[], style=Pack(flex=1))

self.chat.data.append(icon = toga.Icon('resources/brutus.png'), title='SDChat', subtitle='Hi! How are you doing today?')

This is the main display box, where you can see all the chat details. And, also, by default, it will prompt the initial conversion starter.

self.text_input = toga.TextInput(style=Pack(flex=1))

This is the place where you will be typing your text.

send_button = toga.Button(
    'Send',
    on_press=self.greet,
    style=Pack(padding_right=5)
)

Here is the button that you are creating. As you can see, almost all the places I’ve provided the “Style,” which is key to your object alignment inside your mobile app.

main_box = toga.Box(children=[self.chat, input_box], style=Pack(direction=COLUMN))

self.main_window = toga.MainWindow(title=self.formal_name)

self.main_window.content = main_box
self.main_window.show()

Finally, you need to bind all the components & you will show the page.

Now, let’s discuss the function that I’ve created for my button –

def greet(self, widget):
    input_text = self.text_input.value

    self.chat.data.append(icon=toga.Icon('resources/user.png'), title='You', subtitle = input_text)

    # Chatbot
    y = e.clsTalk2Bot()
    ret_val = y.TalkNow(input_text)

    self.chat.data.append(icon=toga.Icon('resources/brutus.png'), title='SDChat', subtitle=ret_val)

    self.text_input.value = ''
    self.chat.scroll_to_bottom()

As you can see, here, our application will capture the user input & based on that, our program will pass the input text to our chatbot. Also, you can see once that communication & response achieved, the input box will be cleared & the control will move down to the end of the chat screen. This is required. Otherwise, the user won’t be able to view the latest communication.

10. Run_MAC

So, as you can see that, this extremely easy to create & you can enhance it as per your need.

For more information on this framework, please go through the following link ->

https://pypi.org/project/beeware/0.3.0.dev3/

https://docs.beeware.org/en/latest/

https://beeware.org/project/about/

For more details, view the main page ->

https://beeware.org/

I’ll be posting another new post in the coming days. Till then, Happy Avenging! 😀

Note: All the data posted here are representational data & available over the internet & for educational purpose only.

Sending SMS using 3rd party API by integrating with custom-built BOT in Python

Hi Guys!

Today, We’re going to discuss the way to send SMS through popular 3rd-party API (Twilio) using Python 3.7.

Before that, you need to register with Twilio. By default, they will give you some credit in order to explore their API.

And, then you can get a virtual number from them, which will be used to exchange SMS between your trusted numbers for trial Account.

1. Booking Phone Number

The basic architecture can be depicted are as follows –

14. FeatureImage

How to get a verified number for your trial account?

Here is the way, you have to do that –

10. VerifiedNumbers

You can create your own trial account by using this link.

Apart from that, you need to download & install Ngrok. This is available for multi-platform. For our case, we’re using Windows.

The purpose is to run your local web service through a global API like interface. I’ll explain that later.

You need to register & install that on your computer –

2. Ngrok

Once, you download & install you need to use the global link of any running local server application like this –

3. GetURL

This is the dummy link. I’ll hide the original link. However, every time when you restart the application, you’ll get a new link. So, you will be safe anyway. 🙂

4. UpdateLink

Once, you get the link, you have to update that global link under the messaging section. Remember that, you have to keep the “/sms” part after that.

Let’s see our sample code. here, I would be integrating my custom developed BOT developed in Python. However, I’ll be only calling that library. We’re not going post any script or explain that over here.

1. serverSms.py ( This script is a server script, which is using flask framework & it will respond to the user’s text message by my custom developed BOT using Python)

# /usr/bin/env python
##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 03-Nov-2019              ####
####                                      ####
#### Objective: This script will respond  ####
#### by BOT created by me. And, reply to  ####
#### sender about their queries.          ####
#### We're using Twillio API for this.    ####
####                                      ####
##############################################

from flask import Flask, request, redirect
from twilio import twiml
from twilio.twiml.messaging_response import Message, MessagingResponse
import logging
from flask import request
from SDChatbots.clsTalk2Bot import clsTalk2Bot

app = Flask(__name__)

@app.route("/sms", methods=['GET', 'POST'])
def sms_ahoy_reply():
    """Respond to incoming messages with a friendly SMS."""
    # Start our response
    # resp = twiml.Response()
    message_body = request.form['Body']

    print(message_body)
    logging.info(message_body)

    y = clsTalk2Bot()
    ret_val = y.TalkNow(message_body)
    zMsg = str(ret_val)
    print('Response: ', str(zMsg))

    resp = MessagingResponse()

    # Add a message
    resp.message(zMsg)

    return str(resp)

if __name__ == "__main__":
    app.run(debug=True)

Key lines from the above scripts are –

@app.route("/sms", methods=['GET', 'POST'])

The route is a way to let your application understand to trigger the appropriate functionalities inside your API.

message_body = request.form['Body']

Here, the application is capturing the incoming SMS & print that in your server log. We’ll see that when we run our application.

y = clsTalk2Bot()
ret_val = y.TalkNow(message_body)
zMsg = str(ret_val)

Now, the application is calling my developed python BOT & retrieve the response & convert it as a string before pushing the response SMS to the user, who originally send the SMS.

resp = MessagingResponse() --This is for Python 3.7 +

# Add a message
resp.message(zMsg)

return str(resp)

Finally, you are preparing the return SMS & send it back to the user.

For the old version, the following line might work –

resp = twiml.Response()

But, just check with the Twilio API.

Let’s run our server application. You will see the following screen –

11. ServerResponse

Let’s see, if one someone ask some question. How the application will respond –

7.1. BotIntegratedSMS

And, let’s explore how our server application is receiving it & the response from the server –

6. ServerResponse

Note that, we’ll be only sending the text to SMS, not the statistics sent by my BOT marked in RED.  😀

Let’s check the response from the BOT –

7.2. BotIntegratedSMS

Yes! We did it. 😀

But, make sure you are regularly checking your billing as this will cost you money. Always, check the current balance –

9. BillingInfo

You can check the usage from the following tab –

12. Usage

You can create a billing alarm to monitor your usage –

13. BillingAlert

Let me know, how do you like it.

So, we’ll come out with another exciting post in the coming days!

N.B.: This is demonstrated for RnD/study purposes. All the data posted here are representational data & available over the internet.

Converting text to voice in Python

Hi Guys!

Today, we’ll be discussing one new post of converting text into a voice using some third-party APIs. This is particularly very useful in many such cases, where you can use this method to get more realistic communication.

There are many such providers, where you can get an almost realistic voice for both males & females. However, most of them are subscription-based. So, you have to be very careful about your budget & how to proceed.

For testing purposes, I’ll be using voice.org to simulate this.

Let’s look out the architecture of this process –

FlowS

As you can see, the user-initiated the application & provide some input in the form of plain text. Once the data is given, the app will send it to the third-party API for the process. Now, the Third-party API will verify the authentication & then it will check all the associate parameters before it starting to generate the audio response. After that, it will send the payload & that will be received by the calling python application. Here, it will be decoded & create the audio file & finally, that will be played at the invoking computer.

This third-party API has lots of limitations. However, they are giving you the platform to test your concept.

As of now, they support the following languages – English, Chinese, Catalan, French, Finnish, Dutch, Danish, German, Italian, Japanese, Korean, Polish, Norwegian, Portuguese, Russian, Spanish & Sweedish.

In our case, we’ll be checking with English.

To work with this, you need to have the following modules installed in python –

  • playsound
  • requests
  • base64

Let’s see the directory structure –

1. Directory

Again, we are not going to discuss any script, which we’ve already discussed here.

Hence, we’re skipping clsL.py here.

1. clsConfig.py (This script contains all the parameters of the server.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 12-Oct-2019              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys for      ####
#### azure cosmos db. Application will    ####
#### process these information & perform  ####
#### various CRUD operation on Cosmos DB. ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'url': "https://voicerss-text-to-speech.p.rapidapi.com/",
        'host': "voicerss-text-to-speech.p.rapidapi.com",
        'api_key': "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
        'targetFile': "Bot_decode.mp3",
        'pitch_speed': "-6",
        'bot_language': "en-us",
        'audio_type': "mp3",
        'audio_freq': "22khz_8bit_stereo",
        'query_string_api': "hhhhhhhhhhhhhhhhhhhhhhhhhhhh",
        'b64_encoding': True,
        'APP_DESC_1': 'Text to voice conversion.',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'LOG_PATH': Curr_Path + sep + 'log' + sep
    }

For security reasons, sensitive information masked with the dummy value.

‘api_key’: “xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx”,

‘query_string_api’: “hhhhhhhhhhhhhhhhhhhhhhhhhhhh”,

This two information is private to each subscriber. Hence, I’ve removed them & updated with some dummy values.

You have to fill-up with your subscribed information.

2. clsText2Voice.py (This script will convert the text data into an audio file using a GET API request from the third-party API & then play that using the web media player.)

###############################################
#### Written By: SATYAKI DE                ####
#### Written On: 27-Oct-2019               ####
#### Modified On 27-Oct-2019               ####
####                                       ####
#### Objective: Main class converting      ####
#### text to voice using third-party API.  ####
###############################################

from playsound import playsound
import requests
import base64
from clsConfig import clsConfig as cf

class clsText2Voice:
    def __init__(self):
        self.url = cf.config['url']
        self.api_key = cf.config['api_key']
        self.targetFile = cf.config['targetFile']
        self.pitch_speed = cf.config['pitch_speed']
        self.bot_language = cf.config['bot_language']
        self.audio_type = cf.config['audio_type']
        self.audio_freq = cf.config['audio_freq']
        self.b64_encoding = cf.config['b64_encoding']
        self.query_string_api = cf.config['query_string_api']
        self.host = cf.config['host']

    def getAudio(self, srcString):
        try:
            url = self.url
            api_key = self.api_key
            tarFile = self.targetFile
            pitch_speed = self.pitch_speed
            bot_language = self.bot_language
            audio_type = self.audio_type
            audio_freq = self.audio_freq
            b64_encoding = self.b64_encoding
            query_string_api = self.query_string_api
            host = self.host

            querystring = {
                "r": pitch_speed,
                "c": audio_type,
                "f": audio_freq,
                "src": srcString,
                "hl": bot_language,
                "key": query_string_api,
                "b64": b64_encoding
            }

            headers = {
                'x-rapidapi-host': host,
                'x-rapidapi-key': api_key
            }

            response = requests.request("GET", url, headers=headers, params=querystring)

            # Converting to MP3
            targetFile = tarFile
            mp3File_64_decode = base64.decodebytes(bytes(response.text, encoding="utf-8"))
            mp3File_result = open(targetFile, 'wb')

            # create a writable mp3File and write the decoding result
            mp3File_result.write(mp3File_64_decode)
            mp3File_result.close()

            playsound(targetFile)

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

Few crucial lines from the above script –

querystring = {
    "r": pitch_speed,
    "c": audio_type,
    "f": audio_freq,
    "src": srcString,
    "hl": bot_language,
    "key": query_string_api,
    "b64": b64_encoding
}

You can configure the voice of the audio by adjusting all the configurations. And, the text content will receive at srcString. So, whatever user will be typing that will be directly captured here & form the JSON payload accordingly.

response = requests.request("GET", url, headers=headers, params=querystring)

In this case, you will be receiving the audio file in the form of a base64 text file. Hence, you need to convert them back to the sound file by these following lines –

# Converting to MP3
targetFile = tarFile
mp3File_64_decode = base64.decodebytes(bytes(response.text, encoding="utf-8"))
mp3File_result = open(targetFile, 'wb')

# create a writable mp3File and write the decoding result
mp3File_result.write(mp3File_64_decode)
mp3File_result.close()

As you can see that, we’ve extracted the response.text & then we’ve decoded that to byte object to form the mp3 sound file at the receiving end.

Once we have our mp3 file ready, the following line simply plays the audio record.

playsound(targetFile)

Thus you can hear the actual voice.

3. callText2Voice.py (This is the main script that will invoke the text to voice API & then playback the audio once it gets the response from the third-party API.)

###############################################
#### Written By: SATYAKI DE                ####
#### Written On: 27-Oct-2019               ####
#### Modified On 27-Oct-2019               ####
####                                       ####
#### Objective: Main class converting      ####
#### text to voice using third-party API.  ####
###############################################

from clsConfig import clsConfig as cf
import clsL as cl
import logging
import datetime
import clsText2Voice as ct

# Disbling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        ret_2 = ''
        debug_ind = 'Y'

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidatedTwitter.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        log_dir = cf.config['LOG_PATH']

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        # Query using parameters
        rawQry = str(input('Enter your string:'))

        x1 = ct.clsText2Voice()
        ret_2 = x1.getAudio(rawQry)

        if ret_2 == 0:
            print("Successfully converted from text to voice!")
            logging.info("Successfully converted from text to voice!")
            print("*" * 157)
            logging.info(tmpR0)
        else:
            print("Successfuly converted!")
            logging.info("Successfuly converted!")
            print("*" * 157)
            logging.info(tmpR0)

        print("*" * 157)
        logging.info(tmpR0)

        tmpR10 = 'End Time: ' + str(var)
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError:
        print("No relevant data to proceed!")
        logging.info("No relevant data to proceed!")

    except Exception as e:
        print("Top level Error: args:{0}, message{1}".format(e.args, e.message))

if __name__ == "__main__":
    main()

Essential lines from the above script –

# Query using parameters
rawQry = str(input('Enter your string:'))

x1 = ct.clsText2Voice()
ret_2 = x1.getAudio(rawQry)

As you can see, here the user will be passing the text content, which will be given to our class & then it will project the audio sound of that text.

Let’s see how it runs –

Input Text: Welcome to Satyaki De’s blog. This site mainly deals with the Python, SQL from different DBs & many useful areas from the leading cloud providers.

And, here is the run command under Windows OS looks like –

2. Windows_Run

And, please find the sample voice that it generates –

So, We’ve done it! 😀

Let us know your comment on this.

So, we’ll come out with another exciting post in the coming days!

N.B.: This is demonstrated for RnD/study purposes. All the data posted here are representational data & available over the internet.

Explaining New Python Library – Regular Expression in JSON

Hi Guys!

As discussed, here is the continuation of the previous post. We’ll explain the regular expression from the library that I’ve created recently.

First, let me share the calling script for regular expression –

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 08-Sep-2019              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from dnpr.clsDnpr import clsDnpr
import datetime as dt
import json

# Disbling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB

def main():
    try:
        # Initializing the class
        t = clsDnpr()
        
        srcJson = [
                    {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
                    {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
                    {"FirstName": "Archi", "LastName": "Bose", "Sal": 500},
                    {"FirstName": "Archi", "LastName": "Bose", "Sal": 7000},
                    {"FirstName": "Deb", "LastName": "Sen", "Sal": 9500}
                  ]

        print("4. Checking regular expression functionality!")
        print()

        var13 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("Start Time: ", str(var13))

        print('::Function Regex_Like:: ')
        print()

        tarColumn = 'FirstName'
        print('Target Column for Rexex_Like: ', tarColumn)
        inpPattern = r"\bSa"
        print('Input Pattern: ', str(inpPattern))

        # Invoking the distinct function
        tarJson = t.regex_like(srcJson, tarColumn, inpPattern)

        print('End of Function Regex_Like!')
        print()

        print("*" * 157)
        print("Output Data: ")
        tarJsonFormat = json.dumps(tarJson, indent=1)
        print(str(tarJsonFormat))
        print("*" * 157)

        if not tarJson:
            print()
            print("No relevant output data!")
            print("*" * 157)
        else:
            print()
            print("Relevant output data comes!")
            print("*" * 157)

        var14 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("End Time: ", str(var14))

        var15 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("Start Time: ", str(var15))

        print('::Function Regex_Replace:: ')
        print()

        tarColumn = 'FirstName'
        print('Target Column for Rexex_Replace: ', tarColumn)
        inpPattern = r"\bSa"
        print('Input Pattern: ', str(inpPattern))
        replaceString = 'Ka'
        print('Replacing Character: ', replaceString)

        # Invoking the distinct function
        tarJson = t.regex_replace(srcJson, tarColumn, inpPattern, replaceString)

        print('End of Function Rexex_Replace!')
        print()

        print("*" * 157)
        print("Output Data: ")
        tarJsonFormat = json.dumps(tarJson, indent=1)
        print(str(tarJsonFormat))
        print("*" * 157)

        if not tarJson:
            print()
            print("No relevant output data!")
            print("*" * 157)
        else:
            print()
            print("Relevant output data comes!")
            print("*" * 157)

        var16 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("End Time: ", str(var16))

        var17 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("Start Time: ", str(var17))

        print('::Function Regex_Substr:: ')
        print()

        tarColumn = 'FirstName'
        print('Target Column for Regex_Substr: ', tarColumn)
        inpPattern = r"\bSa"
        print('Input Pattern: ', str(inpPattern))

        # Invoking the distinct function
        tarJson = t.regex_substr(srcJson, tarColumn, inpPattern)

        print('End of Function Regex_Substr!')
        print()

        print("*" * 157)
        print("Output Data: ")
        tarJsonFormat = json.dumps(tarJson, indent=1)
        print(str(tarJsonFormat))
        print("*" * 157)

        if not tarJson:
            print()
            print("No relevant output data!")
            print("*" * 157)
        else:
            print()
            print("Relevant output data comes!")
            print("*" * 157)

        var18 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        print("End Time: ", str(var18))

        print("=" * 157)
        print("End of regular expression function!")
        print("=" * 157)



    except ValueError:
        print("No relevant data to proceed!")

    except Exception as e:
        print("Top level Error: args:{0}, message{1}".format(e.args, e.message))

if __name__ == "__main__":
    main()

As per the library, we’ll discuss the following functionalities –

  1. regex_like
  2. regex_replace
  3. regex_substr

Now, let us check how to call these functions.

1. regex_like:

Following is the base skeleton in order to invoke this function –

regex_like(Input Json, Target Column, Pattern To Match) return Output Json

Here are the key lines in the script –

srcJson = [
            {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
            {"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
            {"FirstName": "Archi", "LastName": "Bose", "Sal": 500},
            {"FirstName": "Archi", "LastName": "Bose", "Sal": 7000},
            {"FirstName": "Deb", "LastName": "Sen", "Sal": 9500}
          ]

# Invoking the distinct function
tarJson = t.regex_like(srcJson, tarColumn, inpPattern)

2. regex_replace:

Following is the base skeleton in order to invoke this function –

regex_replace(Input Json, Target Column, Pattern to Replace) return Output Json

Here are the key lines in the script –

tarColumn = 'FirstName'
print('Target Column for Rexex_Replace: ', tarColumn)
inpPattern = r"\bSa"
print('Input Pattern: ', str(inpPattern))
replaceString = 'Ka'
print('Replacing Character: ', replaceString)

# Invoking the distinct function
tarJson = t.regex_replace(srcJson, tarColumn, inpPattern, replaceString)

As you can see, here ‘Sa’ with ‘Ka’ provided it matches the specific pattern in the JSON.

3. regex_replace:

Following is the base skeleton in order to invoke this function –

regex_substr(Input Json, Target Column, Pattern to substring) return Output Json

Here are the key lines –

tarColumn = 'FirstName'
print('Target Column for Regex_Substr: ', tarColumn)
inpPattern = r"\bSa"
print('Input Pattern: ', str(inpPattern))

# Invoking the distinct function
tarJson = t.regex_substr(srcJson, tarColumn, inpPattern)

In this case, we’ve subtracted a part of the JSON string & return the final result as JSON.

Let us first see the sample input JSON –

SourceJSON_Regex

Let us check how it looks when we run the calling script –

  • regex_like:
Regex_Like

This function will retrieve the elements, which will start with ‘Sa‘. As a result, we’ll see the following two elements in the Payload.

  • regex_replace:
Regex_Replace

In this case, we’re replacing any string which starts with ‘Sa‘ & replaced with the ‘Ka‘.

  • regex_substr:
Regex_Substr

As you can see that the first element FirstName changed the name from “Satyaki” to “tyaki“.

So, finally, we’ve achieved our target.

I’ll post the next exciting concept very soon.

Till then! Happy Avenging! 😀

N.B.: This is demonstrated for RnD/study purposes. All the data posted here are representational data & available over the internet.

Building Pipeline into the Azure Data Factory!

Hi Guys!

After a brief gap, here is the latest edition of data pipeline implementation through Azure Data Factory.

Our objective is straightforward. In this post, we’ll develop a major ADF pipeline. However, our main aim in coming days to use the same pipe & enhance them in the coming days.

So, let’s get started!

Step -1:

The first step to view the sample data.

0. Sample Data

So, now, we’ll be loading this file in one of our already created SQL DB inside our cloud environment.

Step -2:

Now, we will upload this sample file to blobstore, which our application will use as a source in the flow.

1. Source File

Step -3:

Let’s create & set-up the initial data factory.

2. Data Factory Initial Screen

Now, clicking the create data factory, we’ll set-up the first time environment.

3. Initial SetUp

By clicking the “create,” the cloud will prepare the environment for the first time.

4. Basic Screen

For security reason, I’m not displaying other essential options.

Now, you need to click Author & Monitor to finally arrive at the main development interface, which will look like this –

5. In-Progress
6. Initial Screen

Now, you need to click “Create Pipeline.”

7. Adding Dataset

Once, you are on this page. You are ready to build your first data flow.

Step -4:

Let’s make the source data ready. To do that, we’ll be the first click the “Add Dataset” in this case. And, follow the given steps provided in the series of snapshot given below –

8. Selecting Data Source

In this case, we would choose Azure Blob Storage as our source place.

10. Selected Type

Now, we have to choose the source file type.

Finally, you need to provide other essential information as shown in the next screenshot & you need to test it.

11. Testing Source Data Connection

Now, you need to choose the source path & need to preview the data. You can only view the data once you can successfully connect.

12. Choosing Path - 1
13. Choosing Path -2
14. Choosing Path -3

So, if everything looks good. Then you can see the following page –

15. Preview

Once, you click it. You will be able to view it.

16. Preview - 2

Step -5:

Now, we’ll be using the copy data option as shown below –

17. Using Copy Data Option

Now, we’ll be configuring the key places –

18. Selecting Source

As you can see, you can choose the csv file as source.

You can then generate the mapping. By clicking the Mapping tab, you can view the detail.

19. Import Schemas

Now, we’ll be preparing the dataflow.

20. Adding DataFlow

So, basically, here we’ll be creating the flow as per our requirement. In our case, we’ll be using one primary filter before we push our data to the target.

And, the steps are as follows –

As from the above picture, we have configured the source by choosing the appropriate source data selection.

21. Dataflow Debug Training

You have to turn-on debug preview mode. But, remember one crucial point. For that ADF will create one runtime cluster & for that you will be charged separately. However, you can view while building the data pipeline.

22. Dataflow Preview

Finally, we’ll be selecting the target.

23. Sink Or Target

We’ll be dragging the sink/target. And, then configure that in this following steps –

24. Choosing Target
25. TargetConnectionTest
26. Creating A new table

So, In this case, It will create a new table.

Once, you prepare everything, you have to validate the basic flow.

28. Validate Floew

Finally, we’ll create the trigger.

31. triggers - 1

You need to click the trigger at the bottom. You need to mention the trigger timing. Just for testing purpose, we’ll mention one-time execution only.

32. Trtigger Timing

Once, you click the finish button. The next page will look like me.

33. Setting Up Triggers

Here is the Action button. Once, you click the play/run button – you would trigger the task/flow & this would look like this.

34. Running Triggers

So, we’re done for the day.

Let me know, what do you think?

Till then! Happy Avenging! 😀

Note: All the data posted here are representational data & available over the internet.

Building an Azure Function using Python (Crossover between Reality Stone & Time Stone in Python Verse)

Hi Guys!

Today, we’ll be discussing a preview features from Microsoft Azure. Building an Azure function using Python on it’s Linux/Ubuntu VM. Since this is a preview feature, we cannot implement this to production till now. However, my example definitely has more detailed steps & complete code guide compared to whatever available over the internet.

In this post, I will take one of my old posts & enhance it as per this post. Hence, I’ll post those modified scripts. However, I won’t discuss the logic in details as most of these scripts have cosmetic changes to cater to this requirement.

In this post, we’ll only show Ubuntu run & there won’t be Windows or MAC comparison.

Initial Environment Preparation:

  1. Set-up new virtual machine on Azure.
  2. Set-up Azure function environments on that server.

Set-up new virtual machine on Azure:

I’m not going into the details of how to create Ubuntu VM on Microsoft Azure. You can refer the steps in more information here.

After successful creation, the VM will look like this –

Azure VM - Ubuntu

Detailed information you can get after clicking this hyperlink over the name of the VM.

Azure-VM Basic Details

You have to open port 7071 for application testing from the local using postman.

You can get it from the network option under VM as follows –

Network-Configuration

Make sure that you are restricting these ports to specific network & not open to ALL traffic.

So, your VM is ready now.

To update Azure CLI, you need to use the following commands –

sudo apt-get update && sudo apt-get install –only-upgrade -y azure-cli

Set-up Azure function environments on that server:

To set-up the environment, you don’t have to go for Python installation as by default Ubuntu in Microsoft Azure comes up with desired Python version, i.e., Python3.6. However, to run the python application, you need to install the following app –

  1. Microsoft SDK. You will get the details from this link.
  2. Installing node-js. You will get the details from this link.
  3. You need to install a docker. However, as per Microsoft official version, this is not required. But, you can create a Docker container to distribute the python function in Azure application. I would say you can install this just in case if you want to continue with this approach. You will get the details over here. If you want to know details about the Docker. And, how you want to integrate python application. You can refer to this link.
  4. Your desired python packages. In this case, we’ll be modifying this post – “Encryption/Decryption, JSON, API, Flask Framework in Python (Crossover between Reality Stone & Time Stone in Python Verse).” We’ll be modifying a couple of lines only to cater to this functionality & deploying the same as an Azure function.
  5. Creating an Azure function template on Ubuntu. The essential detail you’ll get it from here. However, over there, it was not shown in detailed steps of python packages & how you can add all the dependencies to publish it in details. It was an excellent post to start-up your knowledge.

Let’s see these components status & very brief details –

Microsoft SDK:

To check the dot net version. You need to type the following commands in Ubuntu –

dotnet –info

And, the output will look like this –

DotNet-Version

Node-Js:

Following is the way to verify your node-js version & details –

node -v

npm -v

And, the output looks like this –

Node-Js

Docker:

Following is the way to test your docker version –

docker -v

And, the output will look like this –

Docker-Version

Python Packages:

Following are the python packages that we need to run & publish that in Azure cloud as an Azure function –

pip freeze | grep -v “pkg-resources” > requirements.txt

And, the output is –

Requirements

You must be wondered that why have I used this grep commands here. I’ve witnessed that on many occassion in Microsoft Azure’s Linux VM it produces one broken package called resource=0.0.0, which will terminate the deployment process. Hence, this is very crucial to eliminate those broken packages.

Now, we’re ready for our python scripts. But, before that, let’s see the directory structure over here –

Win_Vs_Ubuntu-Cloud

Creating an Azure Function Template on Ubuntu: 

Before we post our python scripts, we’ll create these following components, which is essential for our Python-based Azure function –

  • Creating a group:

              Creating a group either through Azure CLI or using a docker, you can proceed. The commands for Azure CLI is as follows –

az group create –name “rndWestUSGrp” –location westus

It is advisable to use double quotes for parameters value. Otherwise, you might land-up getting the following error – “Error: “resourceGroupName” should satisfy the constraint – “Pattern”: /^[-w._]+$/“.

I’m sure. You don’t want to face that again. And, here is the output –

CreateDeploymentGroup

Note that, here I haven’t used the double-quotes. But, to avoid any unforeseen issues – you should use double-quotes. You can refer the docker command from the above link, which I’ve shared earlier.

Now, you need to create one storage account where the metadata information of your function will be stored. You will create that as follows –

az storage account create –name cryptpy2019 –location westus –resource-group rndWestUSGrp –sku Standard_LRS

And, the output will look like this –

AccountCreate_1

Great. Now, we’ll create a virtual environment for Python3.6.

python3.6 -m venv .env
source .env/bin/activate

Python-VM

Now, we’ll create a local function project.

func init encPro

And, the output you will get is as follows –

Local-Function

Inside this directory, you’ll see the following files –

Local-Function-Details

You need to edit the host.json with these default lines –

{
 “version”: “2.0”,
 “extensionBundle”: {
                                       “id”: “Microsoft.Azure.Functions.ExtensionBundle”,
                                       “version”: “[1.*, 2.0.0)”
                                     }
}

And, the final content of these two files (excluding the requirements.txt) will look like this –

Configuration

Finally, we’ll create the template function by this following command –

func new

This will follow with steps finish it. You need to choose Python as your programing language. You need to choose an HTTP trigger template. Once you created that successfully, you’ll see the following files –

func_New

Note that, our initial function name is -> getVal.

By default, Azure will generate some default code inside the __init__.py. The details of those two files can be found here.

Since we’re ready with our environment setup. We can now discuss our Python scripts –

1. clsConfigServer.py (This script contains all the parameters of the server.)

###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 10-Feb-2019       ########
####                               ########
#### Objective: Parameter File     ########
###########################################

import os
import platform as pl

# Checking with O/S system
os_det = pl.system()

class clsConfigServer(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    if os_det == "Windows":
        config = {
            'FILE': 'acct_addr_20180112.csv',
            'SRC_FILE_PATH': Curr_Path + '\\' + 'src_file\\',
            'PROFILE_FILE_PATH': Curr_Path + '\\' + 'profile\\',
            'HOST_IP_ADDR': '0.0.0.0',
            'DEF_SALT': 'iooquzKtqLwUwXG3rModqj_fIl409vemWg9PekcKh2o=',
            'ACCT_NBR_SALT': 'iooquzKtqLwUwXG3rModqj_fIlpp1vemWg9PekcKh2o=',
            'NAME_SALT': 'iooquzKtqLwUwXG3rModqj_fIlpp1026Wg9PekcKh2o=',
            'PHONE_SALT': 'iooquzKtqLwUwXG3rMM0F5_fIlpp1026Wg9PekcKh2o=',
            'EMAIL_SALT': 'iooquzKtqLwU0653rMM0F5_fIlpp1026Wg9PekcKh2o='
        }
    else:
        config = {
            'FILE': 'acct_addr_20180112.csv',
            'SRC_FILE_PATH': Curr_Path + '/' + 'src_file/',
            'PROFILE_FILE_PATH': Curr_Path + '/' + 'profile/',
            'HOST_IP_ADDR': '0.0.0.0',
            'DEF_SALT': 'iooquzKtqLwUwXG3rModqj_fIl409vemWg9PekcKh2o=',
            'ACCT_NBR_SALT': 'iooquzKtqLwUwXG3rModqj_fIlpp1vemWg9PekcKh2o=',
            'NAME_SALT': 'iooquzKtqLwUwXG3rModqj_fIlpp1026Wg9PekcKh2o=',
            'PHONE_SALT': 'iooquzKtqLwUwXG3rMM0F5_fIlpp1026Wg9PekcKh2o=',
            'EMAIL_SALT': 'iooquzKtqLwU0653rMM0F5_fIlpp1026Wg9PekcKh2o='
        }

2. clsEnDec.py (This script is a lighter version of encryption & decryption of our previously discussed scenario. Hence, we won’t discuss in details. You can refer my earlier post to understand the logic of this script.)

###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 25-Jan-2019       ########
#### Package Cryptography needs to ########
#### install in order to run this  ########
#### script.                       ########
####                               ########
#### Objective: This script will   ########
#### encrypt/decrypt based on the  ########
#### hidden supplied salt value.   ########
###########################################

from cryptography.fernet import Fernet
import logging

from getVal.clsConfigServer import clsConfigServer as csf

class clsEnDec(object):

    def __init__(self):
        # Calculating Key
        self.token = str(csf.config['DEF_SALT'])

    def encrypt_str(self, data, token):
        try:
            # Capturing the Salt Information
            t1 = self.token
            t2 = token

            if t2 == '':
                salt = t1
            else:
                salt = t2

            logging.info("Encrypting the value!")

            # Checking Individual Types inside the Dataframe
            cipher = Fernet(salt)
            encr_val = str(cipher.encrypt(bytes(data,'utf8'))).replace("b'","").replace("'","")

            strV1 = "Encrypted value:: " + str(encr_val)
            logging.info(strV1)

            return encr_val

        except Exception as e:
            x = str(e)
            print(x)
            encr_val = ''

            return encr_val

    def decrypt_str(self, data, token):
        try:
            # Capturing the Salt Information
            t1 = self.token
            t2 = token

            if t2 == '':
                salt = t1
            else:
                salt = t2

            logging.info("Decrypting the value!")

            # Checking Individual Types inside the Dataframe
            cipher = Fernet(salt)
            decr_val = str(cipher.decrypt(bytes(data,'utf8'))).replace("b'","").replace("'","")

            strV2 = "Decrypted value:: " + str(decr_val)
            logging.info(strV2)

            return decr_val

        except Exception as e:
            x = str(e)
            print(x)
            decr_val = ''

            return decr_val

3. clsFlask.py (This is the main server script that will the encrypt/decrypt class from our previous scenario. This script will capture the requested JSON from the client, who posted from the clients like another python script or third-party tools like Postman.)

###########################################
#### Written By: SATYAKI DE            ####
#### Written On: 25-Jan-2019           ####
#### Package Flask package needs to    ####
#### install in order to run this      ####
#### script.                           ####
####                                   ####
#### Objective: This script will       ####
#### encrypt/decrypt based on the      ####
#### supplied salt value. Also,        ####
#### this will capture the individual  ####
#### element & stored them into JSON   ####
#### variables using flask framework.  ####
###########################################

from getVal.clsConfigServer import clsConfigServer as csf
from getVal.clsEnDec import clsEnDecAuth

getVal = clsEnDec()

import logging

class clsFlask(object):
    def __init__(self):
        self.xtoken = str(csf.config['DEF_SALT'])

    def getEncryptProcess(self, dGroup, input_data, dTemplate):
        try:
            # It is sending default salt value
            xtoken = self.xtoken

            # Capturing the individual element
            dGroup = dGroup
            input_data = input_data
            dTemplate = dTemplate

            # This will check the mandatory json elements
            if ((dGroup != '') & (dTemplate != '')):

                # Based on the Group & Element it will fetch the salt
                # Based on the specific salt it will encrypt the data
                if ((dGroup == 'GrDet') & (dTemplate == 'subGrAcct_Nbr')):
                    xtoken = str(csf.config['ACCT_NBR_SALT'])

                    strV1 = "xtoken: " + str(xtoken)
                    logging.info(strV1)
                    strV2 = "Flask Input Data: " + str(input_data)
                    logging.info(strV2)

                    #x = cen.clsEnDecAuth()
                    ret_val = getVal.encrypt_str(input_data, xtoken)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrName')):
                    xtoken = str(csf.config['NAME_SALT'])

                    strV1 = "xtoken: " + str(xtoken)
                    logging.info(strV1)
                    strV2 = "Flask Input Data: " + str(input_data)
                    logging.info(strV2)

                    #x = cen.clsEnDecAuth()
                    ret_val = getVal.encrypt_str(input_data, xtoken)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrPhone')):
                    xtoken = str(csf.config['PHONE_SALT'])

                    strV1 = "xtoken: " + str(xtoken)
                    logging.info(strV1)
                    strV2 = "Flask Input Data: " + str(input_data)
                    logging.info(strV2)

                    #x = cen.clsEnDecAuth()
                    ret_val = getVal.encrypt_str(input_data, xtoken)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrEmail')):
                    xtoken = str(csf.config['EMAIL_SALT'])

                    strV1 = "xtoken: " + str(xtoken)
                    logging.info(strV1)
                    strV2 = "Flask Input Data: " + str(input_data)
                    logging.info(strV2)

                    #x = cen.clsEnDecAuth()
                    ret_val = getVal.encrypt_str(input_data, xtoken)
                else:
                    ret_val = ''
            else:
                ret_val = ''

            # Return value
            return ret_val

        except Exception as e:
            ret_val = ''
            # Return the valid json Error Response
            return ret_val

    def getDecryptProcess(self, dGroup, input_data, dTemplate):
        try:
            xtoken = self.xtoken

            # Capturing the individual element
            dGroup = dGroup
            input_data = input_data
            dTemplate = dTemplate

            # This will check the mandatory json elements
            if ((dGroup != '') & (dTemplate != '')):

                # Based on the Group & Element it will fetch the salt
                # Based on the specific salt it will decrypt the data
                if ((dGroup == 'GrDet') & (dTemplate == 'subGrAcct_Nbr')):
                    xtoken = str(csf.config['ACCT_NBR_SALT'])

                    strV1 = "xtoken: " + str(xtoken)
                    logging.info(strV1)
                    strV2 = "Flask Input Data: " + str(input_data)
                    logging.info(strV2)

                    #x = cen.clsEnDecAuth()
                    ret_val = getVal.decrypt_str(input_data, xtoken)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrName')):
                    xtoken = str(csf.config['NAME_SALT'])

                    strV1 = "xtoken: " + str(xtoken)
                    logging.info(strV1)
                    strV2 = "Flask Input Data: " + str(input_data)
                    logging.info(strV2)

                    #x = cen.clsEnDecAuth()
                    ret_val = getVal.decrypt_str(input_data, xtoken)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrPhone')):
                    xtoken = str(csf.config['PHONE_SALT'])

                    strV1 = "xtoken: " + str(xtoken)
                    logging.info(strV1)
                    strV2 = "Flask Input Data: " + str(input_data)
                    logging.info(strV2)

                    #x = cen.clsEnDecAuth()
                    ret_val = getVal.decrypt_str(input_data, xtoken)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrEmail')):
                    xtoken = str(csf.config['EMAIL_SALT'])

                    strV1 = "xtoken: " + str(xtoken)
                    logging.info(strV1)
                    strV2 = "Flask Input Data: " + str(input_data)
                    logging.info(strV2)

                    #x = cen.clsEnDecAuth()
                    ret_val = getVal.decrypt_str(input_data, xtoken)
                else:
                    ret_val = ''
            else:
                ret_val = ''

            # Return value
            return ret_val

        except Exception as e:
            ret_val = ''
            # Return the valid Error Response
            return ret_val

4. __init__.py (This autogenerated script contains the primary calling methods of encryption & decryption based on the element header & values after enhanced as per the functionality.)

###########################################
#### Written By: SATYAKI DE            ####
#### Written On: 08-Jun-2019           ####
#### Package Flask package needs to    ####
#### install in order to run this      ####
#### script.                           ####
####                                   ####
#### Objective: Main Calling scripts.  ####
#### This is an autogenrate scripts.   ####
#### However, to meet the functionality####
#### we've enhanced as per our logic.  ####
###########################################
__all__ = ['clsFlask']

import logging
import azure.functions as func
import json

from getVal.clsFlask import clsFlask

getVal = clsFlask()

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python Encryption function processed a request.')

    str_val = 'Input Payload:: ' + str(req.get_json())
    str_1 = str(req.get_json())

    logging.info(str_val)

    ret_val = {}
    DataIn = ''
    dGroup = ''
    dTemplate = ''
    flg = ''

    if (str_1 != ''):
        try:
            req_body = req.get_json()
            dGroup = req_body.get('dataGroup')

            try:
                DataIn = req_body.get('data')
                strV15 = 'If Part:: ' + str(DataIn)

                logging.info(strV15)

                if ((DataIn == '') | (DataIn == None)):
                    raise ValueError

                flg = 'Y'
            except ValueError:
                DataIn = req_body.get('edata')
                strV15 = 'Else Part:: ' + str(DataIn)
                logging.info(strV15)
                flg = 'N'
            except:
                DataIn = req_body.get('edata')
                strV15 = 'Else Part:: ' + str(DataIn)
                logging.info(strV15)
                flg = 'N'

            dTemplate = req_body.get('dataTemplate')

        except ValueError:
            pass

    strV5 = "Encrypt Decrypt Flag:: " + flg
    logging.info(strV5)

    if (flg == 'Y'):

        if ((DataIn != '') & ((dGroup != '') & (dTemplate != ''))):

            logging.info("Encryption Started!")
            ret_val = getVal.getEncryptProcess(dGroup, DataIn, dTemplate)
            strVal2 = 'Return Payload:: ' + str(ret_val)
            logging.info(strVal2)

            xval = json.dumps(ret_val)

            return func.HttpResponse(xval)
        else:
            return func.HttpResponse(
                 "Please pass a data in the request body",
                 status_code=400
            )
    else:

        if ((DataIn != '') & ((dGroup != '') & (dTemplate != ''))):

            logging.info("Decryption Started!")
            ret_val2 = getVal.getDecryptProcess(dGroup, DataIn, dTemplate)
            strVal3 = 'Return Payload:: ' + str(ret_val)
            logging.info(strVal3)

            xval1 = json.dumps(ret_val2)

            return func.HttpResponse(xval1)
        else:
            return func.HttpResponse(
                "Please pass a data in the request body",
                status_code=400
            )

In this script, based on the value of an flg variable, we’re calling our encryption or decryption methods. And, the value of the flg variable is set based on the following logic –

try:
    DataIn = req_body.get('data')
    strV15 = 'If Part:: ' + str(DataIn)

    logging.info(strV15)

    if ((DataIn == '') | (DataIn == None)):
        raise ValueError

    flg = 'Y'
except ValueError:
    DataIn = req_body.get('edata')
    strV15 = 'Else Part:: ' + str(DataIn)
    logging.info(strV15)
    flg = 'N'
except:
    DataIn = req_body.get('edata')
    strV15 = 'Else Part:: ' + str(DataIn)
    logging.info(strV15)
    flg = 'N'

So, if the application gets the “data” element then – it will consider the data needs to be encrypted; otherwise, it will go for decryption. And, based on that – it is setting the value.

Now, we’re ready to locally run our application –

func host start

And, the output will look like this –

StartingAzureFunction-Python
StartingAzureFunction-Python 2

Let’s test it from postman –

Encrypt:

Postman-Encrypt

Decrypt:

Postman-Decrypt

Great. Now, we’re ready to publish this application to Azure cloud.

As in our earlier steps, we’ve already built our storage account for the metadata. Please scroll to top to view that again. Now, using that information, we’ll make the function app with a more meaningful name –

az functionapp create –resource-group rndWestUSGrp –os-type Linux \
–consumption-plan-location westus –runtime python \
–name getEncryptDecrypt –storage-account cryptpy2019

CreatingFunctionPython

Let’s publish the function –

sudo func azure functionapp publish “getEncryptDecrypt” –build-native-deps

On many occassion, without the use of “–build-native-deps” might leads to failure. Hence, I’ve added that to avoid such scenarios.

Publishing-Function

Now, we need to test our first published complex Azure function with Python through postman –

Encrypt:

PubishedFuncPostmanEncrypt

Decrypt:

PubishedFuncPostmanDecrypt

Wonderful! So, it is working.

You can see the function under the Azure portal –

Deployed-Function

Let’s see some other important features of this function –

Monitor: You can monitor two ways. One is by clicking the monitor options you will get the individual requests level details & also get to see the log information over here –

Function-Monitor-Details-1

Clicking Application Insights will give you another level of detailed logs, which can be very useful for debugging. We’ll touch this at the end of this post with a very brief discussion.

Function-Monitor-Details-3.JPG

As you can see, clicking individual lines will show the details further.

Let’s quickly check the application insights –

Application-Insights-1

Application Insights will give you a SQL like an interface where you can get the log details of all your requests.

Application-Insights-2

You can expand the individual details for further information.

Application-Insights-3

You can change the parameter name & other details & click the run button to get all the log details for your debugging purpose.

So, finally, we’ve achieved our goal. This is relatively long posts. But, I’m sure this will help you to create your first python-based function on the Azure platform.

Hope, you will like this approach. Let me know your comment on the same.

I’ll bring some more exciting topic in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data posted here are representational data & available over the internet.

Combining the NoSQL(Cosmos DB) & traditional Azure RDBMS in Azure (Time stone solo from Python verse)

Hi Guys!

Today, our main objective is to extend our last post & blending two different kinds of data using Python.

Please refer the earlier post if you didn’t go through it – “Building Azure cosmos application.“.

What is the Objective?

In this post, our objective is to combine traditional RDBMS from the cloud with Azure’s NO SQL, which is, in this case, is Cosmos DB. And, try to forecast some kind of blended information, which can be aggregated further.

Examining Source Data.

No SQL Data from Cosmos:

Let’s check one more time the No SQL data created in our last post.

CosmosData

Total, we’ve created 6 records in our last post.

As you can see in red marked areas. From item, one can check the total number of records created. You can also filter out specific record using the Edit Filter blue color button highlighted with blue box & you need to provide the “WHERE CLAUSE” inside it.

Azure SQL DB:

Let’s create some data in Azure SQL DB.

But, before that, you need to create SQL DB in the Azure cloud. Here is the official Microsoft link to create DB in Azure. You can refer to it here.

I won’t discuss the detailed steps of creating DB here.

From Azure portal, it looks like –

Azure SQL DB Main Screen

Let’s see how the data looks like in Azure DB. For our case, we’ll be using the hrMaster DB.

Let’s create the table & some sample data aligned as per our cosmos data.

Azure SQL DB

We will join both the data based on subscriberId & then extract our required columns in our final output.

CombinedData

Good. Now, we’re ready for python scripts.

Python Scripts:

In this installment, we’ll be reusing the following python scripts, which is already discussed in my earlier post –

  • clsL.py
  • clsColMgmt.py
  • clsCosmosDBDet.py

So, I’m not going to discuss these scripts.

Before we discuss our scripts, let’s look out the directory structures –

Win_Vs_MAC

Here is the detailed directory structure between the Windows & MAC O/S.

1. clsConfig.py (This script will create the split csv files or final merge file after the corresponding process. However, this can be used as usual verbose debug logging as well. Hence, the name comes into the picture.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 25-May-2019              ####
#### Updated On: 02-Jun-2019              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys for      ####
#### azure cosmos db. Application will    ####
#### process these information & perform  ####
#### various CRUD operation on Cosmos DB. ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))
    db_name = 'rnd-de01-usw2-vfa-cdb'
    db_link = 'dbs/' + db_name
    CONTAINER1 = "RealtimeEmail"
    CONTAINER2 = "RealtimeTwitterFeedback"
    CONTAINER3 = "RealtimeHR"

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'SERVER': 'xxxx-xxx.database.windows.net',
        'DATABASE_1': 'SalesForceMaster',
        'DATABASE_2': 'hrMaster',
        'DATABASE_3': 'statMaster',
        'USERNAME': 'admin_poc_dev',
        'PASSWORD': 'xxxxx',
        'DRIVER': '{ODBC Driver 17 for SQL Server}',
        'ENV': 'pocdev-saty',
        'ENCRYPT_FLAG': "yes",
        'TRUST_FLAG': "no",
        'TIMEOUT_LIMIT': "30",
        'PROCSTAT': "'Y'",
        'APP_ID': 1,
        'EMAIL_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcEmail.json',
        'TWITTER_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcTwitter.json',
        'HR_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcHR.json',
        'COSMOSDB_ENDPOINT': 'https://rnd-de01-usw2-vfa-cdb.documents.azure.com:443/',
        'CONFIG_TABLE': 'ETL_CONFIG_TAB',
        'COSMOS_PRIMARYKEY': "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXIsI00AxKXXXXXgg==",
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'COSMOSDB': db_name,
        'COSMOS_CONTAINER1': CONTAINER1,
        'COSMOS_CONTAINER2': CONTAINER2,
        'COSMOS_CONTAINER3': CONTAINER3,
        'CONFIG_ORIG': 'Config_orig.csv',
        'ENCRYPT_CSV': 'Encrypt_Config.csv',
        'DECRYPT_CSV': 'Decrypt_Config.csv',
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'APP_DESC_1': 'Feedback Communication',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SQL_QRY_1': "SELECT c.subscriberId, c.sender, c.orderNo, c.orderDate, c.items.orderQty  FROM RealtimeEmail c",
        'SQL_QRY_2': "SELECT c.twitterId, c.Twit, c.DateCreated, c.Country FROM RealtimeTwitterFeedback c WHERE c.twitterId=@CrVal",
        'DB_QRY': "SELECT * FROM c",
        'AZURE_SQL_1': "SELECT DISTINCT subscriberId, state, country, annualIncome, customerType FROM dbo.onboardCustomer",
        'COLLECTION_QRY': "SELECT * FROM r",
        'database_link': db_link,
        'collection_link_1': db_link + '/colls/' + CONTAINER1,
        'collection_link_2': db_link + '/colls/' + CONTAINER2,
        'collection_link_3': db_link + '/colls/' + CONTAINER3,
        'options': {
            'offerThroughput': 1000,
            'enableCrossPartitionQuery': True,
            'maxItemCount': 2
        }
    }

Here, we’ve added a couple of more entries compared to the last time, which points the detailed configuration for Azure SQL DB.

‘SERVER’: ‘xxxx-xxx.database.windows.net’,
‘DATABASE_1’: ‘SalesForceMaster’,
‘DATABASE_2’: ‘hrMaster’,
‘DATABASE_3’: ‘statMaster’,
‘USERNAME’: ‘admin_poc_dev’,
‘PASSWORD’: ‘xxxxx’,
‘DRIVER’: ‘{ODBC Driver 17 for SQL Server}’,
‘ENV’: ‘pocdev-saty’,
‘ENCRYPT_FLAG’: “yes”,
‘TRUST_FLAG’: “no”,
‘TIMEOUT_LIMIT’: “30”,
‘PROCSTAT’: “‘Y'”, 

Here, you need to supply your DB credentials accordingly.

2. clsDBLookup.py (This script will look into the Azure SQL DB & fetch data from the traditional RDBMS of Azure environment.)

#####################################################
#### Written By: SATYAKI DE                      ####
#### Written On: 25-May-2019                     ####
####                                             ####
#### Objective: This script will check &         ####
#### test the connection with the Azure          ####
#### SQL DB & it will fetch all the records      ####
#### name resied under the same DB of a table.   ####
#####################################################

import pyodbc as py
import pandas as p
from clsConfig import clsConfig as cdc

class clsDBLookup(object):
    def __init__(self, lkpTableName = ''):
        self.server = cdc.config['SERVER']
        self.database = cdc.config['DATABASE_1']
        self.database1 = cdc.config['DATABASE_2']
        self.database2 = cdc.config['DATABASE_3']
        self.username = cdc.config['USERNAME']
        self.password = cdc.config['PASSWORD']
        self.driver = cdc.config['DRIVER']
        self.env = cdc.config['ENV']
        self.encrypt_flg = cdc.config['ENCRYPT_FLAG']
        self.trust_flg = cdc.config['TRUST_FLAG']
        self.timeout_limit = cdc.config['TIMEOUT_LIMIT']
        self.lkpTableName = cdc.config['CONFIG_TABLE']
        self.ProcStat = cdc.config['PROCSTAT']
        self.AppId = cdc.config['APP_ID']

    def LookUpData(self):
        try:
            # Assigning all the required values
            server = self.server
            database = self.database1
            username = self.username
            password = self.password
            driver = self.driver
            env = self.env
            encrypt_flg = self.encrypt_flg
            trust_flg = self.trust_flg
            timout_limit = self.timeout_limit
            lkpTableName = self.lkpTableName
            ProcStat = self.ProcStat
            AppId = self.AppId

            # Creating secure connection
            str_conn = 'Driver=' + driver + ';Server=tcp:' + server + ',1433;' \
                       'Database=' + database + ';Uid=' + username + '@' + env + ';' \
                       'Pwd=' + password + ';Encrypt=' + encrypt_flg + ';' \
                       'TrustServerCertificate=' + trust_flg + ';Connection Timeout=' + timout_limit + ';'

            db_con_azure = py.connect(str_conn)

            query = " SELECT [ruleId] as ruleId, [ruleName] as ruleName, [ruleSQL] as ruleSQL, " \
                    " [ruleFlag] as ruleFlag, [appId] as appId, [DBType] as DBType, " \
                    " [DBName] as DBName FROM [dbo][" + lkpTableName + "] WHERE ruleFLag = " + ProcStat + " " \
                    " and appId = " + AppId + " ORDER BY ruleId "

            df = p.read_sql(query, db_con_azure)

            # Closing the connection
            db_con_azure.close()

            return df
        except Exception as e:
            x = str(e)
            print(x)
            df = p.DataFrame()

            return df

    def azure_sqldb_read(self, sql):
        try:
            # Assigning all the required values
            server = self.server
            database = self.database1
            username = self.username
            password = self.password
            driver = self.driver
            env = self.env
            encrypt_flg = self.encrypt_flg
            trust_flg = self.trust_flg
            timout_limit = self.timeout_limit
            lkpTableName = self.lkpTableName
            ProcStat = self.ProcStat
            AppId = self.AppId

            # Creating secure connection
            str_conn = 'Driver=' + driver + ';Server=tcp:' + server + ',1433;' \
                       'Database=' + database + ';Uid=' + username + '@' + env + ';' \
                       'Pwd=' + password + ';Encrypt=' + encrypt_flg + ';' \
                       'TrustServerCertificate=' + trust_flg + ';Connection Timeout=' + timout_limit + ';'

            # print("Connection Details:: ", str_conn)
            db_con_azure = py.connect(str_conn)

            query = sql

            df = p.read_sql(query, db_con_azure)

            # Closing the connection
            db_con_azure.close()

            return df
        except Exception as e:
            x = str(e)
            print(x)
            df = p.DataFrame()

            return df

Major lines to discuss –

azure_sqldb_read(self, sql):

Getting the source SQL supplied from the configuration script.

db_con_azure = py.connect(str_conn)

query = sql

df = p.read_sql(query, db_con_azure)

After creating a successful connection, our application will read the SQL & fetch the data & store that into a pandas dataframe and return the output to the primary calling function.

3. callCosmosAPI.py (This is the main script, which will call all the methods to blend the data. Hence, the name comes into the picture.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 25-May-2019              ####
#### Modified On 02-Jun-2019              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

import clsColMgmt as cm
import clsCosmosDBDet as cmdb
from clsConfig import clsConfig as cf
import pandas as p
import clsLog as cl
import logging
import datetime
import json
import clsDBLookup as dbcon

# Disbling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

def getDate(row):
    try:
        d1 = row['orderDate']
        d1_str = str(d1)
        d1_dt_part, sec = d1_str.split('.')
        dt_part1 = d1_dt_part.replace('T', ' ')

        return dt_part1
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = ''

        return dt_part1

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        df_ret = p.DataFrame()
        df_ret_2 = p.DataFrame()
        df_ret_2_Mod = p.DataFrame()

        debug_ind = 'Y'

        # Initiating Log Class
        l = cl.clsLog()

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidated.log', level=logging.INFO)

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']

        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)

        print("*" * 157)
        print("Testing COSMOS DB Connection!")
        print("*" * 157)

        # Checking Cosmos DB Azure
        y = cmdb.clsCosmosDBDet()
        ret_val = y.test_db_con()

        if ret_val == 0:
            print()
            print("Cosmos DB Connection Successful!")
            print("*" * 157)
        else:
            print()
            print("Cosmos DB Connection Failure!")
            print("*" * 157)
            raise Exception

        print("*" * 157)

        # Accessing from Azure SQL DB
        x1 = dbcon.clsDBLookup()
        act_df = x1.azure_sqldb_read(cf.config['AZURE_SQL_1'])

        print("Azure SQL DB::")
        print(act_df)
        print()

        print("-" * 157)

        # Calling the function 1
        print("RealtimeEmail::")

        # Fetching First collection data to dataframe
        print("Fethcing Comos Collection Data!")

        sql_qry_1 = cf.config['SQL_QRY_1']
        msg = "Documents generatd based on unique key"
        collection_flg = 1

        x = cm.clsColMgmt()
        df_ret = x.fetch_data(sql_qry_1, msg, collection_flg)

        l.logr('1.EmailFeedback_' + var + '.csv', debug_ind, df_ret, 'log')
        print('RealtimeEmail Data::')
        print(df_ret)
        print()

        # Checking execution status
        ret_val = int(df_ret.shape[0])

        if ret_val == 0:
            print("Cosmos DB Hans't returned any rows. Please check your queries!")
            print("*" * 157)
        else:
            print("Successfully fetched!")
            print("*" * 157)

        # Calling the 2nd Collection
        print("RealtimeTwitterFeedback::")

        # Fetching First collection data to dataframe
        print("Fethcing Cosmos Collection Data!")

        # Query using parameters
        sql_qry_2 = cf.config['SQL_QRY_2']
        msg_2 = "Documents generated based on RealtimeTwitterFeedback feed!"
        collection_flg = 2

        val = 'crazyGo'
        param_det = [{"name": "@CrVal", "value": val}]
        add_param = 2

        x1 = cm.clsColMgmt()
        df_ret_2 = x1.fetch_data(sql_qry_2, msg_2, collection_flg, add_param, param_det)

        l.logr('2.TwitterFeedback_' + var + '.csv', debug_ind, df_ret, 'log')
        print('Realtime Twitter Data:: ')
        print(df_ret_2)
        print()

        # Checking execution status
        ret_val_2 = int(df_ret_2.shape[0])

        if ret_val_2 == 0:
            print("Cosmos DB hasn't returned any rows. Please check your queries!")
            print("*" * 157)
        else:
            print("Successfuly row feteched!")
            print("*" * 157)

        # Merging NoSQL Data (Cosmos DB) with Relational DB (Azure SQL DB)
        df_Fin_temp = p.merge(df_ret, act_df, on='subscriberId', how='inner')

        df_fin = df_Fin_temp[['orderDate', 'orderNo', 'sender', 'state', 'country', 'customerType']]

        print("Initial Combined Data (From Cosmos & Azure SQL DB) :: ")
        print(df_fin)

        l.logr('3.InitCombine_' + var + '.csv', debug_ind, df_fin, 'log')

        # Transforming the orderDate as per standard format
        df_fin['orderDateM'] = df_fin.apply(lambda row: getDate(row), axis=1)

        # Dropping the old column & renaming the new column to old column
        df_fin.drop(columns=['orderDate'], inplace=True)
        df_fin.rename(columns={'orderDateM': 'orderDate'}, inplace=True)

        print("*" * 157)
        print()
        print("Final Combined & Transformed result:: ")
        print(df_fin)

        l.logr('4.Final_Combine_' + var + '.csv', debug_ind, df_fin, 'log')
        print("*" * 157)

    except ValueError:
        print("No relevant data to proceed!")

    except Exception as e:
        print("Top level Error: args:{0}, message{1}".format(e.args, e.message))

if __name__ == "__main__":
    main()

The key lines from this script –

def getDate(row):
    try:
        d1 = row['orderDate']
        d1_str = str(d1)
        d1_dt_part, sec = d1_str.split('.')
        dt_part1 = d1_dt_part.replace('T', ' ')

        return dt_part1
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = ''

        return dt_part1

This function converts NoSQL date data type more familiar format.

NoSQL Date:
NoSQL_Date
Transformed Date:
Transformed Date
# Accessing from Azure SQL DB
x1 = dbcon.clsDBLookup()
act_df = x1.azure_sqldb_read(cf.config['AZURE_SQL_1'])

print("Azure SQL DB::")
print(act_df)
print()

Above lines are calling the Azure SQL DB method to retrieve the RDBMS data into our dataframe.

# Merging NoSQL Data (Cosmos DB) with Relational DB (Azure SQL DB)
df_Fin_temp = p.merge(df_ret, act_df, on='subscriberId', how='inner')

df_fin = df_Fin_temp[['orderDate', 'orderNo', 'sender', 'state', 'country', 'customerType']]

In these above lines, we’re joining the data retrieved from two different kinds of the database to prepare our initial combined dataframe. Also, we’ve picked only the desired column, which will be useful for us.

# Transforming the orderDate as per standard format
df_fin['orderDateM'] = df_fin.apply(lambda row: getDate(row), axis=1)

# Dropping the old column & renaming the new column to old column
df_fin.drop(columns=['orderDate'], inplace=True)
df_fin.rename(columns={'orderDateM': 'orderDate'}, inplace=True)

In the above lines, we’re transforming our date field, as shown above in one of our previous images by calling the getDate method.

Let’s see the directory structure of our program –

Win_Vs_MAC

Let’s see how it looks when it runs –

Windows:

Win_Run_1
Win_Run_2

MAC:

MAC_Run_1
MAC_Run_2

So, finally, we’ve successfully blended the data & make more meaningful data projection.

Following python packages are required to run this application –

pip install azure

pip install azure-cosmos

pip install pandas

pip install requests

pip install pyodbc

This application tested on Python3.7.1 & Python3.7.2 as well. As per Microsoft, their official supported version is Python3.5.

I hope you’ll like this effort.

Wait for the next installment. Till then, Happy Avenging. 😀

[Note: All the sample data are available/prepared in the public domain for research & study.]