Live visual reading using Convolutional Neural Network (CNN) through Python-based machine-learning application.

This week, we’ll look at reading characters visually from a WebCAM feed & predicting the letters using a CNN. Before we dig deep, why don’t we see the demo run first?

Demo

Isn’t it fascinating? As we can see, the computer can capture a live feed and read the letters much like a human would. And thanks to the brilliant packages available in Python, we can predict the correct letter from an image.


What do we need to test it out?

  1. Preferably an external WebCAM.
  2. A moderate or good laptop to test this out.
  3. Python
  4. And a few other packages that we’ll mention in the next block.

What Python packages do we need?

Some of the critical packages that we need to test this application are –

cmake==3.22.1
dlib==19.19.0
face-recognition==1.3.0
face-recognition-models==0.3.0
imutils==0.5.3
jsonschema==4.4.0
keras==2.7.0
Keras-Preprocessing==1.1.2
matplotlib==3.5.1
matplotlib-inline==0.1.3
oauthlib==3.1.1
opencv-contrib-python==4.1.2.30
opencv-contrib-python-headless==4.4.0.46
opencv-python==4.5.5.62
opencv-python-headless==4.5.5.62
pickleshare==0.7.5
Pillow==9.0.0
python-dateutil==2.8.2
requests==2.27.1
requests-oauthlib==1.3.0
scikit-image==0.19.1
scikit-learn==1.0.2
tensorboard==2.7.0
tensorboard-data-server==0.6.1
tensorboard-plugin-wit==1.8.1
tensorflow==2.7.0
tensorflow-estimator==2.7.0
tensorflow-io-gcs-filesystem==0.23.1
tqdm==4.62.3

What is CNN?

In deep learning, a convolutional neural network (CNN/ConvNet) is a class of deep neural networks most commonly applied to analyze visual imagery.

Different Steps of CNN

We can understand from the above picture that a CNN generally takes an image as input. Rather than treating each pixel in isolation, its convolution layers slide small filters over neighboring pixels to pick up local patterns such as edges and strokes. During training, the weights and biases of the model are tweaked to detect the desired letters (in our use case) from the image. Like other algorithms, the data also has to pass through a pre-processing stage. However, a CNN needs relatively less pre-processing than many other classification algorithms.
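To make the idea of a filter sliding over the image concrete, here is a minimal pure-Python sketch. The 4x4 image, the 2x2 filter and the convolve2d helper are illustrative assumptions and not part of the application; a real CNN learns its filter values during training.

```python
def convolve2d(image, kernel):
    """Slide `kernel` over `image` (no padding, stride 1) and return the feature map."""
    k_rows, k_cols = len(kernel), len(kernel[0])
    out_rows = len(image) - k_rows + 1
    out_cols = len(image[0]) - k_cols + 1
    feature_map = []
    for r in range(out_rows):
        row = []
        for c in range(out_cols):
            total = 0
            # Multiply the window under the kernel element by element and sum it up.
            for i in range(k_rows):
                for j in range(k_cols):
                    total += image[r + i][c + j] * kernel[i][j]
            row.append(total)
        feature_map.append(row)
    return feature_map


# Hypothetical 4x4 "image" with a vertical edge between columns 1 and 2.
image = [
    [0, 0, 9, 9],
    [0, 0, 9, 9],
    [0, 0, 9, 9],
    [0, 0, 9, 9],
]

# Hypothetical 2x2 filter that responds to a dark-to-bright step from left to right.
kernel = [
    [-1, 1],
    [-1, 1],
]

feature_map = convolve2d(image, kernel)
print(feature_map)  # [[0, 18, 0], [0, 18, 0], [0, 18, 0]]
```

The feature map is non-zero only where the window straddles the edge, which is the kind of local pattern a convolution layer reacts to.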

If you want to know more about this, there is an excellent article on CNN with some on-point animations explaining this concept. Please read it here.

Where do we get the data sets for our testing?

For testing, we are fortunate enough to have Kaggle with us. It offers a wide variety of sample data, which you can get from here.


Our use-case:

Architecture

From the above diagram, one can see that the python application will consume a live video feed of any random letters (both printed & handwritten) & predict the character as part of the machine learning model that we trained.


Code:

  1. clsConfig.py (Configuration file for the entire application.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 28-Dec-2021 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning & streaming dashboard.####
#### ####
################################################
import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'FILE_NAME': Curr_Path + sep + 'Data' + sep + 'A_Z_Handwritten_Data.csv',
        'SRC_PATH': Curr_Path + sep + 'data' + sep,
        'APP_DESC_1': 'Old Video Enhancement!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SUBDIR': 'data',
        'SEP': sep,
        'testRatio':0.2,
        'valRatio':0.2,
        'epochsVal':8,
        'activationType':'relu',
        'activationType2':'softmax',
        'numOfClasses':26,
        'kernelSize':(3, 3),
        'poolSize':(2, 2),
        'filterVal1':32,
        'filterVal2':64,
        'filterVal3':128,
        'stridesVal':2,
        'monitorVal':'val_loss',
        'paddingVal1':'same',
        'paddingVal2':'valid',
        'reshapeVal':28,
        'reshapeVal1':(28,28),
        'patienceVal1':1,
        'patienceVal2':2,
        'sleepTime':3,
        'sleepTime1':6,
        'factorVal':0.2,
        'learningRateVal':0.001,
        'minDeltaVal':0,
        'minLrVal':0.0001,
        'verboseFlag':0,
        'modeInd':'auto',
        'shuffleVal':100,
        'DenkseVal1':26,
        'DenkseVal2':64,
        'DenkseVal3':128,
        'predParam':9,
        'word_dict':{0:'A',1:'B',2:'C',3:'D',4:'E',5:'F',6:'G',7:'H',8:'I',9:'J',10:'K',11:'L',12:'M',13:'N',14:'O',15:'P',16:'Q',17:'R',18:'S',19:'T',20:'U',21:'V',22:'W',23:'X', 24:'Y',25:'Z'},
        'width':640,
        'height':480,
        'imgSize': (32,32),
        'threshold': 0.45,
        'imgDimension': (400, 440),
        'imgSmallDim': (7, 7),
        'imgMidDim': (28, 28),
        'reshapeParam1':1,
        'reshapeParam2':28,
        'colorFeed':(0,0,130),
        'colorPredict':(0,25,255)
    }


The important parameters to note from the above snippet are –

'testRatio':0.2,
'valRatio':0.2,
'epochsVal':8,
'activationType':'relu',
'activationType2':'softmax',
'numOfClasses':26,
'kernelSize':(3, 3),
'poolSize':(2, 2),
'word_dict':{0:'A',1:'B',2:'C',3:'D',4:'E',5:'F',6:'G',7:'H',8:'I',9:'J',10:'K',11:'L',12:'M',13:'N',14:'O',15:'P',16:'Q',17:'R',18:'S',19:'T',20:'U',21:'V',22:'W',23:'X', 24:'Y',25:'Z'},

Since we have 26 letters, we have set numOfClasses to 26.

Since the model works with numbers rather than characters, we map each letter to a numeric class index. The word_dict parameter stores that mapping in a Python dictionary, and the application uses it to translate the model’s numeric output back into the predicted letter.
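As a small illustration of that mapping, the sketch below rebuilds the same index-to-letter dictionary from the standard library and decodes a made-up score vector. The decode helper and the scores are hypothetical; in the application, the scores come from the trained model.

```python
import string

# Same mapping as word_dict in clsConfig.py: 0 -> 'A', 1 -> 'B', ..., 25 -> 'Z'.
word_dict = dict(enumerate(string.ascii_uppercase))


def decode(scores):
    """Return the letter whose class index carries the highest score."""
    best_index = max(range(len(scores)), key=lambda i: scores[i])
    return word_dict[best_index]


# Hypothetical model output: 26 scores with the peak at index 2.
scores = [0.01] * 26
scores[2] = 0.75

print(len(word_dict), decode(scores))  # 26 C
```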

2. clsAlphabetReading.py (Main training class to teach the model to predict alphabets from visual reader.)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 17-Jan-2022 ####
#### Modified On 17-Jan-2022 ####
#### ####
#### Objective: This python script will ####
#### teach & perfect the model to read ####
#### visual alphabets using Convolutional ####
#### Neural Network (CNN). ####
###############################################
from keras.datasets import mnist
import matplotlib.pyplot as plt
import cv2
import numpy as np
from keras.models import Sequential
from keras.layers import Dense, Flatten, Conv2D, MaxPool2D, Dropout
from tensorflow.keras.optimizers import SGD, Adam
from keras.callbacks import ReduceLROnPlateau, EarlyStopping
from keras.utils.np_utils import to_categorical
import pandas as p
from sklearn.model_selection import train_test_split
from keras.utils import np_utils
from tqdm import tqdm_notebook
from sklearn.utils import shuffle
import pickle
import os
import platform as pl

from clsConfig import clsConfig as cf

class clsAlphabetReading:
    def __init__(self):
        self.sep = str(cf.conf['SEP'])
        self.Curr_Path = str(cf.conf['INIT_PATH'])
        self.fileName = str(cf.conf['FILE_NAME'])
        self.testRatio = float(cf.conf['testRatio'])
        self.valRatio = float(cf.conf['valRatio'])
        self.epochsVal = int(cf.conf['epochsVal'])
        self.activationType = str(cf.conf['activationType'])
        self.activationType2 = str(cf.conf['activationType2'])
        self.numOfClasses = int(cf.conf['numOfClasses'])
        self.kernelSize = cf.conf['kernelSize']
        self.poolSize = cf.conf['poolSize']
        self.filterVal1 = int(cf.conf['filterVal1'])
        self.filterVal2 = int(cf.conf['filterVal2'])
        self.filterVal3 = int(cf.conf['filterVal3'])
        self.stridesVal = int(cf.conf['stridesVal'])
        self.monitorVal = str(cf.conf['monitorVal'])
        self.paddingVal1 = str(cf.conf['paddingVal1'])
        self.paddingVal2 = str(cf.conf['paddingVal2'])
        self.reshapeVal = int(cf.conf['reshapeVal'])
        self.reshapeVal1 = cf.conf['reshapeVal1']
        self.patienceVal1 = int(cf.conf['patienceVal1'])
        self.patienceVal2 = int(cf.conf['patienceVal2'])
        self.sleepTime = int(cf.conf['sleepTime'])
        self.sleepTime1 = int(cf.conf['sleepTime1'])
        self.factorVal = float(cf.conf['factorVal'])
        self.learningRateVal = float(cf.conf['learningRateVal'])
        self.minDeltaVal = int(cf.conf['minDeltaVal'])
        self.minLrVal = float(cf.conf['minLrVal'])
        self.verboseFlag = int(cf.conf['verboseFlag'])
        self.modeInd = str(cf.conf['modeInd'])
        self.shuffleVal = int(cf.conf['shuffleVal'])
        self.DenkseVal1 = int(cf.conf['DenkseVal1'])
        self.DenkseVal2 = int(cf.conf['DenkseVal2'])
        self.DenkseVal3 = int(cf.conf['DenkseVal3'])
        self.predParam = int(cf.conf['predParam'])
        self.word_dict = cf.conf['word_dict']

    def applyCNN(self, X_Train, Y_Train_Catg, X_Validation, Y_Validation_Catg):
        try:
            testRatio = self.testRatio
            epochsVal = self.epochsVal
            activationType = self.activationType
            activationType2 = self.activationType2
            numOfClasses = self.numOfClasses
            kernelSize = self.kernelSize
            poolSize = self.poolSize
            filterVal1 = self.filterVal1
            filterVal2 = self.filterVal2
            filterVal3 = self.filterVal3
            stridesVal = self.stridesVal
            monitorVal = self.monitorVal
            paddingVal1 = self.paddingVal1
            paddingVal2 = self.paddingVal2
            reshapeVal = self.reshapeVal
            patienceVal1 = self.patienceVal1
            patienceVal2 = self.patienceVal2
            sleepTime = self.sleepTime
            sleepTime1 = self.sleepTime1
            factorVal = self.factorVal
            learningRateVal = self.learningRateVal
            minDeltaVal = self.minDeltaVal
            minLrVal = self.minLrVal
            verboseFlag = self.verboseFlag
            modeInd = self.modeInd
            shuffleVal = self.shuffleVal
            DenkseVal1 = self.DenkseVal1
            DenkseVal2 = self.DenkseVal2
            DenkseVal3 = self.DenkseVal3

            model = Sequential()

            model.add(Conv2D(filters=filterVal1, kernel_size=kernelSize, activation=activationType, input_shape=(28,28,1)))
            model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))

            model.add(Conv2D(filters=filterVal2, kernel_size=kernelSize, activation=activationType, padding = paddingVal1))
            model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))

            model.add(Conv2D(filters=filterVal3, kernel_size=kernelSize, activation=activationType, padding = paddingVal2))
            model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))

            model.add(Flatten())

            model.add(Dense(DenkseVal2,activation = activationType))
            model.add(Dense(DenkseVal3,activation = activationType))

            model.add(Dense(DenkseVal1,activation = activationType2))

            model.compile(optimizer = Adam(learning_rate=learningRateVal), loss='categorical_crossentropy', metrics=['accuracy'])
            reduce_lr = ReduceLROnPlateau(monitor=monitorVal, factor=factorVal, patience=patienceVal1, min_lr=minLrVal)
            early_stop = EarlyStopping(monitor=monitorVal, min_delta=minDeltaVal, patience=patienceVal2, verbose=verboseFlag, mode=modeInd)

            fittedModel = model.fit(X_Train, Y_Train_Catg, epochs=epochsVal, callbacks=[reduce_lr, early_stop], validation_data = (X_Validation,Y_Validation_Catg))

            return (model, fittedModel)
        except Exception as e:
            x = str(e)
            model = Sequential()
            print('Error: ', x)

            return (model, model)

    def trainModel(self, debugInd, var):
        try:
            sep = self.sep
            Curr_Path = self.Curr_Path
            fileName = self.fileName
            epochsVal = self.epochsVal
            valRatio = self.valRatio
            predParam = self.predParam
            testRatio = self.testRatio
            reshapeVal = self.reshapeVal
            numOfClasses = self.numOfClasses
            sleepTime = self.sleepTime
            sleepTime1 = self.sleepTime1
            shuffleVal = self.shuffleVal
            reshapeVal1 = self.reshapeVal1

            # Dictionary for getting characters from index values
            word_dict = self.word_dict

            print('File Name: ', str(fileName))

            # Read the data
            df_HW_Alphabet = p.read_csv(fileName).astype('float32')

            # Sample Data
            print('Sample Data: ')
            print(df_HW_Alphabet.head())

            # Split the data into x (our data) & y (the label to predict)
            x = df_HW_Alphabet.drop('0',axis = 1)
            y = df_HW_Alphabet['0']

            # Reshaping the data in csv file to display as an image
            X_Train, X_Test, Y_Train, Y_Test = train_test_split(x, y, test_size = testRatio)
            X_Train, X_Validation, Y_Train, Y_Validation = train_test_split(X_Train, Y_Train, test_size = valRatio)

            X_Train = np.reshape(X_Train.values, (X_Train.shape[0], reshapeVal, reshapeVal))
            X_Test = np.reshape(X_Test.values, (X_Test.shape[0], reshapeVal, reshapeVal))
            X_Validation = np.reshape(X_Validation.values, (X_Validation.shape[0], reshapeVal, reshapeVal))

            print("Train Data Shape: ", X_Train.shape)
            print("Test Data Shape: ", X_Test.shape)
            print("Validation Data shape: ", X_Validation.shape)

            # Plotting the number of alphabets in the dataset
            Y_Train_Num = np.int0(y)
            count = np.zeros(numOfClasses, dtype='int')
            for i in Y_Train_Num:
                count[i] +=1

            alphabets = []
            for i in word_dict.values():
                alphabets.append(i)

            fig, ax = plt.subplots(1,1, figsize=(7,7))
            ax.barh(alphabets, count)

            plt.xlabel("Number of elements ")
            plt.ylabel("Alphabets")
            plt.grid()
            plt.show(block=False)
            plt.pause(sleepTime)
            plt.close()

            # Shuffling the data
            shuff = shuffle(X_Train[:shuffleVal])

            # Model reshaping the training & test dataset
            X_Train = X_Train.reshape(X_Train.shape[0],X_Train.shape[1],X_Train.shape[2],1)
            print("Shape of Train Data: ", X_Train.shape)

            X_Test = X_Test.reshape(X_Test.shape[0], X_Test.shape[1], X_Test.shape[2],1)
            print("Shape of Test Data: ", X_Test.shape)

            X_Validation = X_Validation.reshape(X_Validation.shape[0], X_Validation.shape[1], X_Validation.shape[2],1)
            print("Shape of Validation data: ", X_Validation.shape)

            # Converting the labels to categorical values
            Y_Train_Catg = to_categorical(Y_Train, num_classes = numOfClasses, dtype='int')
            print("Shape of Train Labels: ", Y_Train_Catg.shape)

            Y_Test_Catg = to_categorical(Y_Test, num_classes = numOfClasses, dtype='int')
            print("Shape of Test Labels: ", Y_Test_Catg.shape)

            Y_Validation_Catg = to_categorical(Y_Validation, num_classes = numOfClasses, dtype='int')
            print("Shape of validation labels: ", Y_Validation_Catg.shape)

            model, history = self.applyCNN(X_Train, Y_Train_Catg, X_Validation, Y_Validation_Catg)

            print('Model Summary: ')
            print(model.summary())

            # Displaying the accuracies & losses for train & validation set
            print("Validation Accuracy :", history.history['val_accuracy'])
            print("Training Accuracy :", history.history['accuracy'])
            print("Validation Loss :", history.history['val_loss'])
            print("Training Loss :", history.history['loss'])

            # Displaying the Loss Graph
            plt.figure(1)
            plt.plot(history.history['loss'])
            plt.plot(history.history['val_loss'])
            plt.legend(['training','validation'])
            plt.title('Loss')
            plt.xlabel('epoch')
            plt.show(block=False)
            plt.pause(sleepTime1)
            plt.close()

            # Displaying the Accuracy Graph
            plt.figure(2)
            plt.plot(history.history['accuracy'])
            plt.plot(history.history['val_accuracy'])
            plt.legend(['training','validation'])
            plt.title('Accuracy')
            plt.xlabel('epoch')
            plt.show(block=False)
            plt.pause(sleepTime1)
            plt.close()

            # Making the model to predict
            pred = model.predict(X_Test[:predParam])

            print('Test Details::')
            print('X_Test: ', X_Test.shape)
            print('Y_Test_Catg: ', Y_Test_Catg.shape)

            try:
                score = model.evaluate(X_Test, Y_Test_Catg, verbose=0)
                print('Test Score = ', score[0])
                print('Test Accuracy = ', score[1])
            except Exception as e:
                x = str(e)
                print('Error: ', x)

            # Displaying some of the test images & their predicted labels
            fig, ax = plt.subplots(3,3, figsize=(8,9))
            axes = ax.flatten()

            for i in range(9):
                axes[i].imshow(np.reshape(X_Test[i], reshapeVal1), cmap="Greys")
                # Use the model output here; Y_Test_Catg holds the true labels
                predLabel = word_dict[np.argmax(pred[i])]
                print('Prediction: ', predLabel)
                axes[i].set_title("Test Prediction: " + predLabel)
                axes[i].grid()
            plt.show(block=False)
            plt.pause(sleepTime1)
            plt.close()

            fileName = Curr_Path + sep + 'Model' + sep + 'model_trained_' + str(epochsVal) + '.p'
            print('Model Name: ', str(fileName))

            pickle_out = open(fileName, 'wb')
            pickle.dump(model, pickle_out)
            pickle_out.close()

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

Some of the key snippets from the above scripts are –

x = df_HW_Alphabet.drop('0',axis = 1)
y = df_HW_Alphabet['0']

In the above snippet, we have split the data into images & their corresponding labels.

X_Train, X_Test, Y_Train, Y_Test = train_test_split(x, y, test_size = testRatio)
X_Train, X_Validation, Y_Train, Y_Validation = train_test_split(X_Train, Y_Train, test_size = valRatio)

X_Train = np.reshape(X_Train.values, (X_Train.shape[0], reshapeVal, reshapeVal))
X_Test = np.reshape(X_Test.values, (X_Test.shape[0], reshapeVal, reshapeVal))
X_Validation = np.reshape(X_Validation.values, (X_Validation.shape[0], reshapeVal, reshapeVal))


print("Train Data Shape: ", X_Train.shape)
print("Test Data Shape: ", X_Test.shape)
print("Validation Data shape: ", X_Validation.shape)

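We split the data into train, test & validation sets so that the model is tuned on one portion and judged on data it has not seen. With testRatio & valRatio both at 0.2, roughly 64% of the rows go to training, 16% to validation & 20% to testing. We then reshape each row’s 784 pixel columns into a 28×28 pixel image.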
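The arithmetic behind the two successive splits can be sketched as follows. The split_counts helper and the 1,000-row total are hypothetical, and rounding the held-out part up is an assumption about how the split is sized, so the real counts may differ by a row.

```python
import math


def split_counts(total_rows, test_ratio, val_ratio):
    """Return (train, validation, test) row counts for two successive splits."""
    # First split: carve the test set out of the full data set.
    test_rows = math.ceil(total_rows * test_ratio)
    remaining = total_rows - test_rows
    # Second split: carve the validation set out of what is left.
    val_rows = math.ceil(remaining * val_ratio)
    train_rows = remaining - val_rows
    return train_rows, val_rows, test_rows


train_rows, val_rows, test_rows = split_counts(1000, 0.2, 0.2)
print(train_rows, val_rows, test_rows)  # 640 160 200
```

Note that valRatio applies to the rows left after the test split, which is why the validation share ends up at 16% of the whole data set rather than 20%.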

The following snippet counts how many samples each letter has in the full data set & plots the distribution as a horizontal bar chart in matplotlib.

Y_Train_Num = np.int0(y)
count = np.zeros(numOfClasses, dtype='int')
for i in Y_Train_Num:
    count[i] +=1

alphabets = []
for i in word_dict.values():
    alphabets.append(i)

fig, ax = plt.subplots(1,1, figsize=(7,7))
ax.barh(alphabets, count)

plt.xlabel("Number of elements ")
plt.ylabel("Alphabets")
plt.grid()
plt.show(block=False)
plt.pause(sleepTime)
plt.close()

Note that we call plt.show with block=False. Combined with plt.pause & plt.close, this shows the chart for a few seconds & then lets the execution continue without human intervention.

# Model reshaping the training & test dataset
X_Train = X_Train.reshape(X_Train.shape[0],X_Train.shape[1],X_Train.shape[2],1)
print("Shape of Train Data: ", X_Train.shape)

X_Test = X_Test.reshape(X_Test.shape[0], X_Test.shape[1], X_Test.shape[2],1)
print("Shape of Test Data: ", X_Test.shape)

X_Validation = X_Validation.reshape(X_Validation.shape[0], X_Validation.shape[1], X_Validation.shape[2],1)
print("Shape of Validation data: ", X_Validation.shape)

# Converting the labels to categorical values
Y_Train_Catg = to_categorical(Y_Train, num_classes = numOfClasses, dtype='int')
print("Shape of Train Labels: ", Y_Train_Catg.shape)

Y_Test_Catg = to_categorical(Y_Test, num_classes = numOfClasses, dtype='int')
print("Shape of Test Labels: ", Y_Test_Catg.shape)

Y_Validation_Catg = to_categorical(Y_Validation, num_classes = numOfClasses, dtype='int')
print("Shape of validation labels: ", Y_Validation_Catg.shape)

In the above snippet, the application adds a channel dimension to all three data sets & one-hot encodes their labels before calling the primary CNN function.
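For readers new to these two steps, here is a dependency-free sketch of what they do to a single label & a single tiny image. The one_hot & add_channel helpers are hypothetical stand-ins; the application itself relies on to_categorical & NumPy’s reshape.

```python
def one_hot(label, num_classes):
    """Return a list of zeros with a single 1 at the position of `label`."""
    vector = [0] * num_classes
    vector[int(label)] = 1  # labels arrive as floats because of astype('float32')
    return vector


def add_channel(image):
    """Turn a rows x cols grid into rows x cols x 1 by wrapping each pixel in a list."""
    return [[[pixel] for pixel in row] for row in image]


encoded = one_hot(2.0, 26)            # label 2 stands for the letter 'C'
tiny = add_channel([[0, 255], [128, 64]])

print(encoded.index(1), len(encoded))  # 2 26
print(tiny)                            # [[[0], [255]], [[128], [64]]]
```

The extra trailing dimension is the single grayscale channel that the first Conv2D layer expects through input_shape=(28,28,1).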

model = Sequential()

model.add(Conv2D(filters=filterVal1, kernel_size=kernelSize, activation=activationType, input_shape=(28,28,1)))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))

model.add(Conv2D(filters=filterVal2, kernel_size=kernelSize, activation=activationType, padding = paddingVal1))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))

model.add(Conv2D(filters=filterVal3, kernel_size=kernelSize, activation=activationType, padding = paddingVal2))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))

model.add(Flatten())

model.add(Dense(DenkseVal2,activation = activationType))
model.add(Dense(DenkseVal3,activation = activationType))

model.add(Dense(DenkseVal1,activation = activationType2))

model.compile(optimizer = Adam(learning_rate=learningRateVal), loss='categorical_crossentropy', metrics=['accuracy'])
reduce_lr = ReduceLROnPlateau(monitor=monitorVal, factor=factorVal, patience=patienceVal1, min_lr=minLrVal)
early_stop = EarlyStopping(monitor=monitorVal, min_delta=minDeltaVal, patience=patienceVal2, verbose=verboseFlag, mode=modeInd)


fittedModel = model.fit(X_Train, Y_Train_Catg, epochs=epochsVal, callbacks=[reduce_lr, early_stop],  validation_data = (X_Validation,Y_Validation_Catg))

return (model, fittedModel)

In the above snippet, each convolution layer is followed by a max-pooling layer, which shrinks the feature maps that the convolution produces. The output of the last max-pooling layer is flattened into a one-dimensional vector & supplied as input to the Dense layers. The final Dense layer has 26 units with softmax activation, one per letter. The model is then compiled & trained on the training data set.
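To see how the image shrinks on its way through the network, the sketch below works out the feature-map sizes by hand. It assumes the values from clsConfig.py (3×3 kernels, 2×2 pooling with stride 2, filters of 32, 64 & 128) and that the first convolution uses no padding because none is passed; conv_out & pool_out are hypothetical helpers.

```python
def conv_out(size, kernel, padding):
    """Output width/height of a stride-1 convolution."""
    if padding == "same":
        return size
    return size - kernel + 1  # "valid": no padding around the image


def pool_out(size, pool, stride):
    """Output width/height of a max-pooling layer without padding."""
    return (size - pool) // stride + 1


size = 28  # the input images are 28 x 28 x 1
shapes = []
for filters, padding in [(32, "valid"), (64, "same"), (128, "valid")]:
    size = conv_out(size, 3, padding)
    shapes.append(("conv", size, size, filters))
    size = pool_out(size, 2, 2)
    shapes.append(("pool", size, size, filters))

flattened = size * size * 128

for shape in shapes:
    print(shape)
print("flattened length:", flattened)  # flattened length: 512
```

Under these assumptions, the Flatten layer hands a vector of 512 values to the first Dense layer, which should match what model.summary() reports.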

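We have used the Adam optimizer & trained the model for up to eight epochs. The ReduceLROnPlateau callback lowers the learning rate when the validation loss stops improving, & the EarlyStopping callback ends the training early if the validation loss does not improve for two epochs.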
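The patience idea behind early stopping can be sketched without Keras. The function below is a simplified illustration & not Keras’s own implementation; early_stop_epoch & the list of validation losses are made up for the example.

```python
def early_stop_epoch(val_losses, patience, min_delta=0.0):
    """Return the 1-based epoch at which training would stop, or None if it never stops."""
    best = float("inf")
    wait = 0
    for epoch, loss in enumerate(val_losses, start=1):
        if loss < best - min_delta:
            # Improvement: remember the new best loss and reset the counter.
            best = loss
            wait = 0
        else:
            # No improvement: count the epoch and stop once patience runs out.
            wait += 1
            if wait >= patience:
                return epoch
    return None


# Hypothetical validation losses for five epochs.
losses = [0.30, 0.20, 0.21, 0.22, 0.19]

stop_at = early_stop_epoch(losses, patience=2)
print(stop_at)  # 4
```

With a patience of 2, as in patienceVal2, training in this example would stop after epoch 4 & never reach the better loss of epoch 5, which is the trade-off a small patience value makes.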

# Displaying the accuracies & losses for train & validation set
print("Validation Accuracy :", history.history['val_accuracy'])
print("Training Accuracy :", history.history['accuracy'])
print("Validation Loss :", history.history['val_loss'])
print("Training Loss :", history.history['loss'])

# Displaying the Loss Graph
plt.figure(1)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.legend(['training','validation'])
plt.title('Loss')
plt.xlabel('epoch')
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()

# Displaying the Accuracy Graph
plt.figure(2)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.legend(['training','validation'])
plt.title('Accuracy')
plt.xlabel('epoch')
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()

Also, we have captured the accuracy & loss for both the training & validation sets & plotted them in two separate graphs for better understanding.

try:
    score = model.evaluate(X_Test, Y_Test_Catg, verbose=0)
    print('Test Score = ', score[0])
    print('Test Accuracy = ', score[1])
except Exception as e:
    x = str(e)
    print('Error: ', x)

Also, the application measures the loss & accuracy of the model that we trained & validated with the training & validation data. This time we have used the test data, which the model did not see during training or validation.
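The two numbers reported here, a categorical cross-entropy loss & an accuracy, can be worked out by hand for a tiny batch. The sketch below uses three made-up classes & predictions; the helper functions are hypothetical, & a real framework may apply extra numerical safeguards, so treat it as an illustration of the formulas only.

```python
import math


def categorical_crossentropy(y_true, y_pred):
    """Mean over samples of -sum(true * log(predicted))."""
    losses = []
    for true_row, pred_row in zip(y_true, y_pred):
        losses.append(-sum(t * math.log(p) for t, p in zip(true_row, pred_row) if t))
    return sum(losses) / len(losses)


def accuracy(y_true, y_pred):
    """Share of samples whose highest-scoring class matches the true class."""
    hits = 0
    for true_row, pred_row in zip(y_true, y_pred):
        if true_row.index(max(true_row)) == pred_row.index(max(pred_row)):
            hits += 1
    return hits / len(y_true)


# Hypothetical one-hot labels & predicted probabilities for three samples.
y_true = [[1, 0, 0], [0, 1, 0], [0, 0, 1]]
y_pred = [[0.7, 0.2, 0.1], [0.1, 0.8, 0.1], [0.5, 0.3, 0.2]]

loss = categorical_crossentropy(y_true, y_pred)
acc = accuracy(y_true, y_pred)
print(round(loss, 4), round(acc, 4))
```

The third sample is misclassified, so the accuracy is two out of three, & its low probability for the true class is what dominates the loss.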

# Displaying some of the test images & their predicted labels
pred = model.predict(X_Test[:predParam])

fig, ax = plt.subplots(3,3, figsize=(8,9))
axes = ax.flatten()

for i in range(9):
    axes[i].imshow(np.reshape(X_Test[i], reshapeVal1), cmap="Greys")
    # Use the model output here; Y_Test_Catg holds the true labels
    predLabel = word_dict[np.argmax(pred[i])]
    print('Prediction: ', predLabel)
    axes[i].set_title("Test Prediction: " + predLabel)
    axes[i].grid()
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()

Finally, the application picks the first few test images & plots them along with their predicted letters.

Testing with Random Test Data
fileName = Curr_Path + sep + 'Model' + sep + 'model_trained_' + str(epochsVal) + '.p'
print('Model Name: ', str(fileName))

pickle_out = open(fileName, 'wb')
pickle.dump(model, pickle_out)
pickle_out.close()

As the last step, the application serializes the trained model with the pickle package & saves it under a specific location, which the reader application will use.
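The save & load round trip can be tried out on its own with a stand-in object. The sketch below is illustrative only: fake_model replaces the trained model, the file goes to a temporary directory, & the with blocks are a suggested alternative to the explicit open & close calls. Whether a Keras model pickles cleanly depends on the installed versions; Keras also offers its own model.save & load_model functions for persistence.

```python
import os
import pickle
import tempfile

# Stand-in for the trained model; any picklable object works for this sketch.
fake_model = {"name": "model_trained_8", "classes": 26}

# Hypothetical target folder; the application writes to its own 'Model' directory.
model_dir = tempfile.mkdtemp()
file_name = os.path.join(model_dir, "model_trained_8.p")

# The with-block closes the file even if pickling raises an error.
with open(file_name, "wb") as pickle_out:
    pickle.dump(fake_model, pickle_out)

with open(file_name, "rb") as pickle_in:
    restored = pickle.load(pickle_in)

print(restored == fake_model)  # True
```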

3. trainingVisualDataRead.py (Main application that will invoke the training class to predict alphabet through WebCam using Convolutional Neural Network (CNN).)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 17-Jan-2022 ####
#### Modified On 17-Jan-2022 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsAlphabetReading class to initiate ####
#### teach & perfect the model to read ####
#### visual alphabets using Convolutional ####
#### Neural Network (CNN). ####
###############################################
# We keep the setup code in a different class as shown below.
import clsAlphabetReading as ar

from clsConfig import clsConfig as cf

import datetime
import logging

###############################################
### Global Section ###
###############################################
# Instantiating all the three classes
x1 = ar.clsAlphabetReading()
###############################################
### End of Global Section ###
###############################################

def main():
    try:
        # Other useful variables
        debugInd = 'Y'
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        var1 = datetime.datetime.now()

        print('Start Time: ', str(var))
        # End of useful variables

        # Initiating Log Class
        general_log_path = str(cf.conf['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'restoreVideo.log', level=logging.INFO)

        print('Started Transformation!')

        # Execute all the pass
        r1 = x1.trainModel(debugInd, var)

        if (r1 == 0):
            print('Successfully Visual Alphabet Training Completed!')
        else:
            print('Failed to complete the Visual Alphabet Training!')

        var2 = datetime.datetime.now()

        # Elapsed time between the start & the end of the run
        c = var2 - var1
        minutes = c.total_seconds() / 60
        print('Total difference in minutes: ', str(minutes))

        print('End Time: ', str(var2))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

And the core snippet from the above script is –

x1 = ar.clsAlphabetReading()

Instantiate the main class.

r1 = x1.trainModel(debugInd, var)

The python application will invoke the trainModel method of the class & capture the returned value inside the r1 variable (0 on success, 1 on failure).

4. readingVisualData.py (Reading the model to predict Alphabet using WebCAM.)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 18-Jan-2022 ####
#### Modified On 18-Jan-2022 ####
#### ####
#### Objective: This python script will ####
#### scan the live video feed from the ####
#### web-cam & predict the alphabet that ####
#### read it. ####
###############################################
# We keep the setup code in a different class as shown below.
from clsConfig import clsConfig as cf

import datetime
import logging
import cv2
import pickle
import numpy as np

###############################################
### Global Section ###
###############################################
sep = str(cf.conf['SEP'])
Curr_Path = str(cf.conf['INIT_PATH'])
fileName = str(cf.conf['FILE_NAME'])
epochsVal = int(cf.conf['epochsVal'])
numOfClasses = int(cf.conf['numOfClasses'])
word_dict = cf.conf['word_dict']
width = int(cf.conf['width'])
height = int(cf.conf['height'])
imgSize = cf.conf['imgSize']
threshold = float(cf.conf['threshold'])
imgDimension = cf.conf['imgDimension']
imgSmallDim = cf.conf['imgSmallDim']
imgMidDim = cf.conf['imgMidDim']
reshapeParam1 = int(cf.conf['reshapeParam1'])
reshapeParam2 = int(cf.conf['reshapeParam2'])
colorFeed = cf.conf['colorFeed']
colorPredict = cf.conf['colorPredict']
###############################################
### End of Global Section ###
###############################################

def main():
    try:
        # Other useful variables
        debugInd = 'Y'
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        var1 = datetime.datetime.now()

        print('Start Time: ', str(var))
        # End of useful variables

        # Initiating Log Class
        general_log_path = str(cf.conf['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'restoreVideo.log', level=logging.INFO)

        print('Started Live Streaming!')

        cap = cv2.VideoCapture(0)
        cap.set(3, width)
        cap.set(4, height)

        fileName = Curr_Path + sep + 'Model' + sep + 'model_trained_' + str(epochsVal) + '.p'
        print('Model Name: ', str(fileName))

        pickle_in = open(fileName, 'rb')
        model = pickle.load(pickle_in)

        # Assume failure until the user quits the live feed with the 'q' key
        r1 = 1

        while True:
            status, img = cap.read()

            if status == False:
                break

            img_copy = img.copy()

            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            img = cv2.resize(img, imgDimension)

            img_copy = cv2.GaussianBlur(img_copy, imgSmallDim, 0)
            img_gray = cv2.cvtColor(img_copy, cv2.COLOR_BGR2GRAY)
            bin, img_thresh = cv2.threshold(img_gray, 100, 255, cv2.THRESH_BINARY_INV)

            img_final = cv2.resize(img_thresh, imgMidDim)
            img_final = np.reshape(img_final, (reshapeParam1,reshapeParam2,reshapeParam2,reshapeParam1))

            img_pred = word_dict[np.argmax(model.predict(img_final))]

            # Extracting Probability Values
            Predict_X = model.predict(img_final)
            probVal = round(np.amax(Predict_X) * 100)

            cv2.putText(img, "Live Feed : (" + str(probVal) + "%) ", (20,25), cv2.FONT_HERSHEY_TRIPLEX, 0.7, color = colorFeed)
            cv2.putText(img, "Prediction: " + img_pred, (20,410), cv2.FONT_HERSHEY_DUPLEX, 1.3, color = colorPredict)

            cv2.imshow("Original Image", img)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                r1=0
                break

        # Free the camera & close the preview window
        cap.release()
        cv2.destroyAllWindows()

        if (r1 == 0):
            print('Successfully Alphabets predicted!')
        else:
            print('Failed to predict alphabet!')

        var2 = datetime.datetime.now()

        # Elapsed time between the start & the end of the run
        c = var2 - var1
        minutes = c.total_seconds() / 60
        print('Total Run Time in minutes: ', str(minutes))

        print('End Time: ', str(var2))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

And the key snippet from the above code is –

cap = cv2.VideoCapture(0)
cap.set(3, width)
cap.set(4, height)

The application reads the live video data from the WebCAM. It also sets the width & height of the video frames; the property ids 3 & 4 correspond to cv2.CAP_PROP_FRAME_WIDTH & cv2.CAP_PROP_FRAME_HEIGHT.

fileName = Curr_Path + sep + 'Model' + sep + 'model_trained_' + str(epochsVal) + '.p'
print('Model Name: ', str(fileName))

pickle_in = open(fileName, 'rb')
model = pickle.load(pickle_in)

The application loads the trained model that the previous script saved using the pickle package.

while True:
    status, img = cap.read()

    if status == False:
        break

The application reads frames from the WebCAM in a loop & exits if the video transmission ends or a frame cannot be read.

img_copy = img.copy()

img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = cv2.resize(img, imgDimension)

img_copy = cv2.GaussianBlur(img_copy, imgSmallDim, 0)
img_gray = cv2.cvtColor(img_copy, cv2.COLOR_BGR2GRAY)
bin, img_thresh = cv2.threshold(img_gray, 100, 255, cv2.THRESH_BINARY_INV)

img_final = cv2.resize(img_thresh, imgMidDim)
img_final = np.reshape(img_final, (reshapeParam1,reshapeParam2,reshapeParam2,reshapeParam1))


img_pred = word_dict[np.argmax(model.predict(img_final))]

We first clone the original video frame. The clone is blurred, converted from BGR to grayscale & passed through an inverse binary threshold for better prediction outcomes. Then the image is resized to 28×28 & reshaped to (1, 28, 28, 1) for the model input. Finally, the np.argmax function extracts the class index with the highest predicted probability. That index is translated into a letter using the word_dict dictionary & displayed on top of the live view.
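The inverse binary threshold is the least obvious step in that chain, so here is a dependency-free sketch of what it does to a small grayscale patch. The patch values & the threshold_binary_inv helper are made up; the application itself calls cv2.threshold with cv2.THRESH_BINARY_INV, a threshold of 100 & a maximum value of 255.

```python
def threshold_binary_inv(gray, thresh, max_val):
    """Pixels brighter than `thresh` become 0; all others become `max_val`."""
    return [[0 if pixel > thresh else max_val for pixel in row] for row in gray]


# Hypothetical grayscale patch: dark ink (low values) on light paper (high values).
gray = [
    [220, 215,  40, 218],
    [222,  35,  30, 221],
    [219, 217,  38, 216],
]

binary = threshold_binary_inv(gray, 100, 255)
for row in binary:
    print(row)
# [0, 0, 255, 0]
# [0, 255, 255, 0]
# [0, 0, 255, 0]
```

After this step, the strokes are bright & the background is dark, & every pixel is either 0 or 255 before the frame is resized for the model.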

# Extracting Probability Values
Predict_X = model.predict(img_final)
probVal = round(np.amax(Predict_X) * 100)

Also, we take the highest predicted probability as the confidence score, convert it to a percentage & display it on top of the live view.
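The configuration file also carries a threshold value of 0.45, which the reader script loads but does not apply in the code shown. One possible use, sketched here purely as an assumption, is to hide the predicted letter when the confidence falls below that value. The label_if_confident helper & the three-class probabilities are hypothetical.

```python
def label_if_confident(probabilities, labels, threshold):
    """Return (letter or None, confidence in percent) for one prediction."""
    best_index = max(range(len(probabilities)), key=lambda i: probabilities[i])
    confidence = probabilities[best_index]
    percent = round(confidence * 100)
    if confidence < threshold:
        # Too uncertain: report the score but withhold the letter.
        return None, percent
    return labels[best_index], percent


labels = {0: 'A', 1: 'B', 2: 'C'}  # shortened stand-in for word_dict

sure = label_if_confident([0.1, 0.8, 0.1], labels, 0.45)
unsure = label_if_confident([0.4, 0.35, 0.25], labels, 0.45)

print(sure)    # ('B', 80)
print(unsure)  # (None, 40)
```

Gating on the score like this would keep the overlay from showing a letter when the camera points at a blank page, at the cost of occasionally hiding a correct but low-confidence prediction.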

if cv2.waitKey(1) & 0xFF == ord('q'):
    r1=0
    break

The above code lets the developer exit the application by pressing the “q” key on the keyboard, after which the program terminates.
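The & 0xFF in that check keeps only the lowest byte of the value returned by cv2.waitKey, so the comparison with ord('q') still works if higher bits are set. The snippet as written reacts to “q” only; the sketch below shows how the Esc key could be accepted as well. The should_quit helper is hypothetical & the raw key values are made up for the demonstration.

```python
ESC_KEY = 27  # ASCII code of the Esc key


def should_quit(raw_key):
    """Return True when the low byte of `raw_key` is 'q' or Esc."""
    key = raw_key & 0xFF
    return key in (ord('q'), ESC_KEY)


print(should_quit(ord('q')))             # True: plain 'q'
print(should_quit(0x100000 | ord('q')))  # True: 'q' with extra high bits set
print(should_quit(ESC_KEY))              # True: Esc
print(should_quit(-1))                   # False: waitKey returns -1 when no key was pressed
```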


So, we’ve done it.

You will get the complete codebase in the following Github link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my posts & let me know your feedback.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim the ownership of these images. There is always room for improvement, especially in the prediction quality of the alphabet.

Displaying real-time trade data in a dashboard using Python & third-party API & Streaming

Today, we want to make our use case a little bit harder & more realistic. We want to consume real-time live trade data through the FinnHub API & display it in our dashboard using another brilliant API, H2O-Wave, with the help of native Python.

The use-case mentioned above is extremely useful & for that, we’ll be using the following Third-Party APIs to achieve the same –

  1. FinnHub: For more information, please click the following link.
  2. Ably: For more information, please click the following link.
  3. H2O-Wave: For more information, please click the following link.

I’m not going to discuss these topics more, as I’ve already discussed them in separate earlier posts. Please refer to the following threads for detailed level information –

creating-a-real-time-dashboard-from-streaming-data-using-python


In this post, we will address a more advanced concept than the previous post mentioned above. Let us first look at how the run looks before we start exploring the details –

Real-time trade dashboard

Let us explore the architecture of this implementation –

Architecture Diagram

This application will talk to the FinnHub websocket & consume real-time trade data from it. The data is temporarily stored in our Ably channels. The dashboard picks up each message & displays it as soon as there is new data for that trading company.
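The flow of producer, channel & dashboard can be mimicked in a few lines with the standard library. This is only an in-process analogy: queue.Queue stands in for an Ably channel, & the publish & drain_latest helpers as well as the message fields (symbol, price) are hypothetical and do not reflect the actual FinnHub or Ably message formats.

```python
import json
import queue

# In-process stand-in for a message channel.
channel = queue.Queue()


def publish(trade):
    """Producer side: serialize one trade & put it on the channel."""
    channel.put(json.dumps(trade))


def drain_latest():
    """Consumer side: read every queued message & keep the latest price per symbol."""
    latest = {}
    while not channel.empty():
        trade = json.loads(channel.get())
        latest[trade["symbol"]] = trade["price"]
    return latest


# Hypothetical trades arriving from the streaming source.
publish({"symbol": "AAPL", "price": 130.1})
publish({"symbol": "MSFT", "price": 250.5})
publish({"symbol": "AAPL", "price": 130.4})

latest = drain_latest()
print(latest)  # {'AAPL': 130.4, 'MSFT': 250.5}
```

Keeping the channel between the two sides means the consumer of the stream & the dashboard can run as separate scripts, which is the split this post follows.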


For this use case, you need to install the following packages –

STEP – 1:

Main Packages

STEP – 2:

Main Packages – Continue

STEP – 3:

Main Packages – Continue

STEP – 4:

Main Packages – End

You can copy the following commands to install the above-mentioned packages –

pip install ably 
pip install h2o-wave
pip install pandas
pip install websocket
pip install websocket-client

Let’s explore the important data-point that you need to capture from the FinnHub portal to consume the real-time trade data –

FinnHub Portal

We have two main scripts. One consumes the streaming data & pushes it into a message queue; the other extracts the data from the queue, transforms it & publishes it to the real-time dashboard.

1. dashboard_finnhub.py ( This native Python script will consume streaming data & create the live trade dashboard. )


###############################################################
#### Template Written By: H2O Wave ####
#### Enhanced with Streaming Data By: Satyaki De ####
#### Base Version Enhancement On: 20-Dec-2020 ####
#### Modified On 27-Jun-2021 ####
#### ####
#### Objective: This script will consume real-time ####
#### streaming data coming out from a hosted API ####
#### sources (Finnhub) using another popular third-party ####
#### service named Ably. Ably mimics pubsub Streaming ####
#### concept, which might be extremely useful for ####
#### any start-ups. ####
#### ####
#### Note: This is an enhancement of my previous post of ####
#### H2O Wave. In this case, the application will consume ####
#### streaming trade data from a live host & not generated ####
#### out of the mock data. Thus, it is more useful for the ####
#### start-ups. ####
###############################################################
import time
from h2o_wave import site, data, ui
from ably import AblyRest
import pandas as p
import json
import datetime
import logging
import platform as pl
from clsConfig import clsConfig as cf
import clsL as cl

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

# Global Area
## Global Class

# Initiating Log Class
l = cl.clsL()

# Global Variables

# Moving previous day log files to archive directory
log_dir = cf.config['LOG_PATH']
path = cf.config['INIT_PATH']
subdir = cf.config['SUBDIR']

## End Of Global Part
class DaSeries:
    def __init__(self, inputDf):
        self.Df = inputDf
        self.count_row = inputDf.shape[0]
        self.start_pos = 0
        self.end_pos = 0
        self.interval = 1

    def next(self):
        try:
            # Getting Individual Element & convert them to Series
            if ((self.start_pos + self.interval) <= self.count_row):
                self.end_pos = self.start_pos + self.interval
            else:
                self.end_pos = self.start_pos + (self.count_row - self.start_pos)

            split_df = self.Df.iloc[self.start_pos:self.end_pos]

            if ((self.start_pos > self.count_row) | (self.start_pos == self.count_row)):
                pass
            else:
                self.start_pos = self.start_pos + self.interval

            x = float(split_df.iloc[0]['CurrentExchange'])
            dx = float(split_df.iloc[0]['Change'])

            # Emptying the existing dataframe
            split_df = p.DataFrame(None)

            return x, dx
        except:
            x = 0
            dx = 0

            return x, dx

class CategoricalSeries:
    def __init__(self, sourceDf):
        self.series = DaSeries(sourceDf)
        self.i = 0

    def next(self):
        x, dx = self.series.next()
        self.i += 1
        return f'C{self.i}', x, dx
light_theme_colors = '$red $pink $purple $violet $indigo $blue $azure $cyan $teal $mint $green $amber $orange $tangerine'.split()
dark_theme_colors = '$red $pink $blue $azure $cyan $teal $mint $green $lime $yellow $amber $orange $tangerine'.split()

_color_index = 1
colors = dark_theme_colors

def next_color():
    global _color_index
    _color_index += 1
    return colors[_color_index % len(colors)]

_curve_index = 1
curves = 'linear smooth step step-after step-before'.split()

def next_curve():
    global _curve_index
    _curve_index += 1
    return curves[_curve_index % len(curves)]

def calc_p(row):
    try:
        str_calc_s1 = str(row['s_x'])
        str_calc_s2 = str(row['s_y'])

        if str_calc_s1 == str_calc_s2:
            calc_p_val = float(row['p_y'])
        else:
            calc_p_val = float(row['p_x'])

        return calc_p_val
    except:
        return 0.0

def calc_v(row):
    try:
        str_calc_s1 = str(row['s_x'])
        str_calc_s2 = str(row['s_y'])

        if str_calc_s1 == str_calc_s2:
            calc_v_val = float(row['v_y'])
        else:
            calc_v_val = float(row['v_x'])

        return calc_v_val
    except:
        return 0.0
def process_DF(inputDF, inputDFUnq):
    try:
        # Core Business logic
        # The application will show default value to any
        # trade-in stock in case that data doesn't consume
        # from the source.
        df_conv = inputDF
        df_unique_fin = inputDFUnq

        df_conv['max_count'] = df_conv.groupby('default_rank')['default_rank'].transform('count')
        l.logr('3. max_df.csv', 'Y', df_conv, subdir)

        # Sorting the output
        sorted_df = df_conv.sort_values(by=['default_rank','s'], ascending=True)

        # New Column List Orders
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        df_fin = sorted_df.reindex(column_order, axis=1)

        l.logr('4. sorted_df.csv', 'Y', df_fin, subdir)

        # Now splitting the sorted df into two sets
        lkp_max_count = 4
        df_fin_na = df_fin[(df_fin['max_count'] == lkp_max_count)]

        l.logr('5. df_fin_na.csv', 'Y', df_fin_na, subdir)

        df_fin_req = df_fin[(df_fin['max_count'] != lkp_max_count)]
        l.logr('6. df_fin_req.csv', 'Y', df_fin_req, subdir)

        # Now to perform cross join, we will create
        # a key column in both the DataFrames to
        # merge on that key.
        df_unique_fin['key'] = 1
        df_fin_req['key'] = 1

        # Dropping unwanted columns
        df_unique_fin.drop(columns=['t'], axis=1, inplace=True)
        l.logr('7. df_unique_slim.csv', 'Y', df_unique_fin, subdir)

        # Padding with dummy key values
        #merge_df = p.merge(df_unique_fin,df_fin_req,on=['s'],how='left')
        # drop(columns=...) replaces the positional axis argument,
        # which newer pandas releases no longer accept.
        merge_df = p.merge(df_unique_fin,df_fin_req,on=['key']).drop(columns="key")

        l.logr('8. merge_df.csv', 'Y', merge_df, subdir)

        # Sorting the output
        sorted_merge_df = merge_df.sort_values(by=['default_rank_y','s_x'], ascending=True)

        l.logr('9. sorted_merge_df.csv', 'Y', sorted_merge_df, subdir)

        # Calling new derived logic
        sorted_merge_df['derived_p'] = sorted_merge_df.apply(lambda row: calc_p(row), axis=1)
        sorted_merge_df['derived_v'] = sorted_merge_df.apply(lambda row: calc_v(row), axis=1)

        l.logr('10. sorted_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Dropping unwanted columns
        sorted_merge_df.drop(columns=['default_rank_x', 'p_x', 'v_x', 's_y', 'p_y', 'v_y'], axis=1, inplace=True)

        #Renaming the columns
        sorted_merge_df.rename(columns={'s_x':'s'}, inplace=True)
        sorted_merge_df.rename(columns={'default_rank_y':'default_rank'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_p':'p'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_v':'v'}, inplace=True)

        l.logr('11. org_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Aligning columns
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        merge_fin_df = sorted_merge_df.reindex(column_order, axis=1)

        l.logr('12. merge_fin_df.csv', 'Y', merge_fin_df, subdir)

        # Finally, appending these two DataFrame (df_fin_na & merge_fin_df)
        frames = [df_fin_na, merge_fin_df]
        fin_df = p.concat(frames, keys=["s", "default_rank", "max_count"])

        l.logr('13. fin_df.csv', 'Y', fin_df, subdir)

        # Final clearance & organization
        fin_df.drop(columns=['default_rank', 'max_count'], axis=1, inplace=True)

        l.logr('14. Final.csv', 'Y', fin_df, subdir)

        # Adjusting key columns
        fin_df.rename(columns={'s':'Company'}, inplace=True)
        fin_df.rename(columns={'p':'CurrentExchange'}, inplace=True)
        fin_df.rename(columns={'v':'Change'}, inplace=True)

        l.logr('15. TransormedFinal.csv', 'Y', fin_df, subdir)

        return fin_df
    except Exception as e:
        print('$' * 120)

        x = str(e)
        print(x)

        print('$' * 120)

        df = p.DataFrame()

        return df
def create_dashboard(update_freq=0.0):
    page = site['/dashboard_finnhub']

    general_log_path = str(cf.config['LOG_PATH'])
    ably_id = str(cf.config['ABLY_ID'])

    # Enabling Logging Info
    logging.basicConfig(filename=general_log_path + 'Realtime_Stock.log', level=logging.INFO)

    os_det = pl.system()

    if os_det == "Windows":
        src_path = path + '\\' + 'data\\'
    else:
        src_path = path + '/' + 'data/'

    # Fetching the data
    client = AblyRest(ably_id)
    channel = client.channels.get('sd_channel')

    message_page = channel.history()

    # Counter Value
    cnt = 0

    # Declaring Global Data-Frame
    df_conv = p.DataFrame()

    for i in message_page.items:
        print('Last Msg: {}'.format(i.data))
        json_data = json.loads(i.data)

        # Converting JSON to Dataframe
        df = p.json_normalize(json_data)
        df.columns = df.columns.map(lambda x: x.split(".")[-1])

        if cnt == 0:
            df_conv = df
        else:
            d_frames = [df_conv, df]
            df_conv = p.concat(d_frames)

        cnt += 1

    # Resetting the Index Value
    df_conv.reset_index(drop=True, inplace=True)

    print('DF:')
    print(df_conv)

    # Writing to the file
    l.logr('1. DF_modified.csv', 'Y', df_conv, subdir)

    # Dropping unwanted columns
    df_conv.drop(columns=['c'], axis=1, inplace=True)

    df_conv['default_rank'] = df_conv.groupby(['s']).cumcount() + 1
    lkp_rank = 1
    df_unique = df_conv[(df_conv['default_rank'] == lkp_rank)]

    # New Column List Orders
    column_order = ['s', 'default_rank', 'p', 't', 'v']
    df_unique_fin = df_unique.reindex(column_order, axis=1)

    print('Rank DF Unique:')
    print(df_unique_fin)

    l.logr('2. df_unique.csv', 'Y', df_unique_fin, subdir)

    # Capturing transformed values into a DataFrame
    # Depending on your logic, you'll implement that inside
    # the process_DF functions
    fin_df = process_DF(df_conv, df_unique_fin)

    df_unq_fin = df_unique_fin.copy()

    df_unq_fin.rename(columns={'s':'Company'}, inplace=True)
    df_unq_fin.rename(columns={'p':'CurrentExchange'}, inplace=True)
    df_unq_fin.rename(columns={'v':'Change'}, inplace=True)
    df_unq_fin.drop(columns=['default_rank','key'], axis=1, inplace=True)

    l.logr('16. df_unq_fin.csv', 'Y', df_unq_fin, subdir)

    df_unq_finale = df_unq_fin.sort_values(by=['Company'], ascending=True)

    l.logr('17. df_unq_finale.csv', 'Y', df_unq_finale, subdir)

    # Final clearance for better understanding of data
    fin_df.drop(columns=['t'], axis=1, inplace=True)

    l.logr('18. CleanFinal.csv', 'Y', fin_df, subdir)

    count_row = df_unq_finale.shape[0]

    large_lines = []
    start_pos = 0
    end_pos = 0
    interval = 1

    # Converting dataframe to a desired Series
    f = CategoricalSeries(fin_df)

    for j in range(count_row):
        # Getting the series values from above
        cat, val, pc = f.next()

        # Getting Individual Element & convert them to Series
        if ((start_pos + interval) <= count_row):
            end_pos = start_pos + interval
        else:
            end_pos = start_pos + (count_row - start_pos)

        split_df = df_unq_finale.iloc[start_pos:end_pos]

        if ((start_pos > count_row) | (start_pos == count_row)):
            pass
        else:
            start_pos = start_pos + interval

        x_currency = str(split_df.iloc[0]['Company'])

        ####################################################
        ##### Debug Purpose                        #########
        ####################################################
        print('Company: ', x_currency)
        print('J: ', str(j))
        print('Cat: ', cat)
        ####################################################
        #####   End Of Debug                         #######
        ####################################################

        c = page.add(f'e{j+1}', ui.tall_series_stat_card(
            box=f'{j+1} 1 1 2',
            title=x_currency,
            value='=${{intl qux minimum_fraction_digits=2 maximum_fraction_digits=2}}',
            aux_value='={{intl quux style="percent" minimum_fraction_digits=1 maximum_fraction_digits=1}}',
            data=dict(qux=val, quux=pc),
            plot_type='area',
            plot_category='foo',
            plot_value='qux',
            plot_color=next_color(),
            plot_data=data('foo qux', -15),
            plot_zero_value=0,
            plot_curve=next_curve(),
        ))
        large_lines.append((f, c))

    page.save()

    while update_freq > 0:

        time.sleep(update_freq)

        for f, c in large_lines:
            cat, val, pc = f.next()

            print('Update Cat: ', cat)
            print('Update Val: ', val)
            print('Update pc: ', pc)
            print('*' * 160)

            c.data.qux = val
            c.data.quux = pc / 100
            c.plot_data[-1] = [cat, val]

        page.save()

if __name__ == "__main__":
    try:
        # Main Calling script
        create_dashboard(update_freq=0.25)
    except Exception as e:
        x = str(e)
        print(x)

Let’s explore the key snippets from the above script –

def process_DF(inputDF, inputDFUnq):
    try:
        # Core Business logic
        # The application will show default value to any
        # trade-in stock in case that data doesn't consume
        # from the source.
        
        # Getting block count
        #df_conv['block_count'] = df_conv.groupby(['default_rank']).cumcount()
        #l.logr('3. block_df.csv', 'Y', df_conv, subdir)

        # Getting block count
        #df_conv['max_count'] = df_conv.groupby(['default_rank']).size()
        #df_conv_fin = df_conv.groupby(['default_rank']).agg(['count'])
        #df_conv_fin = df_conv.value_counts(['default_rank']).reset_index(name='max_count')
        #df_conv_fin = df_conv.value_counts(['default_rank'])
        df_conv = inputDF
        df_unique_fin = inputDFUnq

        df_conv['max_count'] = df_conv.groupby('default_rank')['default_rank'].transform('count')
        l.logr('3. max_df.csv', 'Y', df_conv, subdir)


        # Sorting the output
        sorted_df = df_conv.sort_values(by=['default_rank','s'], ascending=True)

        # New Column List Orders
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        df_fin = sorted_df.reindex(column_order, axis=1)

        l.logr('4. sorted_df.csv', 'Y', df_fin, subdir)

        # Now splitting the sorted df into two sets
        lkp_max_count = 4
        df_fin_na = df_fin[(df_fin['max_count'] == lkp_max_count)]

        l.logr('5. df_fin_na.csv', 'Y', df_fin_na, subdir)

        df_fin_req = df_fin[(df_fin['max_count'] != lkp_max_count)]
        l.logr('6. df_fin_req.csv', 'Y', df_fin_req, subdir)

        # Now to perform cross join, we will create
        # a key column in both the DataFrames to
        # merge on that key.
        df_unique_fin['key'] = 1
        df_fin_req['key'] = 1

        # Dropping unwanted columns
        df_unique_fin.drop(columns=['t'], axis=1, inplace=True)
        l.logr('7. df_unique_slim.csv', 'Y', df_unique_fin, subdir)

        # Padding with dummy key values
        #merge_df = p.merge(df_unique_fin,df_fin_req,on=['s'],how='left')
        merge_df = p.merge(df_unique_fin,df_fin_req,on=['key']).drop(columns="key")

        l.logr('8. merge_df.csv', 'Y', merge_df, subdir)

        # Sorting the output
        sorted_merge_df = merge_df.sort_values(by=['default_rank_y','s_x'], ascending=True)

        l.logr('9. sorted_merge_df.csv', 'Y', sorted_merge_df, subdir)

        # Calling new derived logic
        sorted_merge_df['derived_p'] = sorted_merge_df.apply(lambda row: calc_p(row), axis=1)
        sorted_merge_df['derived_v'] = sorted_merge_df.apply(lambda row: calc_v(row), axis=1)

        l.logr('10. sorted_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Dropping unwanted columns
        sorted_merge_df.drop(columns=['default_rank_x', 'p_x', 'v_x', 's_y', 'p_y', 'v_y'], axis=1, inplace=True)

        #Renaming the columns
        sorted_merge_df.rename(columns={'s_x':'s'}, inplace=True)
        sorted_merge_df.rename(columns={'default_rank_y':'default_rank'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_p':'p'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_v':'v'}, inplace=True)

        l.logr('11. org_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Aligning columns
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        merge_fin_df = sorted_merge_df.reindex(column_order, axis=1)

        l.logr('12. merge_fin_df.csv', 'Y', merge_fin_df, subdir)

        # Finally, appending these two DataFrame (df_fin_na & merge_fin_df)
        frames = [df_fin_na, merge_fin_df]
        fin_df = p.concat(frames, keys=["s", "default_rank", "max_count"])

        l.logr('13. fin_df.csv', 'Y', fin_df, subdir)

        # Final clearance & organization
        fin_df.drop(columns=['default_rank', 'max_count'], axis=1, inplace=True)

        l.logr('14. Final.csv', 'Y', fin_df, subdir)

        # Adjusting key columns
        fin_df.rename(columns={'s':'Company'}, inplace=True)
        fin_df.rename(columns={'p':'CurrentExchange'}, inplace=True)
        fin_df.rename(columns={'v':'Change'}, inplace=True)

        l.logr('15. TransormedFinal.csv', 'Y', fin_df, subdir)

        return fin_df
    except Exception as e:
        print('$' * 120)

        x = str(e)
        print(x)

        print('$' * 120)

        df = p.DataFrame()

        return df

The above function will check if the queue is sending all the key trade-in data for all the companies. In our use case, we’re testing with the four companies & they are as follows –

a. AAPL
b. AMZN
c. BINANCE:BTCUSDT
d. IC MARKETS:1

Every message contains data from all four of these companies together. If any company’s data is missing, this transformation adds a dummy record for that company, with dummy trade-in values, so that every message bouquet has a uniform number of entries.
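The padding idea can be shown in plain Python, without the pandas cross join used in process_DF. This is a simplified sketch, not the post's implementation: the `pad_missing` helper is hypothetical, & the default values of 0.01 simply mirror the dummy values used elsewhere in this post.

```python
DEFAULT_SYMBOLS = ["AAPL", "AMZN", "BINANCE:BTCUSDT", "IC MARKETS:1"]


def pad_missing(trades, symbols=DEFAULT_SYMBOLS, default_p=0.01, default_v=0.01):
    """Return one record per symbol, filling the gaps with dummy values."""
    by_symbol = {trade["s"]: trade for trade in trades}
    padded = []
    for symbol in symbols:
        if symbol in by_symbol:
            padded.append(by_symbol[symbol])
        else:
            # Dummy record for a company that is missing from this message.
            padded.append({"s": symbol, "p": default_p, "v": default_v})
    return padded


message = [{"s": "AAPL", "p": 133.1, "v": 10}]
rows = pad_missing(message)
print([row["s"] for row in rows])
```

Whatever arrives in a message, the result always has one row per tracked company, which is what the dashboard cards rely on.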

def calc_p(row):
    try:
        str_calc_s1 = str(row['s_x'])
        str_calc_s2 = str(row['s_y'])

        if str_calc_s1 == str_calc_s2:
            calc_p_val = float(row['p_y'])
        else:
            calc_p_val = float(row['p_x'])

        return calc_p_val
    except:
        return 0.0

def calc_v(row):
    try:
        str_calc_s1 = str(row['s_x'])
        str_calc_s2 = str(row['s_y'])

        if str_calc_s1 == str_calc_s2:
            calc_v_val = float(row['v_y'])
        else:
            calc_v_val = float(row['v_x'])

        return calc_v_val
    except:
        return 0.0

The above snippet will capture the default values for those missing records.
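To see the selection rule in isolation, here is a small sketch that folds calc_p & calc_v into one hypothetical helper, `pick_value`, & runs it on plain dictionaries instead of DataFrame rows. The sample rows are made up for illustration.

```python
def pick_value(row, field):
    """Mirror calc_p/calc_v: use the '_y' value when both symbols match."""
    try:
        if str(row["s_x"]) == str(row["s_y"]):
            return float(row[f"{field}_y"])
        return float(row[f"{field}_x"])
    except (KeyError, TypeError, ValueError):
        # Same fallback value as the original functions.
        return 0.0


matched = {"s_x": "AAPL", "s_y": "AAPL", "p_x": 0.01, "p_y": 133.1}
unmatched = {"s_x": "AMZN", "s_y": "AAPL", "p_x": 0.01, "p_y": 133.1}

print(pick_value(matched, "p"))
print(pick_value(unmatched, "p"))
print(pick_value({}, "p"))
```

The rows where the two symbols differ keep the value from the left-hand side of the merge, & a malformed row falls back to 0.0.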

    client = AblyRest(ably_id)
    channel = client.channels.get('sd_channel')

    message_page = channel.history()

In the above snippet, the application will consume the streaming data from the Ably queue.

    for i in message_page.items:
        print('Last Msg: {}'.format(i.data))
        json_data = json.loads(i.data)

        # Converting JSON to Dataframe
        df = p.json_normalize(json_data)
        df.columns = df.columns.map(lambda x: x.split(".")[-1])

        if cnt == 0:
            df_conv = df
        else:
            d_frames = [df_conv, df]
            df_conv = p.concat(d_frames)

        cnt += 1

The above snippet will convert the streaming messages to a more meaningful pandas data-frame, which we can use for a wide variety of analytics.
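The same accumulation can be sketched with the standard library only, which makes the column-name handling easier to see. The helper names & the sample history are hypothetical. Taking the last dotted segment also works when a name has no dot at all.

```python
import json


def short_name(column):
    """Keep only the last dotted segment, e.g. 'data.p' becomes 'p'."""
    return column.split(".")[-1]


def collect_rows(messages):
    """Flatten a list of JSON messages into one list of row dictionaries."""
    rows = []
    for raw in messages:
        for record in json.loads(raw):
            rows.append({short_name(key): value for key, value in record.items()})
    return rows


history = [
    '[{"s": "AAPL", "p": 133.1, "v": 10}]',
    '[{"data.s": "AMZN", "data.p": 3401.5, "data.v": 2}]',
]
rows = collect_rows(history)
print(rows[1])
```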

    # Converting dataframe to a desired Series
    f = CategoricalSeries(fin_df)

    for j in range(count_row):
        # Getting the series values from above
        cat, val, pc = f.next()

        # Getting Individual Element & convert them to Series
        if ((start_pos + interval) <= count_row):
            end_pos = start_pos + interval
        else:
            end_pos = start_pos + (count_row - start_pos)

        split_df = df_unq_finale.iloc[start_pos:end_pos]

        if ((start_pos > count_row) | (start_pos == count_row)):
            pass
        else:
            start_pos = start_pos + interval

        x_currency = str(split_df.iloc[0]['Company'])

        ####################################################
        ##### Debug Purpose                        #########
        ####################################################
        print('Company: ', x_currency)
        print('J: ', str(j))
        print('Cat: ', cat)
        ####################################################
        #####   End Of Debug                         #######
        ####################################################

        c = page.add(f'e{j+1}', ui.tall_series_stat_card(
            box=f'{j+1} 1 1 2',
            title=x_currency,
            value='=${{intl qux minimum_fraction_digits=2 maximum_fraction_digits=2}}',
            aux_value='={{intl quux style="percent" minimum_fraction_digits=1 maximum_fraction_digits=1}}',
            data=dict(qux=val, quux=pc),
            plot_type='area',
            plot_category='foo',
            plot_value='qux',
            plot_color=next_color(),
            plot_data=data('foo qux', -15),
            plot_zero_value=0,
            plot_curve=next_curve(),
        ))
        large_lines.append((f, c))

    page.save()

    while update_freq > 0:

        time.sleep(update_freq)

        for f, c in large_lines:
            cat, val, pc = f.next()

            print('Update Cat: ', cat)
            print('Update Val: ', val)
            print('Update pc: ', pc)
            print('*' * 160)

            c.data.qux = val
            c.data.quux = pc / 100
            c.plot_data[-1] = [cat, val]

        page.save()

The above snippet feeds the data into the H2O-Wave framework, which exposes it through an interactive, GUI-based dashboard.
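As far as I understand the H2O-Wave API, a negative size in `data('foo qux', -15)` asks for a circular buffer & assigning to index `-1` appends a new row to it. The sketch below is only an analogy using `collections.deque`; it does not call Wave, & the `make_buffer` helper is hypothetical.

```python
from collections import deque

WINDOW = 15  # same magnitude as the buffer size used in the card above


def make_buffer(size=WINDOW):
    """A fixed-length window: appending beyond `size` drops the oldest row."""
    return deque(maxlen=size)


buffer = make_buffer()
for i in range(1, 21):
    buffer.append((f"C{i}", float(i)))

print(len(buffer))
print(buffer[0], buffer[-1])
```

After twenty updates only the latest fifteen points remain, which is why the area plot on each card keeps scrolling instead of growing without bound.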


2. publish_ably_mod.py ( This native Python script will consume streaming data into Ably message Queue )


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 26-Jun-2021 ####
#### ####
#### Objective: This script will consume real-time ####
#### streaming data coming out from a hosted API ####
#### sources (Finnhub) using another popular third-party ####
#### service named Ably. Ably mimics pubsub Streaming ####
#### concept, which might be extremely useful for ####
#### any start-ups. ####
#### ####
###############################################################
from ably import AblyRest
import logging
import json

# generate random floating point values
from random import seed
from random import random

import websocket
from clsConfig import clsConfig as cf

# seed random number generator
seed(1)

# Global Section
logger = logging.getLogger('ably')
logger.addHandler(logging.StreamHandler())

ably_id = str(cf.config['ABLY_ID'])

ably = AblyRest(ably_id)
channel = ably.channels.get('sd_channel')
# End Of Global Section

def on_message(ws, message):
    print("*" * 60)
    res = json.loads(message)
    jsBody = res["data"]
    jdata_dyn = json.dumps(jsBody)
    print(jdata_dyn)

    # JSON data
    # This is the default data for all the identified category
    # we've prepared. You can extract this dynamically. Or, By
    # default you can set their base trade details.
    json_data = [{
        "c": "null",
        "p": 0.01,
        "s": "AAPL",
        "t": 1624715406407,
        "v": 0.01
    },{
        "c": "null",
        "p": 0.01,
        "s": "AMZN",
        "t": 1624715406408,
        "v": 0.01
    },{
        "c": "null",
        "p": 0.01,
        "s": "BINANCE:BTCUSDT",
        "t": 1624715406409,
        "v": 0.01
    },
    {
        "c": "null",
        "p": 0.01,
        "s": "IC MARKETS:1",
        "t": 1624715406410,
        "v": 0.01
    }]

    jdata = json.dumps(json_data)

    # Publish a message to the sd_channel channel
    channel.publish('event', jdata)

    # Publish rest of the messages to the sd_channel channel
    channel.publish('event', jdata_dyn)

    jsBody = []
    jdata_dyn = ''

def on_error(ws, error):
    print(error)

def on_close(ws):
    print("### closed ###")

def on_open(ws):
    # Invoking Individual Company Trade Queries
    ws.send('{"type":"subscribe","symbol":"AAPL"}')
    ws.send('{"type":"subscribe","symbol":"AMZN"}')
    ws.send('{"type":"subscribe","symbol":"BINANCE:BTCUSDT"}')
    ws.send('{"type":"subscribe","symbol":"IC MARKETS:1"}')

if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("wss://ws.finnhub.io?token=jfhfyr8474rpv6av0",
                              on_message = on_message,
                              on_error = on_error,
                              on_close = on_close)
    ws.on_open = on_open
    ws.run_forever()

The key snippet from the above script –

    json_data = [{
        "c": "null",
        "p": 0.01,
        "s": "AAPL",
        "t": 1624715406407,
        "v": 0.01
    },{
        "c": "null",
        "p": 0.01,
        "s": "AMZN",
        "t": 1624715406408,
        "v": 0.01
    },{
        "c": "null",
        "p": 0.01,
        "s": "BINANCE:BTCUSDT",
        "t": 1624715406409,
        "v": 0.01
    },
        {
        "c": "null",
        "p": 0.01,
        "s": "IC MARKETS:1",
        "t": 1624715406410,
        "v": 0.01
        }]

As we already discussed, we’ll pass a default set of data for all the candidate companies.

    # Publish a message to the sd_channel channel
    channel.publish('event', jdata)

    # Publish rest of the messages to the sd_channel channel
    channel.publish('event', jdata_dyn)

Publish the messages to the created channel.
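Note that on_message reads `res["data"]` directly, so any message without that key would raise a `KeyError`. A defensive variant is sketched below, assuming such control messages (for example a ping) can arrive on the socket; that is my assumption, as the post does not show one. The `extract_trades` helper is hypothetical.

```python
import json


def extract_trades(message):
    """Return the list under 'data', or an empty list when it is absent."""
    try:
        payload = json.loads(message)
    except json.JSONDecodeError:
        return []
    if not isinstance(payload, dict):
        return []
    trades = payload.get("data", [])
    return trades if isinstance(trades, list) else []


print(extract_trades('{"type": "trade", "data": [{"s": "AAPL", "p": 133.1}]}'))
print(extract_trades('{"type": "ping"}'))
```

With a guard like this, the publisher could skip the `channel.publish` call whenever the returned list is empty.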

def on_open(ws):
    # Invoking Individual Company Trade Queries
    ws.send('{"type":"subscribe","symbol":"AAPL"}')
    ws.send('{"type":"subscribe","symbol":"AMZN"}')
    ws.send('{"type":"subscribe","symbol":"BINANCE:BTCUSDT"}')
    ws.send('{"type":"subscribe","symbol":"IC MARKETS:1"}')

if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("wss://ws.finnhub.io?token=hdhdjdj9494ld934v6av0",
                              on_message = on_message,
                              on_error = on_error,
                              on_close = on_close)

Send the company-specific trade subscriptions to FinnHub through the websocket app.
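If the list of companies grows, the subscription strings can be built from a list rather than written by hand. This is a minimal sketch; `subscribe_message`, `subscribe_all` & the `FakeSocket` class are hypothetical helpers, & `FakeSocket` only records what would be sent so the example runs without a network connection.

```python
import json

SYMBOLS = ["AAPL", "AMZN", "BINANCE:BTCUSDT", "IC MARKETS:1"]


def subscribe_message(symbol):
    """Build the same compact JSON string that on_open sends by hand."""
    return json.dumps({"type": "subscribe", "symbol": symbol}, separators=(",", ":"))


def subscribe_all(ws, symbols=SYMBOLS):
    """Send one subscribe message per symbol over the given socket."""
    for symbol in symbols:
        ws.send(subscribe_message(symbol))


class FakeSocket:
    """Hypothetical stand-in that records what would be sent."""

    def __init__(self):
        self.sent = []

    def send(self, text):
        self.sent.append(text)


ws = FakeSocket()
subscribe_all(ws)
print(ws.sent[0])
```

Inside the real script, on_open could call `subscribe_all(ws)` with the websocket object it receives.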

3. clsConfig.py ( This file contains the configuration details. )


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning. Application will ####
#### process these information & perform ####
#### various analysis on Linear-Regression. ####
################################################
import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'FILE_NAME': Curr_Path + sep + 'Data' + sep + 'TradeIn.csv',
        'SRC_PATH': Curr_Path + sep + 'Data' + sep,
        'APP_DESC_1': 'H2O Wave Integration with FinHubb!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SUBDIR' : 'data',
        'ABLY_ID': 'WWP309489.93jfkT:32kkdhdJjdued79e'
    }
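As an optional variation, the paths can be built with `os.path.join` & the Ably key can be read from the environment instead of being stored in the file. This is a sketch under my own assumptions: the `load_config` function & the `ABLY_API_KEY` variable name are hypothetical & not part of the original codebase.

```python
import os


def load_config(base_dir, environ=os.environ):
    """Build a config dictionary, reading the Ably key from the environment."""
    return {
        # A trailing empty component makes join() end the path with a separator.
        "LOG_PATH": os.path.join(base_dir, "log", ""),
        "SRC_PATH": os.path.join(base_dir, "Data", ""),
        "INIT_PATH": base_dir,
        "SUBDIR": "data",
        # ABLY_API_KEY is a hypothetical variable name chosen for this sketch.
        "ABLY_ID": environ.get("ABLY_API_KEY", ""),
    }


config = load_config(os.path.join("tmp", "app"), {"ABLY_API_KEY": "demo-key"})
print(config["LOG_PATH"])
```

This keeps the key out of version control & removes the need for the manual Windows/non-Windows separator check.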



Let’s explore the directory structure –

MAC Directory

Let’s run the application –

Step 1:

Starting of Wave Server

Step 2:

Triggering message consumption job

Step 3:

Triggering the main application

You can monitor the message consumption from your Ably portal as follows –

Message Consumption

If you want to know more detail, then you need to scroll down the page, where you will get this additional information –

Message spike during consumption

And, the final output in the interactive dashboard will look like the below screenshot –

Interactive Real-time Dashboard

So, we’ve done it.

You can find the complete codebase at the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only.

Azure-API calls from python-based OCI function through the oracle API-Gateway.

Today, I’ll be discussing Oracle Cloud Function interaction with Azure-API through Oracle API Gateway using native python. Again, I want to touch on this subject as I didn’t find lots of relevant material using python over the net.

Let’s explore our use case. For this use case, I’ll use an old Azure-API that I developed in early 2019 & shared here at that time.

Now, we need to prepare our environment in Oracle-cloud.

Step 1:

We need to configure the virtual network as shown in the below collage picture, which depicts the step-by-step process to create it. For security reasons, I’ve masked sensitive information. You will need to capture these values from your own cloud portal.

VCN creation process

Make sure you choose the correct options & validate at the end, as shown in the below picture.

VCN Creation – Final Step

If all the information provided is correct, then you should see the following screen.

VCN Creation – Final Screen

Step 2:

Now, we need to create an application. As per OCI guidelines, one cannot generate any function or group of functions without a container, known as an application.

Creation of Application

From the above collage pic, you can see how we create the application by providing all the necessary inputs.

Step 3:

Now, you need to create the registry as shown below –

Creation of Registry

Your function-container will stay inside it after deployment. To know more about this, click the following link.

Step 4:

If you haven’t generated the auth-token already, then this is the time to generate it as shown below –

Generation of Auth-Token

Step 5:

This next piece of information is crucial & you will need it on many occasions.

Object storage namespace

Just keep this information handy. I’ll refer to this step whenever we need it. You can get the details here.

Step 6:

Let’s create the gateway now. Please refer to the following collage pics, showing the step-by-step process.

Creation of Gateway

Make sure you have validated it before you proceed to the next step.

Step 7:

Let’s create the function under the application. I find this GUI option relatively easier than configuring the function locally & then pushing it to OCI. Let’s follow the process shown in the collage of pics mentioned here –

Creation of Function

So, you need to execute a series of commands, as shown above. And, the good thing is that the majority of the critical commands are automatically generated for you. So, you don’t need to spend lots of time finding out this information.

Here, we’ll be executing a series of commands as shown below –

Creation of function – continue

A few necessary commands that I want to discuss here –

fn init --runtime python <function-name>

This command will create a template of scripts based on your supplied language. You need to modify the main script (func.py) later, with your appropriate logic. You can add other scripts as classes & refer to those classes inside your func.py as well.
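The exact contents of the generated func.py depend on your Fn version, so I won’t reproduce the template here. The sketch below only shows the kind of helper that the handler inside func.py could call with the raw request body. The `build_response_body` name & the greeting logic are hypothetical.

```python
import json


def build_response_body(raw_body):
    """Parse the raw request body & return a JSON string for the caller."""
    try:
        payload = json.loads(raw_body or b"{}")
    except (json.JSONDecodeError, UnicodeDecodeError):
        payload = {}
    # Fall back to a default when the body is empty or not a JSON object.
    name = payload.get("name", "World") if isinstance(payload, dict) else "World"
    return json.dumps({"message": f"Hello {name}"})


print(build_response_body(b'{"name": "OCI"}'))
print(build_response_body(b""))
```

Keeping the logic in a plain function like this makes it easy to test locally before you deploy the function.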

For a better deployment & control environment, it is always wise to create a virtual env.

Just like the Azure function, you need to update your requirements.txt file before your deployment command.

pip freeze>requirements.txt

Once we are satisfied with our development; we’ll deploy the application as shown below –

Deployment of function

Again, a few relevant commands that I want to discuss here –

fn -v deploy --app <Application-Name>

This command will deploy all the Oracle functions that have any changes & push them to OCI. During this time, it will check all the dependent packages that you are using & try to install them one by one.

If you have already deployed & you want to upgrade your logic, then the deployment option will show something like this –

Deployment of function – continue

All the commands are pretty standard & marked with a red-square box. A few necessary commands to discuss –

fn invoke <Application-Name> <Function-Name>

If you are not using any external API, the above command should ideally return the output with the default value. But, in our case, we have used an Azure-API, which is outside OCI. Hence, we need to update a few more settings before it works.

Unlike Azure-Function, where you get the link by default when running locally from the Visual Studio Code editor, you won’t get the link by default here.

Here, you need to execute the following commands as shown in the above picture –

fn inspect function <Application-Name> <Function-Name>

If your deployment is successful, you will see your function docker-image inside your registry as shown below –

Deployment image of functions

To know more about fn-commands, click the following link.

Step 8:

Now, you need to update some policies, which will help API-Gateway to work.

Update of policy & logging feature

Also, you need to configure your default log for your function, as shown above.

Apart from that, we need to whitelist the port 443 as shown below –

Port whitelisting in VCN

Finally, we need to deploy our existing function into the Oracle API Gateway. You need to prepare a deployable JSON object, which will create a channel for the function to interact through the API-gateway deployment.

Deployment of function inside API-Gateway

The deployment JSON file should look something like this –

spec.json


{
"routes": [
{
"path": "/getdata",
"methods": [
"GET","POST"
],
"backend": {
"type": "ORACLE_FUNCTIONS_BACKEND",
"functionId": "ocid1.fnfunc.oc1.us-sanjose-1.aaaaxxxxxxxjdjfjfjfjfjfjfjfjfjfjfjfjfjfjfjdsssssss2dfjdfjdjd33376dq"
}
}
]
}


You will get more on this from this link.

Make sure that your path prefix is unique, as shown in the above picture. And, if you want to know the complete steps to prepare your Oracle function, you need to go through this master link.
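If you would rather generate the deployment JSON than type it by hand, a minimal Python sketch could look like the one below. The helper name build_gateway_spec & the OCID value are hypothetical placeholders; only the keys are taken from the spec.json shown above.

```python
import json


def build_gateway_spec(function_ocid, path="/getdata", methods=("GET", "POST")):
    """Return a dict with the same shape as the spec.json shown above."""
    return {
        "routes": [
            {
                "path": path,
                "methods": list(methods),
                "backend": {
                    "type": "ORACLE_FUNCTIONS_BACKEND",
                    "functionId": function_ocid,
                },
            }
        ]
    }


# Placeholder OCID (an assumption); replace it with the OCID of your own function.
spec = build_gateway_spec("ocid1.fnfunc.oc1.<region>.<unique-id>")
print(json.dumps(spec, indent=2))
```

Generating the file this way keeps the route definition in one place if you later add more routes to the same deployment.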

Now, we’re ready to test the application. But, before that, we want to explore the code-base.


Let us explore the codebase now.

1. clsConfig.py ( This is the configuration file for this demo-application)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 04-Apr-2020 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Azure 2 OCI API. Application will ####
#### process these information & perform ####
#### the call to our newly developed Azure ####
#### API in OCI. ####
###############################################
import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        "comp": "ocid1.compartment.oc1..xxxxxxxxxxxxxxxxxxxxxxxxyyyyyyyyyyyyyyyyyyxxxxxx",
        "URL":"https://xxxxxxxxxx.yyyyyyyyyyyyyyyy.net/api/getDynamicCovidStats",
        "appType":"application/json",
        "conType":"keep-alive",
        "limRec":10,
        "CACHE":"no-cache",
        "colList": "date, state, positive, negative",
        "typSel": "Cols",
        "LOG_PATH":Curr_Path + sep + 'log' + sep,
        "STREAM_NAME":"Covid19-Stream",
        "PARTITIONS":1
    }


2. clsAzureAPI.py ( This is the modified version of the old AzureAPI class. We’ve added a new logger, which works inside OCI. No other changes in the main logic. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 07-Mar-2021 ####
#### Modified On 07-Mar-2021 ####
#### ####
#### Objective: Calling Azure dynamic API ####
##############################################
import json
from clsConfig import clsConfig as cf
import requests
import logging

class clsAzureAPI:
    def __init__(self):
        self.url = cf.conf['URL']
        self.azure_cache = cf.conf['CACHE']
        self.azure_con = cf.conf['conType']
        self.type = cf.conf['appType']
        self.typSel = cf.conf['typSel']
        self.typVal = cf.conf['colList']

    def searchQry(self):
        try:
            url = self.url
            api_cache = self.azure_cache
            api_con = self.azure_con
            type = self.type
            typSel = self.typSel
            typVal = self.typVal

            querystring = {"typeSel": typSel, "typeVal": typVal}

            strMsg = 'Input JSON: ' + str(querystring)
            logging.getLogger().info(strMsg)

            headers = {
                'content-type': type,
                'Cache-Control': api_cache,
                'Connection': api_con
            }

            response = requests.request("GET", url, headers=headers, params=querystring)

            ResJson = response.text

            jdata = json.dumps(ResJson)
            ResJson = json.loads(jdata)

            return ResJson

        except Exception as e:
            ResJson = ''
            x = str(e)
            print(x)

            logging.info(x)
            ResJson = {'errorDetails': x}

            return ResJson


3. func.py ( Main calling script. This one is auto-generated by OCI while creating the function. We’ve modified it as per our logic. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 20-Mar-2021 ####
#### Modified On 20-Mar-2021 ####
#### ####
#### Objective: Calling Azure dynamic API ####
##############################################
import io
import json
import logging

from fdk import response

import clsAzureAPI as ca

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

def handler(ctx, data: io.BytesIO = None):
    try:
        email = "default@gmail.com"

        # Checking individual elements
        try:
            body = json.loads(data.getvalue())
            email = body.get("email")
        except (Exception, ValueError) as ex:
            logging.getLogger().info('error parsing json payload: ' + str(ex))

        logging.getLogger().info("Calling Oracle Python getCovidData function!")

        # Create the instance of the Mock Mulesoft API Class
        x1 = ca.clsAzureAPI()

        # Let's pass this to our map section
        retJson = x1.searchQry()

        # Converting JSon to Pandas Dataframe for better readability
        # Capturing the JSON Payload
        resJson = json.loads(retJson)

        return response.Response(
            ctx, response_data=json.dumps(
                {"status":"Success", "message": resJson}),
            headers={"Content-Type": "application/json"}
        )
    except Exception as e:
        x = str(e)

        return response.Response(
            ctx, response_data=json.dumps(
                {"status":"Failed", "message": x}),
            headers={"Content-Type": "application/json"}
        )


Key snippet that we want to discuss here –

        # Checking individual elements
        try:
            body = json.loads(data.getvalue())
            email = body.get("email")
        except (Exception, ValueError) as ex:
            logging.getLogger().info('error parsing json payload: ' + str(ex))

Checking the individual element in the input payload.
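The same check can be tried outside OCI with a plain BytesIO object. The sketch below is a small variation, not the deployed code: the helper name read_email is hypothetical, & unlike the handler above it keeps the default value when the "email" key is missing.

```python
import io
import json
import logging


def read_email(data, default="default@gmail.com"):
    """Return the "email" field of a JSON payload, or the default value."""
    try:
        body = json.loads(data.getvalue())
        # Fall back to the default when the key is missing or empty.
        return body.get("email") or default
    except (AttributeError, ValueError) as ex:
        # AttributeError: data is None or the body is not a JSON object.
        # ValueError: the payload is empty or not valid JSON.
        logging.getLogger().info("error parsing json payload: " + str(ex))
        return default


print(read_email(io.BytesIO(b'{"email": "someone@example.com"}')))
print(read_email(io.BytesIO(b"")))
```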

        # Create the instance of the Mock Mulesoft API Class
        x1 = ca.clsAzureAPI()

        # Let's pass this to our map section
        retJson = x1.searchQry()

        # Converting JSon to Pandas Dataframe for better readability
        # Capturing the JSON Payload
        resJson = json.loads(retJson)

Now, we’re calling the azure-API class & receiving the response into a JSON variable.
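One detail is worth seeing in isolation. Inside searchQry, json.dumps followed by json.loads on the response text gives back the very same string, so the actual parsing happens in the json.loads(retJson) call shown above. Here is a small sketch with a made-up sample payload:

```python
import json

# Made-up sample of the text body that the API could return.
raw_text = '[{"date": 20210207, "state": "AK", "positive": 53279.0}]'

# dumps() wraps the text into a JSON string literal & loads() unwraps it again.
round_trip = json.loads(json.dumps(raw_text))
print(type(round_trip).__name__)   # str

# A second loads() is what turns the text into Python objects.
records = json.loads(round_trip)
print(records[0]["state"])         # AK
```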

        return response.Response(
            ctx, response_data=json.dumps(
                {"status":"Success", "message": resJson}),
            headers={"Content-Type": "application/json"}
        )

Sending final response to the client.

4. func.yaml ( Main configuration script. This one is auto-generated by OCI while creating the function. )


schema_version: 20180708
name: getcoviddata
version: 0.0.1
runtime: python
entrypoint: /python/bin/fdk /function/func.py handler
memory: 256



Let’s run it from postman –

Invoking OCI-Function from Postman

During this demo, I’ve realized that Oracle Functions is yet to mature compared to AWS Lambda or Azure Functions when using Python. I faced almost the same challenges that I faced nearly two years back when I tried to implement an Azure function using Python. However, I’m optimistic that Oracle Cloud Functions will mature & offer an integrated GUI environment to deploy Python-based components straight from the IDE, rather than through a CLI-driven approach. Correct me if I missed an IDE that supports this feature.


You can explore my Git associated with this project & download the code from here.

So, finally, we’ve done it. 😀

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only. Also, I’ve used the template SDK provided by Oracle & customized it to satisfy our business cases.

Creating a dynamic response of an API/Microservice

Hello Guys!

Today, I’m going to discuss a potential use case where, on many occasions, different teams need similar, but not identical, data through an API. Creating a fresh API/Microservice for each of them, after following up through many processes, will take significant time.

What if we could create an API in such a way that we get the response dynamically without needing to build another one? In this post, we’ll be demonstrating such an approach.

I’ll be using an open-source Covid-API, which will be useful for several posts starting from this one.

You will get plenty of useful data from here.

We’ve chosen the following one for our use case –

API-Reference

Let’s explore the sample data first.

[
   {
      "date":20210207,
      "state":"AK",
      "positive":53279.0,
      "probableCases":null,
      "negative":null,
      "pending":null,
      "totalTestResultsSource":"totalTestsViral",
      "totalTestResults":1536911.0,
      "hospitalizedCurrently":44.0,
      "hospitalizedCumulative":1219.0,
      "inIcuCurrently":null,
      "inIcuCumulative":null,
      "onVentilatorCurrently":11.0,
      "onVentilatorCumulative":null,
      "recovered":null,
      "dataQualityGrade":"A",
      "lastUpdateEt":"2\/5\/2021 03:59",
      "dateModified":"2021-02-05T03:59:00Z",
      "checkTimeEt":"02\/04 22:59",
      "death":279.0,
      "hospitalized":1219.0,
      "dateChecked":"2021-02-05T03:59:00Z",
      "totalTestsViral":1536911.0,
      "positiveTestsViral":64404.0,
      "negativeTestsViral":1470760.0,
      "positiveCasesViral":null,
      "deathConfirmed":null,
      "deathProbable":null,
      "totalTestEncountersViral":null,
      "totalTestsPeopleViral":null,
      "totalTestsAntibody":null,
      "positiveTestsAntibody":null,
      "negativeTestsAntibody":null,
      "totalTestsPeopleAntibody":null,
      "positiveTestsPeopleAntibody":null,
      "negativeTestsPeopleAntibody":null,
      "totalTestsPeopleAntigen":null,
      "positiveTestsPeopleAntigen":null,
      "totalTestsAntigen":null,
      "positiveTestsAntigen":null,
      "fips":"02",
      "positiveIncrease":0,
      "negativeIncrease":0,
      "total":53279,
      "totalTestResultsIncrease":0,
      "posNeg":53279,
      "deathIncrease":0,
      "hospitalizedIncrease":0,
      "hash":"07a5d43f958541e9cdabb5ea34c8fb481835e130",
      "commercialScore":0,
      "negativeRegularScore":0,
      "negativeScore":0,
      "positiveScore":0,
      "score":0,
      "grade":""
   }
]

Let’s take two cases. In the first one, a service might need to access all the elements. In the second one, some other service requires only specific details.

Let’s explore the code base first –

  1. init.py ( This is the native Python-based Azure function that will consume the streaming data & produce the dynamic API response. )
###########################################
#### Written By: SATYAKI DE            ####
#### Written On: 06-Feb-2021           ####
#### Package Flask package needs to    ####
#### install in order to run this      ####
#### script.                           ####
####                                   ####
#### Objective: Main Calling scripts.  ####
####                                   ####
#### However, to meet the functionality####
#### we've enhanced as per our logic.  ####
###########################################

import logging
import json
import requests
import os
import pandas as p
import numpy as np

import azure.functions as func


def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Dynamic-Covid-Status HTTP trigger function processed a request.')

    try:

        # Application Variable
        url = os.environ['URL']
        appType = os.environ['appType']
        conType = os.environ['conType']

        # API-Configuration
        payload={}
        headers = {
            "Connection": conType,
            "Content-Type": appType
        }

        # Validating input parameters
        typeSel = req.params.get('typeSel')
        if not typeSel:
            try:
                req_body = req.get_json()
            except ValueError:
                pass
            else:
                typeSel = req_body.get('typeSel')
        
        typeVal = req.params.get('typeVal')
        if not typeVal:
            try:
                req_body = req.get_json()
            except ValueError:
                pass
            else:
                typeVal = req_body.get('typeVal')

        # Printing Key-Element Values
        str1 = 'typeSel: ' + str(typeSel)
        logging.info(str1)

        str2 = 'typeVal: ' + str(typeVal)
        logging.info(str2)

        # End of API-Inputs

        # Getting Covid data from the REST-API
        response = requests.request("GET", url, headers=headers, data=payload)
        ResJson  = response.text

        if typeSel == '*':
            if typeVal != '':
                # Converting it to Json
                jdata = json.loads(ResJson)

                df_ret = p.io.json.json_normalize(jdata)
                df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

                rJson = df_ret.to_json(orient ='records') 

                return func.HttpResponse(rJson, status_code=200)
            else:
                x_stat = 'Failed'
                x_msg = 'Important information is missing for all values!'

                rJson = {
                    "status": x_stat,
                    "details": x_msg
                }

                xval = json.dumps(rJson)
                return func.HttpResponse(xval, status_code=200)
        elif typeSel == 'Cols':
            if typeVal != '':
                # Converting it to Json
                jdata = json.loads(ResJson)

                df_ret = p.io.json.json_normalize(jdata)
                df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

                # Fetching for the selected columns
                # Extracting the columns from the list
                lstHead = []

                listX = typeVal.split (",")

                for i in listX:
                    lstHead.append(str(i).strip())

                str3 = 'Main List: ' + str(lstHead)
                logging.info(str3)

                slice_df = df_ret[np.intersect1d(df_ret.columns, lstHead)]
                rJson = slice_df.to_json(orient ='records') 
                
                return func.HttpResponse(rJson, status_code=200)
            else:
                x_stat = 'Failed'
                x_msg = 'Important information is missing for selected values!'

                rJson = {
                    "status": x_stat,
                    "details": x_msg
                }

                xval = json.dumps(rJson)
                return func.HttpResponse(xval, status_code=200)
        else:
            x_stat = 'Failed'
            x_msg = 'Important information is missing for typeSel!'

            rJson = {
                "status": x_stat,
                "details": x_msg
            }

            xval = json.dumps(rJson)
            return func.HttpResponse(xval, status_code=200)
    except Exception as e:
        x_msg = str(e)
        x_stat = 'Failed'

        rJson = {
                    "status": x_stat,
                    "details": x_msg
                }

        xval = json.dumps(rJson)
        return func.HttpResponse(xval, status_code=200)

And, inside the Azure portal, it looks like –

Dynamic Function inside the Azure portal

Let’s explain the key snippet –

jdata = json.loads(ResJson)

df_ret = p.io.json.json_normalize(jdata)
df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

rJson = df_ret.to_json(orient ='records') 

return func.HttpResponse(rJson, status_code=200)

In the above lines, we’re converting the response & organizing it to a pandas dataframe before converting the response to JSON.

# Fetching for the selected columns
# Extracting the columns from the list
lstHead = []

listX = typeVal.split (",")

for i in listX:
    lstHead.append(str(i).strip())

str3 = 'Main List: ' + str(lstHead)
logging.info(str3)

#slice_df = df_ret[df_ret.columns.intersection(lstHead)]
slice_df = df_ret[np.intersect1d(df_ret.columns, lstHead)]

For the second case, the above additional logic plays a significant part. Based on the column names supplied in the typeVal attribute, the response will contain only those columns.
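To make the effect of this selection concrete, here is a small pure-Python sketch of the same idea, without pandas or NumPy. The helper select_columns is hypothetical. It mirrors two properties of np.intersect1d: the common names come back sorted & de-duplicated, & requested names that do not exist in the data are silently dropped.

```python
def select_columns(records, type_val):
    """Keep only the requested keys from each record.

    records  : list of dicts (one dict per row)
    type_val : comma-separated column names, e.g. "date, state, positive"
    """
    requested = {name.strip() for name in type_val.split(",")}
    available = set().union(*(row.keys() for row in records)) if records else set()
    # Sorted intersection, similar to what np.intersect1d would return.
    columns = sorted(available & requested)
    return [{col: row.get(col) for col in columns} for row in records]


sample = [{"date": 20210207, "state": "AK", "positive": 53279.0, "negative": None}]
print(select_columns(sample, "state, date, unknown"))
# [{'date': 20210207, 'state': 'AK'}]
```

Note that the output order follows the sorted column names, not the order in which the caller listed them.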

Let’s see how it looks –

Azure function in Visual Studio Code
Let’s test it using Postman –

Case 1 (For all the columns):

For all elements

And, the formatted output is as follows –

Formatted output for all elements

Case 2 (For selected columns):

For selected elements
And, the formatted output is as follows –
Formatted output of Selected element case
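If you prefer Python over Postman, the two requests can be prepared as in the sketch below. The base URL is a placeholder, the helper build_url is hypothetical, & the value "all" for Case 1 is an assumption (the function only checks that typeVal is not an empty string). The sketch builds & inspects the URLs without sending anything.

```python
from urllib.parse import parse_qs, urlencode, urlparse

# Placeholder endpoint; use the URL of your own deployed function.
BASE_URL = "https://xxxxxxxxxx.yyyyyyyyyyyyyyyy.net/api/getDynamicCovidStats"


def build_url(type_sel, type_val):
    """Return the request URL carrying the typeSel & typeVal parameters."""
    return BASE_URL + "?" + urlencode({"typeSel": type_sel, "typeVal": type_val})


# Case 1: all the columns
url_all = build_url("*", "all")
# Case 2: selected columns only
url_cols = build_url("Cols", "date, state, positive, negative")

for url in (url_all, url_cols):
    print(url)
    # Decode the query string again to see what the function would receive.
    print(parse_qs(urlparse(url).query))
```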

You can find the code on GitHub using the following link.


So, finally, we have done it.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only.

Performance improvement of Python application programming

Hello guys,

Today, I’ll be demonstrating a short but significant topic. It is widely reported that, on many occasions, Python is relatively slower than other popular programming languages like C++, Java, or even the latest version of PHP.

I found a relatively old post with a comparison shown between Python and the other popular languages. You can find the details at this link.

However, I haven’t verified the outcome. So, I can’t comment on the final statistics provided on that link.

My purpose is to find cases where I can apply certain tricks to improve performance drastically.

One preferable option would be the use of Cython. It sits in the middle ground between C & Python & brings out the best of both worlds.

The other option would be the use of GPU for vector computations. That would drastically increase the processing power. Today, we’ll be exploring this option.

Let’s find out what we need to prepare in our environment before we try this out.

Step – 1 (Installing dependent packages):

pip install pyopencl
pip install plaidml-keras

So, we will be taking advantage of the Keras package to use our GPU. And, the screen should look like this –

Installation Process of Python-based Packages

Once we’ve installed the packages, we’ll configure the package as shown on the next screen.

Configuration of Packages

For our case, we need to install pandas, as we’ll be using numpy, which comes with it by default.

Installation of supplemental packages

Let’s explore our standard snippet to test this use case.

Case 1 (Normal computational code in Python):

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 18-Jan-2020              ####
####                                      ####
#### Objective: Main calling scripts for  ####
#### normal execution.                    ####
##############################################

import numpy as np
from timeit import default_timer as timer

def pow(a, b, c):
    for i in range(a.size):
         c[i] = a[i] ** b[i]

def main():
    vec_size = 100000000

    a = b = np.array(np.random.sample(vec_size), dtype=np.float32)
    c = np.zeros(vec_size, dtype=np.float32)

    start = timer()
    pow(a, b, c)
    duration = timer() - start

    print(duration)

if __name__ == '__main__':
    main()

Case 2 (GPU-based computational code in Python):

#################################################
#### Written By: SATYAKI DE                  ####
#### Written On: 18-Jan-2020                 ####
####                                         ####
#### Objective: Main calling scripts for     ####
#### use of GPU to speed-up the performance. ####
#################################################

import numpy as np
from timeit import default_timer as timer

# Adding GPU Instance
from os import environ
environ["KERAS_BACKEND"] = "plaidml.keras.backend"

def pow(a, b):
    return a ** b

def main():
    vec_size = 100000000

    a = b = np.array(np.random.sample(vec_size), dtype=np.float32)
    c = np.zeros(vec_size, dtype=np.float32)

    start = timer()
    c = pow(a, b)
    duration = timer() - start

    print(duration)

if __name__ == '__main__':
    main()

And, here comes the output for your comparisons –

Case 1 Vs Case 2:

Performance Comparisons

As you can see, there is a significant improvement that we can achieve using this. However, it has limited scope. You won’t get the benefits everywhere. Until Python itself improves on the performance side, you had better explore either of the two options that I’ve discussed here (I didn’t say much about Cython here. Maybe some other day.).
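One thing to keep in mind while reading these numbers: Case 2, as written, never imports Keras, so the a ** b expression is evaluated by NumPy itself as a single vectorized operation. The sketch below isolates that effect with a much smaller vector. The function names are illustrative & the timings will vary from machine to machine.

```python
import numpy as np
from timeit import default_timer as timer


def pow_loop(a, b):
    """Element-by-element power, one Python-level iteration per value."""
    c = np.zeros(a.size, dtype=np.float32)
    for i in range(a.size):
        c[i] = a[i] ** b[i]
    return c


def pow_vectorized(a, b):
    """Same computation as a single NumPy array expression."""
    return a ** b


vec_size = 100_000  # far smaller than in the post, to keep the loop quick
a = np.random.sample(vec_size).astype(np.float32)
b = np.random.sample(vec_size).astype(np.float32)

start = timer()
c_loop = pow_loop(a, b)
loop_time = timer() - start

start = timer()
c_vec = pow_vectorized(a, b)
vec_time = timer() - start

print(f"loop: {loop_time:.4f}s, vectorized: {vec_time:.4f}s")
print("same result:", np.allclose(c_loop, c_vec))
```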

To get the codebase, you can refer to the following GitHub link.


So, finally, we have done it.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only.

Creating a mock API using Mulesoft RAML & testing it using Python

Hi Guys,

Today, I’ll be using a popular tool known as Mulesoft to generate a mock API & then we’ll be testing the same using Python. Mulesoft is an excellent tool to rapidly develop APIs & it can also integrate multiple cloud environments as an integration platform. You can use their Anypoint platform to quickly design such APIs for your organization. You can find the details in the following link. However, considering the cost, many organizations have to devise their own product or tool to do the same. That’s where developing one in Python, Node.js, or C# fits well, depending on the cloud platform.

Before we start, let us quickly understand what a mock API is.

A mock API server imitates a real API server by providing realistic responses to requests. They can be on your local machine or the public Internet. Responses can be static or dynamic, and simulate the data the real API would return, matching the schema with data types, objects, and arrays.

And why do we need that?

A mock API server is useful during development and testing when live data is either unavailable or unreliable. While designing an API, you can use mock APIs to work concurrently on the front and back-end, as well as to gather feedback from developers. Our mock API server guide for testing covers how you can use a mock API server so the absence of a real API doesn’t hold you back.

Often with internal projects, the API consumer (such as a front end developer through REST APIs) moves faster than the backend team building the API. This API mocking guide shows how a mock API server allows developers to consume a working API with the same interface as the eventual production API. As an added benefit, the backend team can discover where the mock API doesn’t meet the developer’s needs without spending developer time on features that may be removed or changed. This fast feedback loop can make engineering teams much more efficient.

If you need more information on this topic, you can refer to the following link.
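To make the idea concrete, here is a minimal sketch of a local mock API server built only with the Python standard library. Everything in it (the handler class, the /validate path, the canned response & the helper call_mock) is illustrative & not tied to any specific product. The sample fields are borrowed from the phone-validation example used later in this post.

```python
import json
import threading
import urllib.error
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse

# Canned (static) response that the mock returns for a valid request.
CANNED = {"valid": True, "number": "17579758240", "country_code": "US", "line_type": "mobile"}


class MockHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        parsed = urlparse(self.path)
        params = parse_qs(parsed.query)
        if parsed.path == "/validate" and "number" in params:
            status, body = 200, CANNED
        else:
            status, body = 400, {"message": "Missing number parameter!"}
        payload = json.dumps(body).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, format, *args):
        pass  # keep the console quiet


def call_mock(path):
    """Start the mock on a free local port, send one GET request & stop it."""
    server = HTTPServer(("127.0.0.1", 0), MockHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    try:
        url = f"http://127.0.0.1:{server.server_port}{path}"
        try:
            with urllib.request.urlopen(url) as resp:
                return resp.status, json.loads(resp.read())
        except urllib.error.HTTPError as err:
            # urllib raises for 4xx/5xx, but the JSON body is still readable.
            return err.code, json.loads(err.read())
    finally:
        server.shutdown()
        server.server_close()


print(call_mock("/validate?number=17579758240"))
print(call_mock("/validate"))
```

The client code never needs to know that the data is canned, which is exactly what lets the front-end & back-end teams work in parallel.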

Great! Now that we have some background on mock APIs, let’s explore how Mulesoft can help us here.

Mulesoft uses the “RESTful API Modeling Language (RAML)”. We’ll be using this language to develop our mock API. To know more about this, you can view the following link.

Under the developer section, you can find Tutorials as shown in the screenshot given below –

18. Type Of RAML

You can select any of the categories & learn basic scripting from it.

Now, let’s take a look at the process of creating a Mulesoft free account to test our theories.

Step 1:

Click the following link, and you will see the page as shown below –

0.1. Mulesoft Landing Page

Step 2:

Now, click the login shown in the RED square. You will see the following page –

0.2. Mulesoft Sign-Up Option

Step 3:

Please provide your credentials if you already have an account. Else, you have to click “Sign-Up” & then provide a few details as shown below –

1. Mulesoft Registration

Step 4:

Once you successfully create the account, you will see the following page –

2. Mulesoft Interface

So, now we are set. To design an API, you will need to click the design center as marked within the white square.

Once you click the “Start designing” button, you will land on the next screen.

21. Creating a Projects

As shown above, you need to click “Create new” for a fresh API design.

This will take you to the next screen –

22. Creating a Projects - Continue

Now, you need to click “Create API specification”, as marked in the RED square box. And, that will take you to the following screen –

23. Creating a Projects - Continue

You have to provide a meaningful name for your API & you can choose either the Text or the Visual editor. For this task, we’ll be selecting the Text Editor. And we’ll select RAML 1.0 as our preferred language. Once we provide all the relevant information, the “Create Specification” button marked in Green will be activated. Then you need to click it. It will lead you to the next screen –

24. CodeSpace

Since we’ll be preparing this for mock API, we need to activate that by clicking the toggle button marked in the GREEN square box on the top-right side. And, this will generate an automated baseUri script as shown below –

25. CodeSpace - Continue

Now, we’re ready to develop our RAML code for the mock API. Let’s look into the RAML code.

1. phonevalisd.raml (This is the mock API script, which responds to an API request by returning a mock JSON if the success conditions are met.)

#%RAML 1.0
# Created By - Satyaki De
# Date: 01-Mar-2020
# Description: This is an Mock API

baseUri: https://anypoint.mulesoft.com/mocking/api/v1/links/09KK0pos-1080-4049-9e04-a093456a64a8/ # 
title: PhoneVSD
securitySchemes:
  basic :
    type: Basic Authentication
    displayName: Satyaki's Basic Authentication
    description: API Only works with the basic authentication
protocols:
  - HTTP
description: This is a REST API Json base service to verify any phone numbers.
documentation:
  - title: PHONE VERIFY API
    content: This is a Mock API, which will simulate the activity of a Phone Validation API.
types:
  apiresponse:
    properties:
      valid: boolean
      number: string
      local_format: string
      international_format: string
      country_prefix: string
      country_code: string
      country_name: string
      location: string
      carrier: string
      line_type: string

/validate:
  get:
    queryParameters:
      access_key: string
      number: string
      country_code: string
      format: string
    description: For Validating the phone
    displayName: Validate phone
    protocols:
      - HTTP
    responses:
      403:
        body:
          application/json:
            properties:
              message: string
            example:
              {
                message : "Resource does not exists!"
              }
      400:
        body:
          application/json:
            properties:
              message: string
            example:
              {
                message : "API Key is invalid!"
              }
      200:
        body:
          application/json:
            type: apiresponse
            example:
              {
                "valid":true,
                "number":"17579758240",
                "local_format":"7579758240",
                "international_format":"+17579758240",
                "country_prefix":"+1",
                "country_code":"US",
                "country_name":"United States of America",
                "location":"Nwptnwszn1",
                "carrier":"MetroPCS Communications Inc.",
                "line_type":"mobile"
              }

Let’s quickly explore the critical snippet from the above script.

baseUri: https://anypoint.mulesoft.com/mocking/api/v1/links/86a5097f-1080-4049-9e04-a429219a64a8/ #

The above line will be our main URL when we’re planning to invoke that from Python script.

securitySchemes:
    basic :
        type: Basic Authentication

In this script, we’re using basic authentication. Apart from that, we have the option of using OAuth & many other supported schemes.
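For reference, this is how a Python client typically builds the header for basic authentication: the user name & password are joined with a colon & Base64-encoded. The credentials below are dummy values & the helper name basic_auth_header is hypothetical.

```python
import base64


def basic_auth_header(username, password):
    """Return the HTTP header for Basic authentication."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return {"Authorization": "Basic " + token}


print(basic_auth_header("user", "pass"))
# {'Authorization': 'Basic dXNlcjpwYXNz'}
```

Keep in mind that Base64 is an encoding, not encryption, so such a header should only travel over a secured connection.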

protocols:
- HTTP

In this case, we’re going to use – “HTTP” as our preferred communication protocol.

responses:
      403:
        body:
          application/json:
            properties:
              message: string
            example:
              {
                message : "Resource does not exists!"
              }
      400:
        body:
          application/json:
            properties:
              message: string
            example:
              {
                message : "API Key is invalid!"
              }
      200:
        body:
          application/json:
            type: apiresponse
            example:
              {
                "valid":true,
                "number":"17579758240",
                "local_format":"7579758240",
                "international_format":"+17579758240",
                "country_prefix":"+1",
                "country_code":"US",
                "country_name":"United States of America",
                "location":"Nwptnwszn1",
                "carrier":"MetroPCS Communications Inc.",
                "line_type":"mobile"
              }

We’ve created a provision for a few specific cases of response as part of our business logic & standards.
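On the calling side, these three cases can be handled with a small helper like the sketch below. The function name interpret_response is hypothetical; the status codes & the "message" property follow the RAML shown above, & the sample bodies are written as plain JSON text.

```python
import json


def interpret_response(status_code, body_text):
    """Map a mock API response to a (success flag, payload) pair."""
    body = json.loads(body_text)
    if status_code == 200:
        # Successful validation: the body follows the apiresponse type.
        return True, body
    # For 400 & 403, the RAML defines a body with a single "message" property.
    return False, body.get("message", "Unexpected response")


print(interpret_response(200, '{"valid": true, "country_code": "US"}'))
# (True, {'valid': True, 'country_code': 'US'})
print(interpret_response(400, '{"message": "API Key is invalid!"}'))
# (False, 'API Key is invalid!')
```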

Once we’re done with our coding, we need to focus on two places, as shown in the picture below –

26. Validation - mock API - Mulesoft

The snippet marked in the RED square box identifies our mandatory input parameters, which are shown in the code as well as on the right-hand side of the screen.

To test this mock API locally, you can pass these key parameters as follows –

27. Validation - mock API - Mulesoft - Continue

Now, you have to click the Send button marked in a GREEN square box. This will send your query parameters & as per our API response, you can see the output just below the Send button as follows –

28. Validation - mock API - Mulesoft - Continue

Now, we’re good to publish this mock API in the Mulesoft Anypoint portal. This will help us to test it from an external application, i.e., a Python-based application in our case. So, click the “Publish” button highlighted with the Blue square box. That will prompt the following screen –

29. Published

Now, we’ll click the “Publish to Exchange” button marked with the GREEN square box. This will prompt the next screen as shown below –

30. Published - Continue

Now, you need to fill up the relevant details & then click – “Publish to Exchange,” as shown above. And, that will lead to the following screen –

31. Published - Continue

And, after a few seconds, you will see the next screen –

32. Published - Continue

Now, you can click “Done” to close this popup. And, to verify the status, you can check it by clicking the top-left side of the code-editor & then click “Design Center” as shown below –

33. Published - Final

So, we’re done with our Mulesoft mock API design & deployment. Let’s test it from our Python application. We’ll be only discussing the key snippets here.

2. clsConfig.py (This is the parameter file for our mock API script.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 04-Apr-2020              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys for      ####
#### Mulesoft Mock API. Application will  ####
#### process these information & perform  ####
#### the call to our newly developed Mock ####
#### API in Mulesoft.                     ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'URL': "https://anypoint.mulesoft.com/mocking/api/v1/links/a23e4e71-9c25-317b-834b-10b0debc3a30/validate",
        'CLIENT_SECRET': 'a12345670bacb1e3cec55e2f1234567d',
        'API_TYPE': "application/json",
        'CACHE': "no-cache",
        'CON': "keep-alive",
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'SRC_PATH': Curr_Path + sep + 'Src_File' + sep,
        'APP_DESC_1': 'Mule Mock API Calling!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path
    }

The key snippet from the above script is –

'URL': "https://anypoint.mulesoft.com/mocking/api/v1/links/a23e4e71-9c25-317b-834b-10b0debc3a30/validate",

We received this URL from the RAML editor; it is generated by the Mulesoft API Designer studio.
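A side note on the path entries in the config above: they are assembled with a hand-picked separator. Here is a minimal sketch of the same idea using os.path.join, which picks the separator for the running platform. The build_paths helper & the use of the current working directory as the base are assumptions made only for this illustration; the original config uses the script's own directory.

```python
import os

def build_paths(base_dir):
    """Return a few of the config directory entries, built with os.path.join."""
    return {
        # A trailing empty component makes join() append the platform separator,
        # matching the original entries that end with `sep`.
        'ARCH_DIR': os.path.join(base_dir, 'arch', ''),
        'LOG_PATH': os.path.join(base_dir, 'log', ''),
        # REPORT_PATH has no trailing separator in the original config either.
        'REPORT_PATH': os.path.join(base_dir, 'report'),
    }

# Hypothetical base directory, used here only so the sketch runs anywhere.
paths = build_paths(os.getcwd())
print(paths['LOG_PATH'])
```

With this approach, the Windows/non-Windows check is no longer needed, since the standard library handles it.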

3. clsMuleMockAPI.py (This is the main class to invoke our mock API script.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 30-Jul-2020              ####
#### Modified On 30-Jul-2020              ####
####                                      ####
#### Objective: Main class scripts to     ####
#### invoke mock API.                     ####
##############################################

import json
from clsConfig import clsConfig as cf
import requests
import logging

class clsMuleMockAPI:
    def __init__(self):
        self.url = cf.config['URL']
        self.muleapi_key = cf.config['CLIENT_SECRET']
        self.muleapi_cache = cf.config['CACHE']
        self.muleapi_con = cf.config['CON']
        self.type = cf.config['API_TYPE']

    def searchQry(self, phNumber, cntCode, fmt):
        try:
            url = self.url
            muleapi_key = self.muleapi_key
            muleapi_cache = self.muleapi_cache
            muleapi_con = self.muleapi_con
            type = self.type

            querystring = {"access_key": muleapi_key, "number": phNumber, "country_code": cntCode, "format": fmt}

            print('Input JSON: ', str(querystring))

            headers = {
                'content-type': type,
                'Cache-Control': muleapi_cache,
                'Connection': muleapi_con
            }

            response = requests.request("GET", url, headers=headers, params=querystring)

            ResJson = response.text

            jdata = json.dumps(ResJson)
            ResJson = json.loads(jdata)

            return ResJson

        except Exception as e:
            ResJson = ''
            x = str(e)
            print(x)

            logging.info(x)
            # Returning a JSON string, so that json.loads() in the caller also works for errors
            ResJson = json.dumps({'errorDetails': x})

            return ResJson

And, the key snippet from the above code –

querystring = {"access_key": muleapi_key, "number": phNumber, "country_code": cntCode, "format": fmt}

In the above lines, we’re preparing the query string, which will be passed into the API call.

response = requests.request("GET", url, headers=headers, params=querystring)

Here, we invoke our API with a GET call through the requests package in Python.
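To see what such a GET call actually sends, here is a minimal sketch that builds the final URL by hand with the standard library. The key, phone number & base URL are placeholders, not the real values from clsConfig. The requests package performs an equivalent encoding internally when it receives params=querystring.

```python
from urllib.parse import urlencode

# Placeholder values; the real key comes from clsConfig & the rest from user input.
querystring = {
    "access_key": "dummy-key",
    "number": "1234567890",
    "country_code": "US",
    "format": "1",
}

# Stand-in for cf.config['URL'].
base_url = "https://example.com/validate"

# Each key/value pair becomes key=value, joined by '&' after the '?'.
full_url = base_url + "?" + urlencode(querystring)
print(full_url)
```

Printing the encoded URL like this is handy while debugging, as it can be pasted straight into a browser or an API client.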

4. callMuleMockAPI.py (This is the first calling script to invoke our mock API script through our developed class python script.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 30-Jul-2020              ####
#### Modified On 30-Jul-2020              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from clsConfig import clsConfig as cf
import clsL as cl
import logging
import datetime
import clsMuleMockAPI as cw
import pandas as p
import json

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        # Declared Variable
        ret_1 = 0
        debug_ind = 'Y'
        res_2 = ''

        # Defining Generic Log File
        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'MockMuleAPI.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        log_dir = cf.config['LOG_PATH']

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print()

        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        print('Welcome to Mock Mulesoft API Calling Program: ')
        print('-' * 160)
        print('Please Press 1 for better formatted JSON: (Suitable for reading or debugging) ')
        print('Please Press 2 for unformatted JSON: ')
        print()
        input_choice = int(input('Please provide your choice:'))
        print()

        # Create the instance of the Mock Mulesoft API Class
        x2 = cw.clsMuleMockAPI()

        # Let's pass this to our map section
        if input_choice == 1:
            fmt = "1"
            phNumber = str(input('Please provide the Phone Number (Without the country Code):'))
            cntCode  = str(input('Please provide the Country Code (Example: US):'))
            print()

            retJson = x2.searchQry(phNumber, cntCode, fmt )
        elif input_choice == 2:
            fmt = "0"
            phNumber = str(input('Please provide the Phone Number (Without the country Code):'))
            cntCode = str(input('Please provide the Country Code (Example: US):'))
            print()

            retJson = x2.searchQry(phNumber, cntCode, fmt)
        else:
            print('Invalid options!')
            retJson = json.dumps({'errorDetails': 'Invalid Options!'})

        # Converting JSon to Pandas Dataframe for better readability
        # Capturing the JSON Payload
        res = json.loads(retJson)

        # Printing formatted JSON
        print()
        print('Output JSON::')
        print(json.dumps(res, indent=2))

        # Converting dictionary to Pandas Dataframe
        # df_ret = p.read_json(ret_2, orient='records')
        # json_normalize is exposed at the top level from pandas 1.0 onwards
        df_ret = p.json_normalize(res)
        df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

        # Removing any duplicate columns
        df_ret = df_ret.loc[:, ~df_ret.columns.duplicated()]

        print()
        print()
        print("-" * 160)

        print('Publishing sample result: ')
        print(df_ret.head())

        # Logging Final Output
        l.logr('1.df_ret' + var + '.csv', debug_ind, df_ret, 'log')

        print("-" * 160)
        print()

        print('Finished Analysis points..')
        print("*" * 160)
        logging.info('Finished Analysis points..')
        logging.info(tmpR0)

        tmpR10 = 'End Time: ' + str(var)
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError as e:
        print(str(e))
        print("Invalid option!")
        logging.info("Invalid option!")

    except Exception as e:
        # Python 3 exceptions have no .message attribute; str(e) gives the text
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

The above script is pretty straightforward. First, we’re instantiating our essential class by this line –

# Create the instance of the Mock Mulesoft API Class
x2 = cw.clsMuleMockAPI()

And, then based on the logical condition we’re invoking it as follows –

retJson = x2.searchQry(phNumber, cntCode, fmt )
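Once the call returns, the script parses the JSON & flattens it into a dataframe, keeping only the last part of each column name. The idea can be sketched in plain Python as follows. The flatten helper & the sample payload are made up for illustration; this is not the real mock API response, nor the pandas implementation.

```python
import json

def flatten(obj, prefix=""):
    """Flatten nested dicts into a single dict with dotted keys."""
    flat = {}
    for key, value in obj.items():
        name = prefix + "." + key if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat

# Made-up payload for illustration only.
sample = '{"valid": true, "carrier": {"name": "Demo Telecom", "line_type": "mobile"}}'

record = flatten(json.loads(sample))

# Keep only the last part of each dotted key, as the script does with split(".")[-1].
short = {key.split(".")[-1]: value for key, value in record.items()}
print(short)
```

Keeping only the leaf name can make two different columns collide, which is why the calling script drops duplicate columns right after renaming them.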

Now, we would like to explore the directory structure both in MAC & Windows –

14. Dir

Topside represents the MAC O/S structure, whereas the bottom part represents the Windows directory structure.

Let’s run the python application to test it.

10. Program_Run

In this case, the bottom side represents the MAC run, whereas the top side represents Windows run status.

The sample CSV log should look something like this –

Windows:

15. Log Win CSV

MAC:

15. Log CSV MAC

So, we’ve done it.

I’ll be posting another new post in the coming days. Till then, Happy Avenging! 😀

Note: All the data posted here are representational data & available over the internet & for educational purpose only.

Predicting Flipkart business growth factor using Linear-Regression Machine Learning Model

Hi Guys,

Today, we’ll be exploring the potential business growth factor using the “Linear-Regression Machine Learning” model. We’ve prepared a set of dummy data & based on that, we’ll predict.

Let’s explore a few sample data –

1. Sample Data

So, based on these data, we would like to predict YearlyAmountSpent dependent on any one of the following features, i.e. [ Time On App / Time On Website / Flipkart Membership Duration (In Year) ].

You need to install the following packages –

pip install pandas

pip install matplotlib

pip install scikit-learn

We’ll be discussing only the main calling script & class script. However, we’ll be posting the parameters without discussing them. And, we won’t discuss clsL.py as we’ve already discussed that in our previous post.

1. clsConfig.py (This script contains all the parameter details.)

################################################
#### Written By: SATYAKI DE                 ####
#### Written On: 15-May-2020                ####
####                                        ####
#### Objective: This script is a config     ####
#### file, contains all the keys for        ####
#### Machine-Learning. Application will     ####
#### process these information & perform    ####
#### various analysis on Linear-Regression. ####
################################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'FILE_NAME': Curr_Path + sep + 'Data' + sep + 'FlipkartCustomers.csv',
        'SRC_PATH': Curr_Path + sep + 'Data' + sep,
        'APP_DESC_1': 'IBM Watson Language Understand!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path
    }

2. clsLinearRegression.py (This is the main script, which will invoke the Machine-Learning API & return 0 if successful.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 15-May-2020              ####
#### Modified On 15-May-2020              ####
####                                      ####
#### Objective: Main scripts for Linear   ####
#### Regression.                          ####
##############################################

import pandas as p
import numpy as np
import re

import matplotlib.pyplot as plt
from clsConfig import clsConfig as cf

# %matplotlib inline -- for Jupyter Notebook
class clsLinearRegression:
    def __init__(self):
        self.fileName =  cf.config['FILE_NAME']

    def predictResult(self):
        try:

            inputFileName = self.fileName

            # Reading from Input File
            df = p.read_csv(inputFileName)

            print()
            print('Projecting sample rows: ')
            print(df.head())

            print()
            x_row = df.shape[0]
            x_col = df.shape[1]

            print('Total Number of Rows: ', x_row)
            print('Total Number of columns: ', x_col)

            # Adding Features
            x = df[['TimeOnApp', 'TimeOnWebsite', 'FlipkartMembershipInYear']]

            # Target Variable - Trying to predict
            y = df['YearlyAmountSpent']

            # Now Train-Test Split of your source data
            from sklearn.model_selection import train_test_split

            # test_size => % of allocated data for your test cases
            # random_state => A specific set of random split on your data
            X_train, X_test, Y_train, Y_test = train_test_split(x, y, test_size=0.4, random_state=101)

            # Importing Model
            from sklearn.linear_model import LinearRegression

            # Creating an Instance
            lm = LinearRegression()

            # Train or Fit my model on Training Data
            lm.fit(X_train, Y_train)

            # Creating a prediction value
            flipKartSalePrediction = lm.predict(X_test)

            # Creating a scatter plot based on Actual Value & Predicted Value
            plt.scatter(Y_test, flipKartSalePrediction)

            # Adding meaningful Label
            plt.xlabel('Actual Values')
            plt.ylabel('Predicted Values')

            # Checking Individual Metrics
            from sklearn import metrics

            print()
            mea_val = metrics.mean_absolute_error(Y_test, flipKartSalePrediction)
            print('Mean Absolute Error (MAE): ', mea_val)

            mse_val = metrics.mean_squared_error(Y_test, flipKartSalePrediction)
            print('Mean Square Error (MSE): ', mse_val)

            rmse_val = np.sqrt(metrics.mean_squared_error(Y_test, flipKartSalePrediction))
            print('Root Mean Square Error (RMSE): ', rmse_val)

            print()

            # Check Explained Variance Score (close to, but not the same as, the R^2 value)
            print('Variance Score:')
            var_score = str(round(metrics.explained_variance_score(Y_test, flipKartSalePrediction) * 100, 2)).strip()
            print('Our Model is', var_score, '% accurate. ')
            print()

            # Finding Coefficient on X_train.columns
            print()
            print('Finding Coefficient: ')

            cedf = p.DataFrame(lm.coef_, x.columns, columns=['Coefficient'])
            print('Printing the All the Factors: ')
            print(cedf)

            print()

            # Getting the Max Value from it
            cedf['MaxFactorForBusiness'] = cedf['Coefficient'].max()

            # Filtering the max Value to identify the biggest Business factor
            dfMax = cedf[(cedf['MaxFactorForBusiness'] == cedf['Coefficient'])]

            # Dropping the derived column
            # Reassigning instead of inplace=True avoids pandas' SettingWithCopyWarning on the filtered slice
            dfMax = dfMax.drop(columns=['MaxFactorForBusiness'])
            dfMax = dfMax.reset_index()

            print(dfMax)

            # Extracting Actual Business Factor from Pandas dataframe
            str_factor_temp = str(dfMax.iloc[0]['index'])
            str_factor = re.sub(r"([a-z])([A-Z])", r"\g<1> \g<2>", str_factor_temp)
            str_value = str(round(float(dfMax.iloc[0]['Coefficient']),2))

            print()
            print('*' * 80)
            print('Major Business Activity - (', str_factor, ') - ', str_value, '%')
            print('*' * 80)
            print()

            # This is required when you are running from a conventional
            # terminal or IDE & not using Jupyter notebook.
            plt.show()

            return 0

        except Exception  as e:
            x = str(e)
            print('Error : ', x)

            return 1

Key lines from the above snippet –

# Adding Features
x = df[['TimeOnApp', 'TimeOnWebsite', 'FlipkartMembershipInYear']]

Our application creates a subset of the main dataframe, which contains all the features.

# Target Variable - Trying to predict
y = df['YearlyAmountSpent']

Now, the application assigns the target variable to ‘y’.

# Now Train-Test Split of your source data
from sklearn.model_selection import train_test_split

# test_size => % of allocated data for your test cases
# random_state => A specific set of random split on your data
X_train, X_test, Y_train, Y_test = train_test_split(x, y, test_size=0.4, random_state=101)

As per “Supervised Learning,” our application splits the dataset into two subsets: one to train the model & another to test the final model. For a large dataset, you can also divide the data into three sets (train, validation & test), so that model tuning & the final performance check use separate data. In our case, we don’t need that, as this dataset is quite small.
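To make the split concrete, here is a minimal sketch in plain Python that shuffles the rows with a fixed seed & keeps 40% aside for testing. The split_rows helper is hypothetical & is not how scikit-learn implements train_test_split, so the exact rows chosen will differ from the ones in the script; only the idea is the same.

```python
import random

def split_rows(rows, test_size=0.4, seed=101):
    """Shuffle a copy of rows & split it into (train, test)."""
    shuffled = list(rows)
    # A fixed seed gives the same split on every run, like random_state.
    random.Random(seed).shuffle(shuffled)
    n_test = int(round(len(shuffled) * test_size))
    return shuffled[n_test:], shuffled[:n_test]

# Ten dummy row ids standing in for the rows of the dataframe.
rows = list(range(10))
train, test = split_rows(rows)
print(len(train), len(test))
```

Every row lands in exactly one of the two subsets, so the model is never tested on a row it was trained on.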

# Train or Fit my model on Training Data
lm.fit(X_train, Y_train)

Our application now trains (fits) the model on the training data.
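What “fitting” means can be shown with the simplest case: one feature & a straight line. The sketch below computes the least-squares slope & intercept by hand. It is a single-feature illustration with made-up numbers, not the multi-feature solver that LinearRegression uses, & the fit_line helper is a hypothetical name.

```python
def fit_line(xs, ys):
    """Least-squares fit of y = slope * x + intercept for one feature."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Covariance of x & y, & variance of x (both without dividing by n,
    # since the factor cancels out in the ratio).
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    slope = sxy / sxx
    intercept = mean_y - slope * mean_x
    return slope, intercept

# Made-up points that lie exactly on y = 2x + 1.
xs = [1.0, 2.0, 3.0, 4.0]
ys = [3.0, 5.0, 7.0, 9.0]

slope, intercept = fit_line(xs, ys)
print(slope, intercept)
```

With several features, the model learns one such slope per feature; these are the values exposed as lm.coef_ in the script.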

# Creating a scatter plot based on Actual Value & Predicted Value
plt.scatter(Y_test, flipKartSalePrediction)

Our application plots the actual values against the predicted values in a scatter plot.

Also, our program captures the following concepts. For more details, I’ve provided the external link for your reference –

  1. Mean Absolute Error (MAE)
  2. Mean Square Error (MSE)
  3. Root Mean Square Error (RMSE)

And, the implementation is shown below –

mea_val = metrics.mean_absolute_error(Y_test, flipKartSalePrediction)
print('Mean Absolute Error (MAE): ', mea_val)

mse_val = metrics.mean_squared_error(Y_test, flipKartSalePrediction)
print('Mean Square Error (MSE): ', mse_val)

rmse_val = np.sqrt(metrics.mean_squared_error(Y_test, flipKartSalePrediction))
print('Root Mean Square Error (RMSE): ', rmse_val)
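To see what these three numbers measure, here is a small worked example in plain Python. The actual & predicted values are made up for illustration; the formulas are the standard definitions of the three metrics.

```python
import math

# Made-up values for illustration only.
actual = [10.0, 20.0, 30.0, 40.0]
predicted = [12.0, 18.0, 33.0, 37.0]

# Error of each prediction: actual minus predicted.
errors = [a - p for a, p in zip(actual, predicted)]

# MAE: average size of the errors, ignoring their sign.
mae = sum(abs(e) for e in errors) / len(errors)

# MSE: average of the squared errors, which punishes large misses more.
mse = sum(e * e for e in errors) / len(errors)

# RMSE: square root of MSE, back in the same unit as the target.
rmse = math.sqrt(mse)

print(mae, mse, round(rmse, 4))
```

Here, the errors are -2, 2, -3 & 3, so the MAE is 10 / 4 = 2.5, the MSE is 26 / 4 = 6.5 & the RMSE is the square root of 6.5.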

At this point, we would like to check the credibility of our model by using the explained variance score as follows –

var_score = str(round(metrics.explained_variance_score(Y_test, flipKartSalePrediction) * 100, 2)).strip()
print('Our Model is', var_score, '% accurate. ')
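A word of caution on reading this number: the explained variance score is not an accuracy in the classification sense & it is not always equal to R^2 either. The sketch below implements both from their textbook definitions in plain Python, using made-up values where every prediction is off by the same amount. The helper names are hypothetical.

```python
def mean(values):
    return sum(values) / len(values)

def variance(values):
    """Population variance (divide by n)."""
    m = mean(values)
    return mean([(v - m) ** 2 for v in values])

def explained_variance(actual, predicted):
    # 1 - Var(residuals) / Var(actual): a constant offset in the
    # predictions does not lower this score.
    residuals = [a - p for a, p in zip(actual, predicted)]
    return 1 - variance(residuals) / variance(actual)

def r_squared(actual, predicted):
    # 1 - SS_res / SS_tot: a constant offset does lower this score.
    ss_res = sum((a - p) ** 2 for a, p in zip(actual, predicted))
    m = mean(actual)
    ss_tot = sum((a - m) ** 2 for a in actual)
    return 1 - ss_res / ss_tot

# Made-up values: every prediction is too high by exactly 2.
actual = [10.0, 20.0, 30.0, 40.0]
biased = [a + 2.0 for a in actual]

ev = explained_variance(actual, biased)
r2 = r_squared(actual, biased)
print(ev, round(r2, 3))
```

The two scores agree when the errors average out to zero, so it is worth looking at MAE or RMSE alongside this score.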

Finally, we extract the coefficients to find out which particular feature will lead Flipkart to better sales & growth, by taking the maximum coefficient value among all the features, as shown below –

cedf = p.DataFrame(lm.coef_, x.columns, columns=['Coefficient'])

# Getting the Max Value from it
cedf['MaxFactorForBusiness'] = cedf['Coefficient'].max()

# Filtering the max Value to identify the biggest Business factor
dfMax = cedf[(cedf['MaxFactorForBusiness'] == cedf['Coefficient'])]

# Dropping the derived column
dfMax = dfMax.drop(columns=['MaxFactorForBusiness'])
dfMax = dfMax.reset_index()
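The same “pick the largest coefficient” step can be sketched without a helper column. The coefficient values below are made up for illustration; the real ones come from lm.coef_ in the script.

```python
# Made-up coefficients for illustration; real values come from lm.coef_.
coefficients = {
    'TimeOnApp': 38.6,
    'TimeOnWebsite': 0.2,
    'FlipkartMembershipInYear': 61.3,
}

# max() with key= returns the feature name whose coefficient is largest.
top_feature = max(coefficients, key=coefficients.get)
top_value = coefficients[top_feature]

print(top_feature, top_value)
```

Keep in mind that raw coefficients are only comparable when the features are on a similar scale; otherwise, a feature measured in small units can look more important than it is.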

Note that we’ve used a regular expression to split the camel-case column name from our feature & represent that with a much more meaningful name without changing the column name.

# Extracting Actual Business Factor from Pandas dataframe
str_factor_temp = str(dfMax.iloc[0]['index'])
str_factor = re.sub(r"([a-z])([A-Z])", r"\g<1> \g<2>", str_factor_temp)
str_value = str(round(float(dfMax.iloc[0]['Coefficient']),2))

print('Major Business Activity - (', str_factor, ') - ', str_value, '%')
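Here is the camel-case split on its own, as a small runnable sketch using Python's built-in re module. The split_camel_case wrapper is a hypothetical name added for illustration; the pattern & replacement are the ones from the script, written as raw strings.

```python
import re

def split_camel_case(name):
    """Insert a space wherever a lowercase letter is followed by an uppercase one."""
    # Group 1 is the lowercase letter, group 2 the uppercase letter after it.
    return re.sub(r"([a-z])([A-Z])", r"\g<1> \g<2>", name)

print(split_camel_case("FlipkartMembershipInYear"))
print(split_camel_case("TimeOnApp"))
```

The raw-string prefix (r"...") keeps Python from treating \g as a string escape, which newer Python versions warn about.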

3. callLinear.py (This is the first calling script.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 15-May-2020              ####
#### Modified On 15-May-2020              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from clsConfig import clsConfig as cf
import clsL as cl
import logging
import datetime
import clsLinearRegression as cw

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        ret_1 = 0
        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'MachineLearning_LinearRegression.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        log_dir = cf.config['LOG_PATH']
        curr_ver =datetime.datetime.now().strftime("%Y-%m-%d")

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        print('Machine Learning - Linear Regression Prediction : ')
        print('-' * 200)

        # Create the instance of the Linear-Regression Class
        x2 = cw.clsLinearRegression()

        ret = x2.predictResult()

        if ret == 0:
            print('Successful Linear-Regression Prediction Generated!')
        else:
            print('Failed to generate Linear-Regression Prediction!')

        print("-" * 200)
        print()

        print('Finding Analysis points..')
        print("*" * 200)
        logging.info('Finding Analysis points..')
        logging.info(tmpR0)


        tmpR10 = 'End Time: ' + str(var)
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError as e:
        print(str(e))
        logging.info(str(e))

    except Exception as e:
        # Python 3 exceptions have no .message attribute; str(e) gives the text
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

Key snippet from the above script –

# Create the instance of the Linear-Regression
x2 = cw.clsLinearRegression()

ret = x2.predictResult()

In the above snippet, our application first creates an instance of the main class & then invokes the “predictResult” method.

Let’s run our application –

Step 1:

First, the application will fetch the following sample rows from our source file – if it is successful.

2. Run_1

Step 2:

Then, it will create the following scatterplot by executing the following snippet –

# Creating a scatter plot based on Actual Value & Predicted Value
plt.scatter(Y_test, flipKartSalePrediction)

3. Run_2

Note that our model is pretty accurate & it has a balanced success rate compared to our predicted numbers.

Step 3:

Finally, it successfully projects the critical feature, as shown below –

4. Run_3

From the above picture, you can see that our model is pretty accurate (89% approx).

Also, the highlighted red square identifies the key features & their coefficient scores & finally, the winning feature is marked in green.

So, as per that, we’ve come to one conclusion that Flipkart’s business growth depends on the tenure of their subscriber, i.e., old members are prone to buy more than newer members.

Let’s look into our directory structure –

5. Win_Dir

So, we’ve done it.

I’ll be posting another new post in the coming days. Till then, Happy Avenging! 😀

Note: All the data posted here are representational data & available over the internet & for educational purpose only.

Analyzing Language using IBM Watson using Python

Hi Guys,

Today, I’ll be discussing the following topic – “How to analyze text using IBM Watson implementing through Python.”

IBM has significantly improved in the field of Visual Image Analysis or Text language analysis using its IBM Watson cloud platform. In this particular topic, we’ll be exploring the natural languages only.

To access IBM API, we need to first create an IBM Cloud account from this site.

Let us quickly go through the steps to create the IBM Language Understanding service. Click the Catalog on top of your browser menu as shown in the below picture –

6. Creating an Instance for Watson

After that, click the AI option on your left-hand side of the panel marked in RED.

Click the Watson-Studio & later choose the plan. In our case, we’ll select the “Lite” option, as IBM provides this platform for all developers to explore their cloud for free.

7. Choosing AI
8. Choosing Plan

Clicking the create option will lead to a blank page of Watson Studio as shown below –

9. Choosing Watson Studio

And, now, we need to click the Get Started button to launch it. This will lead to Create Project page, which can be done using the following steps –

10. Create Project Initial Screen

Now, clicking “Create a project” will lead you to the next screen –

11. Create Project - Continue

You can choose either an empty project, or you can create it from a sample file. In this case, we’ll be selecting the first option & this will lead us to the below page –

12. Creating a Project

And, then you will click the “Create” option, which will lead you to the next screen –

13. Adding to project

Now, you need to click “Add to Project.” This will give you a variety of services that you can explore/use from the list. If you want to create your own natural language classifier, you can do that as follows –

14. Adding Natural Language Components from IBM Cloud

Once you click it, you need to select the associated service –

15. Adding Associte Service - Sound

Here, you need to click the hyperlink, which leads to the next screen –

16. Choosing Associate Service - Sound

You need to check the price for both the Visual & Natural Language Classifier. They are pretty expensive. The visual classifier has the Lite plan. However, it has limitations of output.

Clicking “Create” will lead to the next screen –

18. Selecting Region - Sound

After successful creation, you will be redirected to the following page –

19. Landing Page - Sound

Now, we’ll be adding “Natural Language Understanding” for our test –

29. Choosing Natural Language Understanding

This will prompt the next screen –

7. Choosing AI - Natural Language Understanding

Once it is successful, you will see the service registered as shown below –

3. Watson Services - Sound

If you click the service marked in RED, it will lead you to another page, where you will get the API Key & Url. You need both of these in your Python application to access this API, as shown below –

4. Watson API Details - Sound

Now, we’re ready with the necessary cloud set-up. After this, we need to install the Python package for IBM Cloud as shown below –

1. Installing_Packages

We’ve noticed that IBM has recently launched an upgraded package. Hence, we installed that one as well. I would recommend installing this second package directly instead of the first one shown above –

2. Installing Latest IBM_Watson Package

Now, we’re done with our set-up.

Let’s see the directory structure –

31. Directory Structure

We’ll be discussing only the main calling script & class script. However, we’ll be posting the parameters without discussing them. And, we won’t discuss clsL.py as we’ve already discussed that in our previous post.

1. clsConfig.py (This script contains all the parameter details.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 04-Apr-2020              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys for      ####
#### IBM Cloud API.   Application will    ####
#### process these information & perform  ####
#### various analysis on IBM Watson cloud.####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'SERVICE_URL': "https://api.eu-gb.natural-language-understanding.watson.cloud.ibm.com/instances/xxxxxxxxxxxxxxXXXXXXXXXXxxxxxxxxxxxxxxxx",
        'API_KEY': "Xxxxxxxxxxxxxkdkdfifd984djddkkdkdkdsSSdkdkdd",
        'API_TYPE': "application/json",
        'CACHE': "no-cache",
        'CON': "keep-alive",
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'SRC_PATH': Curr_Path + sep + 'Src_File' + sep,
        'APP_DESC_1': 'IBM Watson Language Understand!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path
    }

Note that you will be placing your API_KEY & URL here, as shown in the configuration file.
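If you would rather not keep the key inside a source file, one option is to read both values from environment variables. This is a minimal sketch of that idea & not part of the original scripts. The variable names WATSON_API_KEY & WATSON_SERVICE_URL & the load_watson_credentials helper are hypothetical.

```python
import os

def load_watson_credentials(env=None):
    """Read the API key & service URL from environment-style variables."""
    # Passing a dict makes the function easy to try out without touching os.environ.
    env = os.environ if env is None else env
    api_key = env.get("WATSON_API_KEY", "")
    service_url = env.get("WATSON_SERVICE_URL", "")
    if not api_key or not service_url:
        raise ValueError("Set WATSON_API_KEY & WATSON_SERVICE_URL first.")
    return {"API_KEY": api_key, "SERVICE_URL": service_url}

# Dummy values for illustration only.
creds = load_watson_credentials({
    "WATSON_API_KEY": "dummy-key",
    "WATSON_SERVICE_URL": "https://example.com/instances/dummy",
})
print(sorted(creds))
```

The returned keys match the names used in the config dictionary, so the two entries could be merged into it with config.update(creds).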

2. clsIBMWatson.py (This is the main script, which will invoke the IBM Watson API based on the input from the user & return 0 if successful.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 04-Apr-2020              ####
#### Modified On 04-Apr-2020              ####
####                                      ####
#### Objective: Main scripts to invoke    ####
#### IBM Watson Language Understand API.  ####
##############################################

import logging
from clsConfig import clsConfig as cf
import clsL as cl
import json
from ibm_watson import NaturalLanguageUnderstandingV1
from ibm_cloud_sdk_core.authenticators import IAMAuthenticator
from ibm_watson.natural_language_understanding_v1 import Features, EntitiesOptions, KeywordsOptions, SentimentOptions, CategoriesOptions, ConceptsOptions
from ibm_watson import ApiException

class clsIBMWatson:
    def __init__(self):
        self.api_key =  cf.config['API_KEY']
        self.service_url = cf.config['SERVICE_URL']

    def calculateExpressionFromUrl(self, inputUrl, inputVersion):
        try:
            api_key = self.api_key
            service_url = self.service_url
            print('-' * 60)
            print('Beginning of the IBM Watson for Input Url.')
            print('-' * 60)

            authenticator = IAMAuthenticator(api_key)

            # Authentication via service credentials provided in our config files
            service = NaturalLanguageUnderstandingV1(version=inputVersion, authenticator=authenticator)
            service.set_service_url(service_url)

            response = service.analyze(
                url=inputUrl,
                features=Features(entities=EntitiesOptions(),
                                  sentiment=SentimentOptions(),
                                  concepts=ConceptsOptions())).get_result()

            print(json.dumps(response, indent=2))

            return 0

        except ApiException as ex:
            print('-' * 60)
            print("Method failed for Url with status code " + str(ex.code) + ": " + ex.message)
            print('-' * 60)

            return 1

    def calculateExpressionFromText(self, inputText, inputVersion):
        try:
            api_key = self.api_key
            service_url = self.service_url
            print('-' * 60)
            print('Beginning of the IBM Watson for Input Text.')
            print('-' * 60)

            authenticator = IAMAuthenticator(api_key)

            # Authentication via service credentials provided in our config files
            service = NaturalLanguageUnderstandingV1(version=inputVersion, authenticator=authenticator)
            service.set_service_url(service_url)

            response = service.analyze(
                text=inputText,
                features=Features(entities=EntitiesOptions(),
                                  sentiment=SentimentOptions(),
                                  concepts=ConceptsOptions())).get_result()

            print(json.dumps(response, indent=2))

            return 0

        except ApiException as ex:
            print('-' * 60)
            print("Method failed for Text with status code " + str(ex.code) + ": " + ex.message)
            print('-' * 60)

            return 1

Some of the key lines from the above snippet –

authenticator = IAMAuthenticator(api_key)

# Authentication via service credentials provided in our config files
service = NaturalLanguageUnderstandingV1(version=inputVersion, authenticator=authenticator)
service.set_service_url(service_url)

By providing the API Key & Url, the application is initiating the service for Watson.

response = service.analyze(
    url=inputUrl,
    features=Features(entities=EntitiesOptions(),
                      sentiment=SentimentOptions(),
                      concepts=ConceptsOptions())).get_result()

Based on your type of input, it will bring the features of entities, sentiment & concepts here. Apart from that, you can additionally check the following features as well – Keywords & Categories.
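The script prints the whole response as JSON. If you only want the headline numbers, you can pick them out of the returned dictionary. The sketch below works on a hand-written sample that mimics the general shape of such a result; the field names & values are assumptions for illustration, so please compare them with the JSON that your own run prints. The summarize helper is a hypothetical name.

```python
# Hand-written sample; an assumption about the general response shape,
# not real Watson output.
sample_response = {
    "sentiment": {"document": {"score": 0.71, "label": "positive"}},
    "entities": [
        {"type": "Company", "text": "IBM", "relevance": 0.95},
        {"type": "Location", "text": "London", "relevance": 0.42},
    ],
    "concepts": [{"text": "Cloud computing", "relevance": 0.88}],
}

def summarize(response, min_relevance=0.5):
    """Pick the overall sentiment plus the more relevant entities & all concepts."""
    doc = response.get("sentiment", {}).get("document", {})
    entities = [
        e["text"]
        for e in response.get("entities", [])
        if e.get("relevance", 0) >= min_relevance
    ]
    concepts = [c["text"] for c in response.get("concepts", [])]
    return {
        "label": doc.get("label"),
        "score": doc.get("score"),
        "entities": entities,
        "concepts": concepts,
    }

summary = summarize(sample_response)
print(summary)
```

Using .get() with defaults keeps the helper from failing when a feature was not requested & its section is missing from the response.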

3. callIBMWatsonAPI.py (This is the first calling script. Based on user choice, it will receive input either as Url or as the plain text & then analyze it.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 04-Apr-2020              ####
#### Modified On 04-Apr-2020              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from clsConfig import clsConfig as cf
import clsL as cl
import logging
import datetime
import clsIBMWatson as cw

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Timestamp captured once at start-up
# (used in the log entries)

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        ret_1 = 0
        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'IBMWatson_NaturalLanguageAnalysis.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Log directory & today's date (passed to the Watson calls as the version string)
        log_dir = cf.config['LOG_PATH']
        curr_ver = datetime.datetime.now().strftime("%Y-%m-%d")

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        print('Welcome to IBM Watson Language Understanding Calling Program: ')
        print('-' * 60)
        print('Please press 1 to understand the language from a Url.')
        print('Please press 2 to understand the language from your input text.')
        input_choice = int(input('Please provide your choice:'))

        # Create the instance of the IBM Watson Class
        x2 = cw.clsIBMWatson()

        # Let's pass this to our map section
        if input_choice == 1:
            textUrl = str(input('Please provide the complete input url:'))
            ret_1 = x2.calculateExpressionFromUrl(textUrl, curr_ver)
        elif input_choice == 2:
            inputText = str(input('Please provide the input text:'))
            ret_1 = x2.calculateExpressionFromText(inputText, curr_ver)
        else:
            print('Invalid option!')
            # Mark the run as failed so that the success message is not printed
            ret_1 = 1

        if ret_1 == 0:
            print('Successful IBM Watson Language Understanding Generated!')
        else:
            print('Failed to generate IBM Watson Language Understanding!')

        print("-" * 60)
        print()

        print('Finding Analysis points..')
        print("*" * 157)
        logging.info('Finding Analysis points..')
        logging.info(tmpR0)


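        # Capture the actual end time instead of reusing the start timestamp
        tmpR10 = 'End Time: ' + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")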
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError as e:
        print(str(e))
        print("Invalid option!")
        logging.info("Invalid option!")

    except Exception as e:
        # Python 3 exceptions have no .message attribute, so use str(e)
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

This script is pretty straightforward. It first creates an instance of the main class & then, based on the user input, calls the respective method.
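The choose-then-call flow described above can also be sketched as a small lookup table. This is only an illustration under stated assumptions: run_choice, the handlers dictionary & the stand-in handler are hypothetical names, not part of the original scripts. The handler returns 0 for success & 1 for failure, mirroring the return codes of the class methods.

```python
def run_choice(choice, handlers, prompt=input):
    """Run the handler registered for a menu choice & return its status code.

    Returns 1 when the choice is unknown, so the caller never reports
    success for an invalid option.
    """
    entry = handlers.get(choice)
    if entry is None:
        print('Invalid option!')
        return 1

    question, handler = entry
    return handler(prompt(question))


if __name__ == "__main__":
    # Stand-in handler; in the post this role is played by the methods
    # of the clsIBMWatson instance.
    def fake_analyze(value):
        return 0 if value.strip() else 1

    handlers = {
        1: ('Please provide the complete input url:', fake_analyze),
        2: ('Please provide the input text:', fake_analyze),
    }

    # A canned prompt is used here so that the demo runs without typing.
    status = run_choice(2, handlers, prompt=lambda question: 'Some sample text')
    print('Success' if status == 0 else 'Failure')
```

Adding a third input type would then only need one more entry in the dictionary.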

As of now, IBM Watson supports a list of languages, which is available here.

If you want to start from scratch, please refer to the following link.

Please find the screenshots of our application run –

Case 1 (With Url): 

21. Win_Run_1_Url
23. Win_Run_3_Url

Case 2 (With Plain text):

25. Win_Run_1_InputText
26. Win_Run_2_InputText
27. Win_Run_3_InputText
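The application prints the full JSON response. If only the headline values are of interest, a small helper can pull them out. The sketch below assumes the response holds a sentiment.document object plus entities & concepts lists; the sample dictionary is hand-written for illustration & is not real output from the service. The name summarize_response is hypothetical.

```python
def summarize_response(response):
    """Extract the main fields from an analyze response (hypothetical helper)."""
    # Every lookup uses a default, so a missing feature does not raise an error.
    document = response.get("sentiment", {}).get("document", {})
    return {
        "sentiment_label": document.get("label"),
        "sentiment_score": document.get("score"),
        "entities": [(e.get("type"), e.get("text")) for e in response.get("entities", [])],
        "concepts": [c.get("text") for c in response.get("concepts", [])],
    }


if __name__ == "__main__":
    # Hand-written sample, for illustration only
    sample = {
        "sentiment": {"document": {"score": 0.8, "label": "positive"}},
        "entities": [{"type": "Company", "text": "IBM", "relevance": 0.9}],
        "concepts": [{"text": "Cloud computing", "relevance": 0.7}],
    }
    print(summarize_response(sample))
```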

Now, don’t forget to delete all the services from your IBM Cloud.

32. Delete Service

As shown in the figure, you need to delete the services one by one from the service list.

So, we’ve done it.

To explore my photography, you can visit the following link.

I’ll be posting another new post in the coming days. Till then, Happy Avenging! 😀

Note: All the data posted here is representational, available over the internet & meant for educational purposes only.