Live visual reading using Convolutional Neural Network (CNN) through Python-based machine-learning application.

This week we’re planning to touch on one of the exciting posts of visually reading characters from WebCAM & predict the letters using CNN methods. Before we dig deep, why don’t we see the demo run first?

Demo

Isn’t it fascinating? As we can see, the computer can record events and read like humans. And, thanks to the brilliant packages available in Python, which can help us predict the correct letter out of an Image.


What do we need to test it out?

  1. Preferably an external WebCAM.
  2. A moderate or good Laptop to test out this.
  3. Python 
  4. And a few other packages that we’ll mention next block.

What Python packages do we need?

Some of the critical packages that we must need to test out this application are –

cmake==3.22.1
dlib==19.19.0
face-recognition==1.3.0
face-recognition-models==0.3.0
imutils==0.5.3
jsonschema==4.4.0
keras==2.7.0
Keras-Preprocessing==1.1.2
matplotlib==3.5.1
matplotlib-inline==0.1.3
oauthlib==3.1.1
opencv-contrib-python==4.1.2.30
opencv-contrib-python-headless==4.4.0.46
opencv-python==4.5.5.62
opencv-python-headless==4.5.5.62
pickleshare==0.7.5
Pillow==9.0.0
python-dateutil==2.8.2
requests==2.27.1
requests-oauthlib==1.3.0
scikit-image==0.19.1
scikit-learn==1.0.2
tensorboard==2.7.0
tensorboard-data-server==0.6.1
tensorboard-plugin-wit==1.8.1
tensorflow==2.7.0
tensorflow-estimator==2.7.0
tensorflow-io-gcs-filesystem==0.23.1
tqdm==4.62.3

What is CNN?

In deep learning, a convolutional neural network (CNN/ConvNet) is a class of deep neural networks most commonly applied to analyze visual imagery.

Different Steps of CNN

We can understand from the above picture that a CNN generally takes an image as input. The neural network analyzes each pixel separately. The weights and biases of the model are then tweaked to detect the desired letters (In our use case) from the image. Like other algorithms, the data also has to pass through pre-processing stage. However, a CNN needs relatively less pre-processing than most other Deep Learning algorithms.

If you want to know more about this, there is an excellent article on CNN with some on-point animations explaining this concept. Please read it here.

Where do we get the data sets for our testing?

For testing, we are fortunate enough to have Kaggle with us. We have received a wide variety of sample data, which you can get from here.


Our use-case:

Architecture

From the above diagram, one can see that the python application will consume a live video feed of any random letters (both printed & handwritten) & predict the character as part of the machine learning model that we trained.


Code:

  1. clsConfig.py (Configuration file for the entire application.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 28-Dec-2021 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning & streaming dashboard.####
#### ####
################################################
import os
import platform as pl
class clsConfig(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'REPORT_PATH': Curr_Path + sep + 'report',
'FILE_NAME': Curr_Path + sep + 'Data' + sep + 'A_Z_Handwritten_Data.csv',
'SRC_PATH': Curr_Path + sep + 'data' + sep,
'APP_DESC_1': 'Old Video Enhancement!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'SUBDIR': 'data',
'SEP': sep,
'testRatio':0.2,
'valRatio':0.2,
'epochsVal':8,
'activationType':'relu',
'activationType2':'softmax',
'numOfClasses':26,
'kernelSize'😦3, 3),
'poolSize'😦2, 2),
'filterVal1':32,
'filterVal2':64,
'filterVal3':128,
'stridesVal':2,
'monitorVal':'val_loss',
'paddingVal1':'same',
'paddingVal2':'valid',
'reshapeVal':28,
'reshapeVal1'😦28,28),
'patienceVal1':1,
'patienceVal2':2,
'sleepTime':3,
'sleepTime1':6,
'factorVal':0.2,
'learningRateVal':0.001,
'minDeltaVal':0,
'minLrVal':0.0001,
'verboseFlag':0,
'modeInd':'auto',
'shuffleVal':100,
'DenkseVal1':26,
'DenkseVal2':64,
'DenkseVal3':128,
'predParam':9,
'word_dict':{0:'A',1:'B',2:'C',3:'D',4:'E',5:'F',6:'G',7:'H',8:'I',9:'J',10:'K',11:'L',12:'M',13:'N',14:'O',15:'P',16:'Q',17:'R',18:'S',19:'T',20:'U',21:'V',22:'W',23:'X', 24:'Y',25:'Z'},
'width':640,
'height':480,
'imgSize': (32,32),
'threshold': 0.45,
'imgDimension': (400, 440),
'imgSmallDim': (7, 7),
'imgMidDim': (28, 28),
'reshapeParam1':1,
'reshapeParam2':28,
'colorFeed'😦0,0,130),
'colorPredict'😦0,25,255)
}

view raw

clsConfig.py

hosted with ❤ by GitHub

Important parameters that we need to follow from the above snippets are –

'testRatio':0.2,
'valRatio':0.2,
'epochsVal':8,
'activationType':'relu',
'activationType2':'softmax',
'numOfClasses':26,
'kernelSize':(3, 3),
'poolSize':(2, 2),
'word_dict':{0:'A',1:'B',2:'C',3:'D',4:'E',5:'F',6:'G',7:'H',8:'I',9:'J',10:'K',11:'L',12:'M',13:'N',14:'O',15:'P',16:'Q',17:'R',18:'S',19:'T',20:'U',21:'V',22:'W',23:'X', 24:'Y',25:'Z'},

Since we have 26 letters, we have classified it as 26 in the numOfClasses.

Since we are talking about characters, we had to come up with a process of identifying each character as numbers & then processing our entire logic. Hence, the above parameter named word_dict captured all the characters in a python dictionary & stored them. Moreover, the application translates the final number output to more appropriate characters as the prediction.

2. clsAlphabetReading.py (Main training class to teach the model to predict alphabets from visual reader.)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 17-Jan-2022 ####
#### Modified On 17-Jan-2022 ####
#### ####
#### Objective: This python script will ####
#### teach & perfect the model to read ####
#### visual alphabets using Convolutional ####
#### Neural Network (CNN). ####
###############################################
from keras.datasets import mnist
import matplotlib.pyplot as plt
import cv2
import numpy as np
from keras.models import Sequential
from keras.layers import Dense, Flatten, Conv2D, MaxPool2D, Dropout
from tensorflow.keras.optimizers import SGD, Adam
from keras.callbacks import ReduceLROnPlateau, EarlyStopping
from keras.utils.np_utils import to_categorical
import pandas as p
import numpy as np
from sklearn.model_selection import train_test_split
from keras.utils import np_utils
import matplotlib.pyplot as plt
from tqdm import tqdm_notebook
from sklearn.utils import shuffle
import pickle
import os
import platform as pl
from clsConfig import clsConfig as cf
class clsAlphabetReading:
def __init__(self):
self.sep = str(cf.conf['SEP'])
self.Curr_Path = str(cf.conf['INIT_PATH'])
self.fileName = str(cf.conf['FILE_NAME'])
self.testRatio = float(cf.conf['testRatio'])
self.valRatio = float(cf.conf['valRatio'])
self.epochsVal = int(cf.conf['epochsVal'])
self.activationType = str(cf.conf['activationType'])
self.activationType2 = str(cf.conf['activationType2'])
self.numOfClasses = int(cf.conf['numOfClasses'])
self.kernelSize = cf.conf['kernelSize']
self.poolSize = cf.conf['poolSize']
self.filterVal1 = int(cf.conf['filterVal1'])
self.filterVal2 = int(cf.conf['filterVal2'])
self.filterVal3 = int(cf.conf['filterVal3'])
self.stridesVal = int(cf.conf['stridesVal'])
self.monitorVal = str(cf.conf['monitorVal'])
self.paddingVal1 = str(cf.conf['paddingVal1'])
self.paddingVal2 = str(cf.conf['paddingVal2'])
self.reshapeVal = int(cf.conf['reshapeVal'])
self.reshapeVal1 = cf.conf['reshapeVal1']
self.patienceVal1 = int(cf.conf['patienceVal1'])
self.patienceVal2 = int(cf.conf['patienceVal2'])
self.sleepTime = int(cf.conf['sleepTime'])
self.sleepTime1 = int(cf.conf['sleepTime1'])
self.factorVal = float(cf.conf['factorVal'])
self.learningRateVal = float(cf.conf['learningRateVal'])
self.minDeltaVal = int(cf.conf['minDeltaVal'])
self.minLrVal = float(cf.conf['minLrVal'])
self.verboseFlag = int(cf.conf['verboseFlag'])
self.modeInd = str(cf.conf['modeInd'])
self.shuffleVal = int(cf.conf['shuffleVal'])
self.DenkseVal1 = int(cf.conf['DenkseVal1'])
self.DenkseVal2 = int(cf.conf['DenkseVal2'])
self.DenkseVal3 = int(cf.conf['DenkseVal3'])
self.predParam = int(cf.conf['predParam'])
self.word_dict = cf.conf['word_dict']
def applyCNN(self, X_Train, Y_Train_Catg, X_Validation, Y_Validation_Catg):
try:
testRatio = self.testRatio
epochsVal = self.epochsVal
activationType = self.activationType
activationType2 = self.activationType2
numOfClasses = self.numOfClasses
kernelSize = self.kernelSize
poolSize = self.poolSize
filterVal1 = self.filterVal1
filterVal2 = self.filterVal2
filterVal3 = self.filterVal3
stridesVal = self.stridesVal
monitorVal = self.monitorVal
paddingVal1 = self.paddingVal1
paddingVal2 = self.paddingVal2
reshapeVal = self.reshapeVal
patienceVal1 = self.patienceVal1
patienceVal2 = self.patienceVal2
sleepTime = self.sleepTime
sleepTime1 = self.sleepTime1
factorVal = self.factorVal
learningRateVal = self.learningRateVal
minDeltaVal = self.minDeltaVal
minLrVal = self.minLrVal
verboseFlag = self.verboseFlag
modeInd = self.modeInd
shuffleVal = self.shuffleVal
DenkseVal1 = self.DenkseVal1
DenkseVal2 = self.DenkseVal2
DenkseVal3 = self.DenkseVal3
model = Sequential()
model.add(Conv2D(filters=filterVal1, kernel_size=kernelSize, activation=activationType, input_shape=(28,28,1)))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))
model.add(Conv2D(filters=filterVal2, kernel_size=kernelSize, activation=activationType, padding = paddingVal1))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))
model.add(Conv2D(filters=filterVal3, kernel_size=kernelSize, activation=activationType, padding = paddingVal2))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))
model.add(Flatten())
model.add(Dense(DenkseVal2,activation = activationType))
model.add(Dense(DenkseVal3,activation = activationType))
model.add(Dense(DenkseVal1,activation = activationType2))
model.compile(optimizer = Adam(learning_rate=learningRateVal), loss='categorical_crossentropy', metrics=['accuracy'])
reduce_lr = ReduceLROnPlateau(monitor=monitorVal, factor=factorVal, patience=patienceVal1, min_lr=minLrVal)
early_stop = EarlyStopping(monitor=monitorVal, min_delta=minDeltaVal, patience=patienceVal2, verbose=verboseFlag, mode=modeInd)
fittedModel = model.fit(X_Train, Y_Train_Catg, epochs=epochsVal, callbacks=[reduce_lr, early_stop], validation_data = (X_Validation,Y_Validation_Catg))
return (model, fittedModel)
except Exception as e:
x = str(e)
model = Sequential()
print('Error: ', x)
return (model, model)
def trainModel(self, debugInd, var):
try:
sep = self.sep
Curr_Path = self.Curr_Path
fileName = self.fileName
epochsVal = self.epochsVal
valRatio = self.valRatio
predParam = self.predParam
testRatio = self.testRatio
reshapeVal = self.reshapeVal
numOfClasses = self.numOfClasses
sleepTime = self.sleepTime
sleepTime1 = self.sleepTime1
shuffleVal = self.shuffleVal
reshapeVal1 = self.reshapeVal1
# Dictionary for getting characters from index values
word_dict = self.word_dict
print('File Name: ', str(fileName))
# Read the data
df_HW_Alphabet = p.read_csv(fileName).astype('float32')
# Sample Data
print('Sample Data: ')
print(df_HW_Alphabet.head())
# Split data the (x – Our data) & (y – the prdict label)
x = df_HW_Alphabet.drop('0',axis = 1)
y = df_HW_Alphabet['0']
# Reshaping the data in csv file to display as an image
X_Train, X_Test, Y_Train, Y_Test = train_test_split(x, y, test_size = testRatio)
X_Train, X_Validation, Y_Train, Y_Validation = train_test_split(X_Train, Y_Train, test_size = valRatio)
X_Train = np.reshape(X_Train.values, (X_Train.shape[0], reshapeVal, reshapeVal))
X_Test = np.reshape(X_Test.values, (X_Test.shape[0], reshapeVal, reshapeVal))
X_Validation = np.reshape(X_Validation.values, (X_Validation.shape[0], reshapeVal, reshapeVal))
print("Train Data Shape: ", X_Train.shape)
print("Test Data Shape: ", X_Test.shape)
print("Validation Data shape: ", X_Validation.shape)
# Plotting the number of alphabets in the dataset
Y_Train_Num = np.int0(y)
count = np.zeros(numOfClasses, dtype='int')
for i in Y_Train_Num:
count[i] +=1
alphabets = []
for i in word_dict.values():
alphabets.append(i)
fig, ax = plt.subplots(1,1, figsize=(7,7))
ax.barh(alphabets, count)
plt.xlabel("Number of elements ")
plt.ylabel("Alphabets")
plt.grid()
plt.show(block=False)
plt.pause(sleepTime)
plt.close()
# Shuffling the data
shuff = shuffle(X_Train[:shuffleVal])
# Model reshaping the training & test dataset
X_Train = X_Train.reshape(X_Train.shape[0],X_Train.shape[1],X_Train.shape[2],1)
print("Shape of Train Data: ", X_Train.shape)
X_Test = X_Test.reshape(X_Test.shape[0], X_Test.shape[1], X_Test.shape[2],1)
print("Shape of Test Data: ", X_Test.shape)
X_Validation = X_Validation.reshape(X_Validation.shape[0], X_Validation.shape[1], X_Validation.shape[2],1)
print("Shape of Validation data: ", X_Validation.shape)
# Converting the labels to categorical values
Y_Train_Catg = to_categorical(Y_Train, num_classes = numOfClasses, dtype='int')
print("Shape of Train Labels: ", Y_Train_Catg.shape)
Y_Test_Catg = to_categorical(Y_Test, num_classes = numOfClasses, dtype='int')
print("Shape of Test Labels: ", Y_Test_Catg.shape)
Y_Validation_Catg = to_categorical(Y_Validation, num_classes = numOfClasses, dtype='int')
print("Shape of validation labels: ", Y_Validation_Catg.shape)
model, history = self.applyCNN(X_Train, Y_Train_Catg, X_Validation, Y_Validation_Catg)
print('Model Summary: ')
print(model.summary())
# Displaying the accuracies & losses for train & validation set
print("Validation Accuracy :", history.history['val_accuracy'])
print("Training Accuracy :", history.history['accuracy'])
print("Validation Loss :", history.history['val_loss'])
print("Training Loss :", history.history['loss'])
# Displaying the Loss Graph
plt.figure(1)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.legend(['training','validation'])
plt.title('Loss')
plt.xlabel('epoch')
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()
# Dsiplaying the Accuracy Graph
plt.figure(2)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.legend(['training','validation'])
plt.title('Accuracy')
plt.xlabel('epoch')
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()
# Making the model to predict
pred = model.predict(X_Test[:predParam])
print('Test Details::')
print('X_Test: ', X_Test.shape)
print('Y_Test_Catg: ', Y_Test_Catg.shape)
try:
score = model.evaluate(X_Test, Y_Test_Catg, verbose=0)
print('Test Score = ', score[0])
print('Test Accuracy = ', score[1])
except Exception as e:
x = str(e)
print('Error: ', x)
# Displaying some of the test images & their predicted labels
fig, ax = plt.subplots(3,3, figsize=(8,9))
axes = ax.flatten()
for i in range(9):
axes[i].imshow(np.reshape(X_Test[i], reshapeVal1), cmap="Greys")
pred = word_dict[np.argmax(Y_Test_Catg[i])]
print('Prediction: ', pred)
axes[i].set_title("Test Prediction: " + pred)
axes[i].grid()
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()
fileName = Curr_Path + sep + 'Model' + sep + 'model_trained_' + str(epochsVal) + '.p'
print('Model Name: ', str(fileName))
pickle_out = open(fileName, 'wb')
pickle.dump(model, pickle_out)
pickle_out.close()
return 0
except Exception as e:
x = str(e)
print('Error: ', x)
return 1

Some of the key snippets from the above scripts are –

x = df_HW_Alphabet.drop('0',axis = 1)
y = df_HW_Alphabet['0']

In the above snippet, we have split the data into images & their corresponding labels.

X_Train, X_Test, Y_Train, Y_Test = train_test_split(x, y, test_size = testRatio)
X_Train, X_Validation, Y_Train, Y_Validation = train_test_split(X_Train, Y_Train, test_size = valRatio)

X_Train = np.reshape(X_Train.values, (X_Train.shape[0], reshapeVal, reshapeVal))
X_Test = np.reshape(X_Test.values, (X_Test.shape[0], reshapeVal, reshapeVal))
X_Validation = np.reshape(X_Validation.values, (X_Validation.shape[0], reshapeVal, reshapeVal))


print("Train Data Shape: ", X_Train.shape)
print("Test Data Shape: ", X_Test.shape)
print("Validation Data shape: ", X_Validation.shape)

We are splitting the data into Train, Test & Validation sets to get more accurate predictions and reshaping the raw data into the image by consuming the 784 data columns to 28×28 pixel images.

Since we are talking about characters, we had to come up with a process of identifying The following snippet will plot the character equivalent number into a matplotlib chart & showcase the overall distribution trend after splitting.

Y_Train_Num = np.int0(y)
count = np.zeros(numOfClasses, dtype='int')
for i in Y_Train_Num:
    count[i] +=1

alphabets = []
for i in word_dict.values():
    alphabets.append(i)

fig, ax = plt.subplots(1,1, figsize=(7,7))
ax.barh(alphabets, count)

plt.xlabel("Number of elements ")
plt.ylabel("Alphabets")
plt.grid()
plt.show(block=False)
plt.pause(sleepTime)
plt.close()

Note that we have tweaked the plt.show property with (block=False). This property will enable us to continue execution without human interventions after the initial pause.

# Model reshaping the training & test dataset
X_Train = X_Train.reshape(X_Train.shape[0],X_Train.shape[1],X_Train.shape[2],1)
print("Shape of Train Data: ", X_Train.shape)

X_Test = X_Test.reshape(X_Test.shape[0], X_Test.shape[1], X_Test.shape[2],1)
print("Shape of Test Data: ", X_Test.shape)

X_Validation = X_Validation.reshape(X_Validation.shape[0], X_Validation.shape[1], X_Validation.shape[2],1)
print("Shape of Validation data: ", X_Validation.shape)

# Converting the labels to categorical values
Y_Train_Catg = to_categorical(Y_Train, num_classes = numOfClasses, dtype='int')
print("Shape of Train Labels: ", Y_Train_Catg.shape)

Y_Test_Catg = to_categorical(Y_Test, num_classes = numOfClasses, dtype='int')
print("Shape of Test Labels: ", Y_Test_Catg.shape)

Y_Validation_Catg = to_categorical(Y_Validation, num_classes = numOfClasses, dtype='int')
print("Shape of validation labels: ", Y_Validation_Catg.shape)

In the above diagram, the application did reshape all three categories of data before calling the primary CNN function.

model = Sequential()

model.add(Conv2D(filters=filterVal1, kernel_size=kernelSize, activation=activationType, input_shape=(28,28,1)))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))

model.add(Conv2D(filters=filterVal2, kernel_size=kernelSize, activation=activationType, padding = paddingVal1))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))

model.add(Conv2D(filters=filterVal3, kernel_size=kernelSize, activation=activationType, padding = paddingVal2))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))

model.add(Flatten())

model.add(Dense(DenkseVal2,activation = activationType))
model.add(Dense(DenkseVal3,activation = activationType))

model.add(Dense(DenkseVal1,activation = activationType2))

model.compile(optimizer = Adam(learning_rate=learningRateVal), loss='categorical_crossentropy', metrics=['accuracy'])
reduce_lr = ReduceLROnPlateau(monitor=monitorVal, factor=factorVal, patience=patienceVal1, min_lr=minLrVal)
early_stop = EarlyStopping(monitor=monitorVal, min_delta=minDeltaVal, patience=patienceVal2, verbose=verboseFlag, mode=modeInd)


fittedModel = model.fit(X_Train, Y_Train_Catg, epochs=epochsVal, callbacks=[reduce_lr, early_stop],  validation_data = (X_Validation,Y_Validation_Catg))

return (model, fittedModel)

In the above snippet, the convolution layers are followed by maxpool layers, which reduce the number of features extracted. The output of the maxpool layers and convolution layers are flattened into a vector of a single dimension and supplied as an input to the Dense layer—the CNN model prepared for training the model using the training dataset.

We have used optimization parameters like Adam, RMSProp & the application we trained for eight epochs for better accuracy & predictions.

# Displaying the accuracies & losses for train & validation set
print("Validation Accuracy :", history.history['val_accuracy'])
print("Training Accuracy :", history.history['accuracy'])
print("Validation Loss :", history.history['val_loss'])
print("Training Loss :", history.history['loss'])

# Displaying the Loss Graph
plt.figure(1)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.legend(['training','validation'])
plt.title('Loss')
plt.xlabel('epoch')
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()

# Dsiplaying the Accuracy Graph
plt.figure(2)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.legend(['training','validation'])
plt.title('Accuracy')
plt.xlabel('epoch')
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()

Also, we have captured the validation Accuracy & Loss & plot them into two separate graphs for better understanding.

try:
    score = model.evaluate(X_Test, Y_Test_Catg, verbose=0)
    print('Test Score = ', score[0])
    print('Test Accuracy = ', score[1])
except Exception as e:
    x = str(e)
    print('Error: ', x)

Also, the application is trying to get the accuracy of the model that we trained & validated with the training & validation data. This time we have used test data to predict the confidence score.

# Displaying some of the test images & their predicted labels
fig, ax = plt.subplots(3,3, figsize=(8,9))
axes = ax.flatten()

for i in range(9):
    axes[i].imshow(np.reshape(X_Test[i], reshapeVal1), cmap="Greys")
    pred = word_dict[np.argmax(Y_Test_Catg[i])]
    print('Prediction: ', pred)
    axes[i].set_title("Test Prediction: " + pred)
    axes[i].grid()
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()

Finally, the application testing with some random test data & tried to plot the output & prediction assessment.

Testing with Random Test Data
fileName = Curr_Path + sep + 'Model' + sep + 'model_trained_' + str(epochsVal) + '.p'
print('Model Name: ', str(fileName))

pickle_out = open(fileName, 'wb')
pickle.dump(model, pickle_out)
pickle_out.close()

As a part of the last step, the application will generate the models using a pickle package & save them under a specific location, which the reader application will use.

3. trainingVisualDataRead.py (Main application that will invoke the training class to predict alphabet through WebCam using Convolutional Neural Network (CNN).)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 17-Jan-2022 ####
#### Modified On 17-Jan-2022 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsAlhpabetReading class to initiate ####
#### teach & perfect the model to read ####
#### visual alphabets using Convolutional ####
#### Neural Network (CNN). ####
###############################################
# We keep the setup code in a different class as shown below.
import clsAlphabetReading as ar
from clsConfig import clsConfig as cf
import datetime
import logging
###############################################
### Global Section ###
###############################################
# Instantiating all the three classes
x1 = ar.clsAlphabetReading()
###############################################
### End of Global Section ###
###############################################
def main():
try:
# Other useful variables
debugInd = 'Y'
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
var1 = datetime.datetime.now()
print('Start Time: ', str(var))
# End of useful variables
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'restoreVideo.log', level=logging.INFO)
print('Started Transformation!')
# Execute all the pass
r1 = x1.trainModel(debugInd, var)
if (r1 == 0):
print('Successfully Visual Alphabet Training Completed!')
else:
print('Failed to complete the Visual Alphabet Training!')
var2 = datetime.datetime.now()
c = var2 var1
minutes = c.total_seconds() / 60
print('Total difference in minutes: ', str(minutes))
print('End Time: ', str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

And the core snippet from the above script is –

x1 = ar.clsAlphabetReading()

Instantiate the main class.

r1 = x1.trainModel(debugInd, var)

The python application will invoke the class & capture the returned value inside the r1 variable.

4. readingVisualData.py (Reading the model to predict Alphabet using WebCAM.)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 18-Jan-2022 ####
#### Modified On 18-Jan-2022 ####
#### ####
#### Objective: This python script will ####
#### scan the live video feed from the ####
#### web-cam & predict the alphabet that ####
#### read it. ####
###############################################
# We keep the setup code in a different class as shown below.
from clsConfig import clsConfig as cf
import datetime
import logging
import cv2
import pickle
import numpy as np
###############################################
### Global Section ###
###############################################
sep = str(cf.conf['SEP'])
Curr_Path = str(cf.conf['INIT_PATH'])
fileName = str(cf.conf['FILE_NAME'])
epochsVal = int(cf.conf['epochsVal'])
numOfClasses = int(cf.conf['numOfClasses'])
word_dict = cf.conf['word_dict']
width = int(cf.conf['width'])
height = int(cf.conf['height'])
imgSize = cf.conf['imgSize']
threshold = float(cf.conf['threshold'])
imgDimension = cf.conf['imgDimension']
imgSmallDim = cf.conf['imgSmallDim']
imgMidDim = cf.conf['imgMidDim']
reshapeParam1 = int(cf.conf['reshapeParam1'])
reshapeParam2 = int(cf.conf['reshapeParam2'])
colorFeed = cf.conf['colorFeed']
colorPredict = cf.conf['colorPredict']
###############################################
### End of Global Section ###
###############################################
def main():
try:
# Other useful variables
debugInd = 'Y'
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
var1 = datetime.datetime.now()
print('Start Time: ', str(var))
# End of useful variables
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'restoreVideo.log', level=logging.INFO)
print('Started Live Streaming!')
cap = cv2.VideoCapture(0)
cap.set(3, width)
cap.set(4, height)
fileName = Curr_Path + sep + 'Model' + sep + 'model_trained_' + str(epochsVal) + '.p'
print('Model Name: ', str(fileName))
pickle_in = open(fileName, 'rb')
model = pickle.load(pickle_in)
while True:
status, img = cap.read()
if status == False:
break
img_copy = img.copy()
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = cv2.resize(img, imgDimension)
img_copy = cv2.GaussianBlur(img_copy, imgSmallDim, 0)
img_gray = cv2.cvtColor(img_copy, cv2.COLOR_BGR2GRAY)
bin, img_thresh = cv2.threshold(img_gray, 100, 255, cv2.THRESH_BINARY_INV)
img_final = cv2.resize(img_thresh, imgMidDim)
img_final = np.reshape(img_final, (reshapeParam1,reshapeParam2,reshapeParam2,reshapeParam1))
img_pred = word_dict[np.argmax(model.predict(img_final))]
# Extracting Probability Values
Predict_X = model.predict(img_final)
probVal = round(np.amax(Predict_X) * 100)
cv2.putText(img, "Live Feed : (" + str(probVal) + "%) ", (20,25), cv2.FONT_HERSHEY_TRIPLEX, 0.7, color = colorFeed)
cv2.putText(img, "Prediction: " + img_pred, (20,410), cv2.FONT_HERSHEY_DUPLEX, 1.3, color = colorPredict)
cv2.imshow("Original Image", img)
if cv2.waitKey(1) & 0xFF == ord('q'):
r1=0
break
if (r1 == 0):
print('Successfully Alphabets predicted!')
else:
print('Failed to predict alphabet!')
var2 = datetime.datetime.now()
c = var2 var1
minutes = c.total_seconds() / 60
print('Total Run Time in minutes: ', str(minutes))
print('End Time: ', str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

And the key snippet from the above code is –

cap = cv2.VideoCapture(0)
cap.set(3, width)
cap.set(4, height)

The application is reading the live video data from WebCAM. Also, set out the height & width for the video output.

fileName = Curr_Path + sep + 'Model' + sep + 'model_trained_' + str(epochsVal) + '.p'
print('Model Name: ', str(fileName))

pickle_in = open(fileName, 'rb')
model = pickle.load(pickle_in)

The application reads the model output generated as part of the previous script using the pickle package.

while True:
    status, img = cap.read()

    if status == False:
        break

The application will read the WebCAM & it exits if there is an end of video transmission or some kind of corrupt video frame.

img_copy = img.copy()

img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = cv2.resize(img, imgDimension)

img_copy = cv2.GaussianBlur(img_copy, imgSmallDim, 0)
img_gray = cv2.cvtColor(img_copy, cv2.COLOR_BGR2GRAY)
bin, img_thresh = cv2.threshold(img_gray, 100, 255, cv2.THRESH_BINARY_INV)

img_final = cv2.resize(img_thresh, imgMidDim)
img_final = np.reshape(img_final, (reshapeParam1,reshapeParam2,reshapeParam2,reshapeParam1))


img_pred = word_dict[np.argmax(model.predict(img_final))]

We have initially cloned the original video frame & then it converted from BGR2GRAYSCALE while applying the threshold on it doe better prediction outcomes. Then the image has resized & reshaped for model input. Finally, the np.argmax function extracted the class index with the highest predicted probability. Furthermore, it is translated using the word_dict dictionary to an Alphabet & displayed on top of the Live View.

# Extracting Probability Values
Predict_X = model.predict(img_final)
probVal = round(np.amax(Predict_X) * 100)

Also, derive the confidence score of that probability & display that on top of the Live View.

if cv2.waitKey(1) & 0xFF == ord('q'):
    r1=0
    break

The above code will let the developer exit from this application by pressing the “Esc” or “q”-key from the keyboard & the program will terminate.


So, we’ve done it.

You will get the complete codebase in the following Github link.

I’ll bring some more exciting topic in the coming days from the Python verse. Please share & subscribe my post & let me know your feedback.

Till then, Happy Avenging! 😀

Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim the ownership of these images. There is an always room for improvement & especially the prediction quality of Alphabet.

Projecting real-time KPIs by ingesting streaming events from emulated IoT-device

Today, I am planning to demonstrate an IoT use case implemented in Python. I was waiting for my Raspberry Pi to arrive. However, the product that I received was not working as expected. Perhaps, some hardware malfunction. Hence, I was looking for a way to continue with my installment even without the hardware.

I was looking for an alternative way to use an online Raspberry Pi emulator. Recently, Microsoft has introduced integrated Raspberry Pi, which you can directly integrate with Azure IoT. However, I couldn’t find any API, which I could leverage on my Python application.

So, I explored all the possible options & finally come-up with the idea of creating my own IoT-Emulator, which can integrate with any application. With the help from the online materials, I have customized & enhanced them as per my use case & finally come up with this clean application that will demonstrate this use case with clarity.

We’ll showcase this real-time use case, where we would try to capture the events generated by IoT in a real-time dashboard, where the values in the visual display points will be affected as soon as the source data changes.


However, I would like to share the run before we dig deep into this.

Demo

Isn’t this exciting? How we can use our custom-built IoT emulator & captures real-time events to Ably Queue, then transform those raw events into more meaningful KPIs. Let’s deep dive then.


Architecture:

Let’s explore the architecture –

Fig – 1

As you can see, the green box is a demo IoT application that generates events & pushes them into the Ably Queue. At the same time, Dashboard consumes the events & transforms them into more meaningful metrics.


Package Installation:

Let us understand the sample packages that require for this task.

Step – 1:

Installation

Step – 2:

Installation – Continue

And, here is the command to install those packages –

pip install dash==1.0.0
pip install numpy==1.16.4
pip install pandas==0.24.2
pip install scipy==1.3.0
pip install gunicorn==19.9.0
pip install ably==1.1.1
pip install tkgpio==0.1

Code:

Since this is an extension to our previous post, we’re not going to discuss other scripts, which we’ve already discussed over there. Instead, we will talk about the enhanced scripts & the new scripts that require for this use case.

1. clsConfig.py (This native Python script contains the configuration entries.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 25-Sep-2021 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning & streaming dashboard.####
#### ####
################################################
import os
import platform as pl
class clsConfig(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'REPORT_PATH': Curr_Path + sep + 'report',
'FILE_NAME': Curr_Path + sep + 'data' + sep + 'TradeIn.csv',
'SRC_PATH': Curr_Path + sep + 'data' + sep,
'JSONFileNameWithPath': Curr_Path + sep + 'GUI_Config' + sep + 'CircuitConfiguration.json',
'APP_DESC_1': 'Dash Integration with Ably!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'SUBDIR' : 'data',
'ABLY_ID': 'WWP309489.93jfkT:32kkdhdJjdued79e',
"URL":"https://corona-api.com/countries/",
"appType":"application/json",
"conType":"keep-alive",
"limRec": 50,
"CACHE":"no-cache",
"MAX_RETRY": 3,
"coList": "DE, IN, US, CA, GB, ID, BR",
"FNC": "NewConfirmed",
"TMS": "ReportedDate",
"FND": "NewDeaths",
"FinData": "Cache.csv"
}

view raw

clsConfig.py

hosted with ❤ by GitHub

A few of the new entries, which are essential to this task are -> ABLY_ID, FinData & JSONFileNameWithPath.

2. clsPublishStream.py (This script will publish real-time streaming data coming out from a hosted API sources using another popular third-party service named Ably. Ably mimics pubsub Streaming concept, which might be extremely useful for any start-ups.)


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 26-Jul-2021 ####
#### Modified Date: 08-Sep-2021 ####
#### ####
#### Objective: This script will publish real-time ####
#### streaming data coming out from a hosted API ####
#### sources using another popular third-party service ####
#### named Ably. Ably mimics pubsub Streaming concept, ####
#### which might be extremely useful for any start-ups. ####
#### ####
###############################################################
from ably import AblyRest
import logging
import json
from random import seed
from random import random
import json
import math
import random
from clsConfig import clsConfig as cf
seed(1)
# Global Section
logger = logging.getLogger('ably')
logger.addHandler(logging.StreamHandler())
ably_id = str(cf.conf['ABLY_ID'])
ably = AblyRest(ably_id)
channel = ably.channels.get('sd_channel')
# End Of Global Section
class clsPublishStream:
def __init__(self):
self.msgSize = cf.conf['limRec']
def pushEvents(self, srcJSON, debugInd, varVa):
try:
msgSize = self.msgSize
# Capturing the inbound dataframe
jdata_fin = json.dumps(srcJSON)
print('IOT Events: ')
print(str(jdata_fin))
# Publish rest of the messages to the sd_channel channel
channel.publish('event', jdata_fin)
jdata_fin = ''
return 0
except Exception as e:
x = str(e)
print(x)
logging.info(x)
return 1

We’re not going to discuss this as we’ve already discussed in my previous post.

3. clsStreamConsume.py (Consuming Streaming data from Ably channels.)


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### Modified On 08-Sep-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels published by the ####
#### playIOTDevice.py ####
#### ####
##############################################
import json
from clsConfig import clsConfig as cf
import requests
import logging
import time
import pandas as p
import clsL as cl
from ably import AblyRest
# Initiating Log class
l = cl.clsL()
class clsStreamConsume:
def __init__(self):
self.ably_id = str(cf.conf['ABLY_ID'])
self.fileName = str(cf.conf['FinData'])
def conStream(self, varVa, debugInd):
try:
ably_id = self.ably_id
fileName = self.fileName
var = varVa
debug_ind = debugInd
# Fetching the data
client = AblyRest(ably_id)
channel = client.channels.get('sd_channel')
message_page = channel.history()
# Counter Value
cnt = 0
# Declaring Global Data-Frame
df_conv = p.DataFrame()
for i in message_page.items:
print('Last Msg: {}'.format(i.data))
json_data = json.loads(i.data)
#jdata = json.dumps(json_data)
# Converting String to Dictionary
dict_json = eval(json_data)
# Converting JSON to Dataframe
#df = p.json_normalize(json_data)
#df.columns = df.columns.map(lambda x: x.split(".")[-1])
df = p.DataFrame.from_dict(dict_json, orient='index')
#print('DF Inside:')
#print(df)
if cnt == 0:
df_conv = df
else:
d_frames = [df_conv, df]
df_conv = p.concat(d_frames)
cnt += 1
# Resetting the Index Value
df_conv.reset_index(drop=True, inplace=True)
# This will check whether the current load is happening
# or not. Based on that, it will capture the old events
# from cache.
if df_conv.empty:
df_conv = p.read_csv(fileName, index = True)
else:
l.logr(fileName, debug_ind, df_conv, 'log')
return df_conv
except Exception as e:
x = str(e)
print('Error: ', x)
logging.info(x)
# This will handle the error scenaio as well.
# Based on that, it will capture the old events
# from cache.
try:
df_conv = p.read_csv(fileName, index = True)
except:
df = p.DataFrame()
return df

We’re not going to discuss this as we’ve already discussed in my previous post.

4. CircuitConfiguration.json (Configuration file for GUI Interface for IoT Simulator.)


{
"name":"Analog Device",
"width":700,
"height":350,
"leds":[
{
"x":105,
"y":80,
"name":"LED",
"pin":21
}
],
"motors":[
{
"x":316,
"y":80,
"name":"DC Motor",
"forward_pin":22,
"backward_pin":23
}
],
"servos":[
{
"x":537,
"y":80,
"name":"Servo Motor",
"pin":24,
"min_angle":-180,
"max_angle":180,
"initial_angle":20
}
],
"adc":{
"mcp_chip":3008,
"potenciometers":[
{
"x":40,
"y":200,
"name":"Brightness Potentiometer",
"channel":0
},
{
"x":270,
"y":200,
"name":"Speed Potentiometer",
"channel":2
},
{
"x":500,
"y":200,
"name":"Angle Potentiometer",
"channel":6
}
]
},
"toggles":[
{
"x":270,
"y":270,
"name":"Direction Toggle Switch",
"pin":15,
"off_label":"backward",
"on_label":"forward",
"is_on":false
}
],
"labels":[
{
"x":15,
"y":35,
"width":25,
"height":18,
"borderwidth":2,
"relief":"solid"
},
{
"x":56,
"y":26,
"text":"Brightness Control"
},
{
"x":245,
"y":35,
"width":25,
"height":18,
"borderwidth":2,
"relief":"solid"
},
{
"x":298,
"y":26,
"text":"Speed Control"
},
{
"x":475,
"y":35,
"width":25,
"height":18,
"borderwidth":2,
"relief":"solid"
},
{
"x":531,
"y":26,
"text":"Angle Control"
}
]
}

This json configuration will be used by the next python class.

5. clsBuildCircuit.py (Calling Tk Circuit API.)


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 25-Sep-2021 ####
#### Modified On 25-Sep-2021 ####
#### ####
#### Objective: Calling Tk Circuit API ####
##############################################
from tkgpio import TkCircuit
from json import load
from clsConfig import clsConfig as cf
fileName = str(cf.conf['JSONFileNameWithPath'])
print('File Name: ', str(fileName))
# initialize the circuit inside the GUI
with open(fileName, "r") as file:
config = load(file)
class clsBuildCircuit:
def __init__(self):
self.config = config
def genCir(self, main_function):
try:
config = self.config
circuit = TkCircuit(config)
circuit.run(main_function)
return circuit
except Exception as e:
x = str(e)
print(x)
return ''

Key snippets from the above script –

config = self.config
circuit = TkCircuit(config)
circuit.run(main_function)

The above lines will create an instance of simulated IoT circuits & then it will use the json file to start the GUI class.

6. playIOTDevice.py (Main Circuit GUI script to create an IoT Device to generate the events, which will consumed.)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 25-Sep-2021 ####
#### Modified On 25-Sep-2021 ####
#### ####
#### Objective: Main Tk Circuit GUI script ####
#### to create an IOT Device to generate ####
#### the events, which will consumed. ####
###############################################
# We keep the setup code in a different class as shown below.
import clsBuildCircuit as csb
import json
import clsPublishStream as cps
import datetime
from clsConfig import clsConfig as cf
import logging
###############################################
### Global Section ###
###############################################
# Initiating Ably class to push events
x1 = cps.clsPublishStream()
# Create the instance of the Tk Circuit API Class.
circuit = csb.clsBuildCircuit()
###############################################
### End of Global Section ###
###############################################
# Invoking the IOT Device Generator.
@circuit.genCir
def main():
from gpiozero import PWMLED, Motor, Servo, MCP3008, Button
from time import sleep
# Circuit Components
ledAlert = PWMLED(21)
dcMotor = Motor(22, 23)
servoMotor = Servo(24)
ioMeter1 = MCP3008(0)
ioMeter2 = MCP3008(2)
ioMeter3 = MCP3008(6)
switch = Button(15)
# End of circuit components
# Other useful variables
cnt = 1
idx = 0
debugInd = 'Y'
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# End of useful variables
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
msgSize = int(cf.conf['limRec'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'IOTDevice.log', level=logging.INFO)
while True:
ledAlert.value = ioMeter1.value
if switch.is_pressed:
dcMotor.forward(ioMeter2.value)
xVal = 'Motor Forward'
else:
dcMotor.backward(ioMeter2.value)
xVal = 'Motor Backward'
servoMotor.value = 1 2 * ioMeter3.value
srcJson = {
"LedMeter": ledAlert.value,
"DCMeter": ioMeter2.value,
"ServoMeter": ioMeter3.value,
"SwitchStatus": switch.is_pressed,
"DCMotorPos": xVal,
"ServoMotor": servoMotor.value
}
tmpJson = str(srcJson)
if cnt == 1:
srcJsonMast = '{' + '"' + str(idx) + '":'+ tmpJson
elif cnt == msgSize:
srcJsonMast = srcJsonMast + '}'
print('JSON: ')
print(str(srcJsonMast))
# Pushing both the Historical Confirmed Cases
retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)
if retVal_1 == 0:
print('Successfully IOT event pushed!')
else:
print('Failed to push IOT events!')
srcJsonMast = ''
tmpJson = ''
cnt = 0
idx = 1
srcJson = {}
retVal_1 = 0
else:
srcJsonMast = srcJsonMast + ',' + '"' + str(idx) + '":'+ tmpJson
cnt += 1
idx += 1
sleep(0.05)

Lets’ explore the key snippets –

ledAlert = PWMLED(21)
dcMotor = Motor(22, 23)
servoMotor = Servo(24)

It defines three motors that include Servo, DC & LED.

Now, we can see the following sets of the critical snippet –

ledAlert.value = ioMeter1.value

if switch.is_pressed:
    dcMotor.forward(ioMeter2.value)
    xVal = 'Motor Forward'
else:
    dcMotor.backward(ioMeter2.value)
    xVal = 'Motor Backward'

servoMotor.value = 1 - 2 * ioMeter3.value

srcJson = {
"LedMeter": ledAlert.value,
"DCMeter": ioMeter2.value,
"ServoMeter": ioMeter3.value,
"SwitchStatus": switch.is_pressed,
"DCMotorPos": xVal,
"ServoMotor": servoMotor.value
}

Following lines will dynamically generates JSON that will be passed into the Ably queue –

tmpJson = str(srcJson)

if cnt == 1:
    srcJsonMast = '{' + '"' + str(idx) + '":'+ tmpJson
elif cnt == msgSize:
    srcJsonMast = srcJsonMast + '}'
    print('JSON: ')
    print(str(srcJsonMast))

Final line from the above script –

# Pushing both the Historical Confirmed Cases
retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)

This code will now push the events into the Ably Queue.

7. app.py (Consuming Streaming data from Ably channels & captured IOT events from the simulator & publish them in Dashboard through measured KPIs.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 02-Oct-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels & captured IOT ####
#### events from the simulator & publish ####
#### them in Dashboard through measured ####
#### KPIs. ####
#### ####
##############################################
import os
import pathlib
import numpy as np
import datetime as dt
import dash
from dash import dcc
from dash import html
import datetime
import dash_daq as daq
from dash.exceptions import PreventUpdate
from dash.dependencies import Input, Output, State
from scipy.stats import rayleigh
# Consuming data from Ably Queue
from ably import AblyRest
# Main Class to consume streaming
import clsStreamConsume as ca
# Create the instance of the Covid API Class
x1 = ca.clsStreamConsume()
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' *60)
DInd = 'Y'
GRAPH_INTERVAL = os.environ.get("GRAPH_INTERVAL", 5000)
app = dash.Dash(
__name__,
meta_tags=[{"name": "viewport", "content": "width=device-width, initial-scale=1"}],
)
app.title = "IOT Device Dashboard"
server = app.server
app_color = {"graph_bg": "#082255", "graph_line": "#007ACE"}
app.layout = html.Div(
[
# header
html.Div(
[
html.Div(
[
html.H4("IOT DEVICE STREAMING", className="app__header__title"),
html.P(
"This app continually consumes streaming data from IOT-Device and displays live charts of various metrics & KPI associated with it.",
className="app__header__title–grey",
),
],
className="app__header__desc",
),
html.Div(
[
html.A(
html.Button("SOURCE CODE", className="link-button"),
href="https://github.com/SatyakiDe2019/IOTStream",
),
html.A(
html.Button("VIEW DEMO", className="link-button"),
href="https://github.com/SatyakiDe2019/IOTStream/blob/main/demo.gif",
),
html.A(
html.Img(
src=app.get_asset_url("dash-new-logo.png"),
className="app__menu__img",
),
href="https://plotly.com/dash/",
),
],
className="app__header__logo",
),
],
className="app__header",
),
html.Div(
[
# Motor Speed
html.Div(
[
html.Div(
[html.H6("SERVO METER (IOT)", className="graph__title")]
),
dcc.Graph(
id="iot-measure",
figure=dict(
layout=dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
)
),
),
dcc.Interval(
id="iot-measure-update",
interval=int(GRAPH_INTERVAL),
n_intervals=0,
),
# Second Panel
html.Div(
[html.H6("DC-MOTOR (IOT)", className="graph__title")]
),
dcc.Graph(
id="iot-measure-1",
figure=dict(
layout=dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
)
),
),
dcc.Interval(
id="iot-measure-update-1",
interval=int(GRAPH_INTERVAL),
n_intervals=0,
)
],
className="two-thirds column motor__speed__container",
),
html.Div(
[
# histogram
html.Div(
[
html.Div(
[
html.H6(
"MOTOR POWER HISTOGRAM",
className="graph__title",
)
]
),
html.Div(
[
dcc.Slider(
id="bin-slider",
min=1,
max=60,
step=1,
value=20,
updatemode="drag",
marks={
20: {"label": "20"},
40: {"label": "40"},
60: {"label": "60"},
},
)
],
className="slider",
),
html.Div(
[
dcc.Checklist(
id="bin-auto",
options=[
{"label": "Auto", "value": "Auto"}
],
value=["Auto"],
inputClassName="auto__checkbox",
labelClassName="auto__label",
),
html.P(
"# of Bins: Auto",
id="bin-size",
className="auto__p",
),
],
className="auto__container",
),
dcc.Graph(
id="motor-histogram",
figure=dict(
layout=dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
)
),
),
],
className="graph__container first",
),
# motor direction
html.Div(
[
html.Div(
[
html.H6(
"SERVO MOTOR DIRECTION", className="graph__title"
)
]
),
dcc.Graph(
id="servo-motor-direction",
figure=dict(
layout=dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
)
),
),
],
className="graph__container second",
),
],
className="one-third column histogram__direction",
),
],
className="app__content",
),
],
className="app__container",
)
def toPositive(row, flag):
try:
if flag == 'ServoMeter':
x_val = abs(float(row['ServoMotor']))
elif flag == 'DCMotor':
x_val = abs(float(row['DCMotor'])) * 0.001
return x_val
except Exception as e:
x = str(e)
print(x)
val = 0
return val
def toPositiveInflated(row, flag):
try:
if flag == 'ServoMeter':
x_val = abs(float(row['ServoMeter'])) * 100
elif flag == 'DCMotor':
x_val = abs(float(row['DCMeter'])) * 100
return x_val
except Exception as e:
x = str(e)
print(x)
val = 0
return val
def getData(var, Ind):
try:
# Let's pass this to our map section
df = x1.conStream(var, Ind)
df['ServoMeterNew'] = df.apply(lambda row: toPositiveInflated(row, 'ServoMeter'), axis=1)
df['ServoMotorNew'] = df.apply(lambda row: toPositive(row, 'ServoMeter'), axis=1)
df['DCMotor'] = df.apply(lambda row: toPositiveInflated(row, 'DCMotor'), axis=1)
df['DCMeterNew'] = df.apply(lambda row: toPositive(row, 'DCMotor'), axis=1)
# Dropping old columns
df.drop(columns=['ServoMeter','ServoMotor','DCMeter'], axis=1, inplace=True)
#Rename New Columns to Old Columns
df.rename(columns={'ServoMeterNew':'ServoMeter'}, inplace=True)
df.rename(columns={'ServoMotorNew':'ServoMotor'}, inplace=True)
df.rename(columns={'DCMeterNew':'DCMeter'}, inplace=True)
return df
except Exception as e:
x = str(e)
print(x)
df = p.DataFrame()
return df
@app.callback(
Output("iot-measure-1", "figure"), [Input("iot-measure-update", "n_intervals")]
)
def gen_iot_speed(interval):
"""
Generate the DC Meter graph.
:params interval: update the graph based on an interval
"""
# Let's pass this to our map section
df = getData(var1, DInd)
trace = dict(
type="scatter",
y=df["DCMotor"],
line={"color": "#42C4F7"},
hoverinfo="skip",
error_y={
"type": "data",
"array": df["DCMeter"],
"thickness": 1.5,
"width": 2,
"color": "#B4E8FC",
},
mode="lines",
)
layout = dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
font={"color": "#fff"},
height=400,
xaxis={
"range": [0, 200],
"showline": True,
"zeroline": False,
"fixedrange": True,
"tickvals": [0, 50, 100, 150, 200],
"ticktext": ["200", "150", "100", "50", "0"],
"title": "Time Elapsed (sec)",
},
yaxis={
"range": [
min(0, min(df["DCMotor"])),
max(100, max(df["DCMotor"]) + max(df["DCMeter"])),
],
"showgrid": True,
"showline": True,
"fixedrange": True,
"zeroline": False,
"gridcolor": app_color["graph_line"],
"nticks": max(6, round(df["DCMotor"].iloc[1] / 10)),
},
)
return dict(data=[trace], layout=layout)
@app.callback(
Output("iot-measure", "figure"), [Input("iot-measure-update", "n_intervals")]
)
def gen_iot_speed(interval):
"""
Generate the Motor Speed graph.
:params interval: update the graph based on an interval
"""
# Let's pass this to our map section
df = getData(var1, DInd)
trace = dict(
type="scatter",
y=df["ServoMeter"],
line={"color": "#42C4F7"},
hoverinfo="skip",
error_y={
"type": "data",
"array": df["ServoMotor"],
"thickness": 1.5,
"width": 2,
"color": "#B4E8FC",
},
mode="lines",
)
layout = dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
font={"color": "#fff"},
height=400,
xaxis={
"range": [0, 200],
"showline": True,
"zeroline": False,
"fixedrange": True,
"tickvals": [0, 50, 100, 150, 200],
"ticktext": ["200", "150", "100", "50", "0"],
"title": "Time Elapsed (sec)",
},
yaxis={
"range": [
min(0, min(df["ServoMeter"])),
max(100, max(df["ServoMeter"]) + max(df["ServoMotor"])),
],
"showgrid": True,
"showline": True,
"fixedrange": True,
"zeroline": False,
"gridcolor": app_color["graph_line"],
"nticks": max(6, round(df["ServoMeter"].iloc[1] / 10)),
},
)
return dict(data=[trace], layout=layout)
@app.callback(
Output("servo-motor-direction", "figure"), [Input("iot-measure-update", "n_intervals")]
)
def gen_motor_direction(interval):
"""
Generate the Servo direction graph.
:params interval: update the graph based on an interval
"""
df = getData(var1, DInd)
val = df["ServoMeter"].iloc[1]
direction = [0, (df["ServoMeter"][0]*100 20), (df["ServoMeter"][0]*100 + 20), 0]
traces_scatterpolar = [
{"r": [0, val, val, 0], "fillcolor": "#084E8A"},
{"r": [0, val * 0.65, val * 0.65, 0], "fillcolor": "#B4E1FA"},
{"r": [0, val * 0.3, val * 0.3, 0], "fillcolor": "#EBF5FA"},
]
data = [
dict(
type="scatterpolar",
r=traces["r"],
theta=direction,
mode="lines",
fill="toself",
fillcolor=traces["fillcolor"],
line={"color": "rgba(32, 32, 32, .6)", "width": 1},
)
for traces in traces_scatterpolar
]
layout = dict(
height=350,
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
font={"color": "#fff"},
autosize=False,
polar={
"bgcolor": app_color["graph_line"],
"radialaxis": {"range": [0, 45], "angle": 45, "dtick": 10},
"angularaxis": {"showline": False, "tickcolor": "white"},
},
showlegend=False,
)
return dict(data=data, layout=layout)
@app.callback(
Output("motor-histogram", "figure"),
[Input("iot-measure-update", "n_intervals")],
[
State("iot-measure", "figure"),
State("bin-slider", "value"),
State("bin-auto", "value"),
],
)
def gen_motor_histogram(interval, iot_speed_figure, slider_value, auto_state):
"""
Genererate iot histogram graph.
:params interval: upadte the graph based on an interval
:params iot_speed_figure: current Motor Speed graph
:params slider_value: current slider value
:params auto_state: current auto state
"""
motor_val = []
try:
print('Inside gen_motor_histogram:')
print('iot_speed_figure::')
print(iot_speed_figure)
# Check to see whether iot-measure has been plotted yet
if iot_speed_figure is not None:
motor_val = iot_speed_figure["data"][0]["y"]
if "Auto" in auto_state:
bin_val = np.histogram(
motor_val,
bins=range(int(round(min(motor_val))), int(round(max(motor_val)))),
)
else:
bin_val = np.histogram(motor_val, bins=slider_value)
except Exception as error:
raise PreventUpdate
avg_val = float(sum(motor_val)) / len(motor_val)
median_val = np.median(motor_val)
pdf_fitted = rayleigh.pdf(
bin_val[1], loc=(avg_val) * 0.55, scale=(bin_val[1][1] bin_val[1][0]) / 3
)
y_val = (pdf_fitted * max(bin_val[0]) * 20,)
y_val_max = max(y_val[0])
bin_val_max = max(bin_val[0])
trace = dict(
type="bar",
x=bin_val[1],
y=bin_val[0],
marker={"color": app_color["graph_line"]},
showlegend=False,
hoverinfo="x+y",
)
traces_scatter = [
{"line_dash": "dash", "line_color": "#2E5266", "name": "Average"},
{"line_dash": "dot", "line_color": "#BD9391", "name": "Median"},
]
scatter_data = [
dict(
type="scatter",
x=[bin_val[int(len(bin_val) / 2)]],
y=[0],
mode="lines",
line={"dash": traces["line_dash"], "color": traces["line_color"]},
marker={"opacity": 0},
visible=True,
name=traces["name"],
)
for traces in traces_scatter
]
trace3 = dict(
type="scatter",
mode="lines",
line={"color": "#42C4F7"},
y=y_val[0],
x=bin_val[1][: len(bin_val[1])],
name="Rayleigh Fit",
)
layout = dict(
height=350,
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
font={"color": "#fff"},
xaxis={
"title": "Motor Power",
"showgrid": False,
"showline": False,
"fixedrange": True,
},
yaxis={
"showgrid": False,
"showline": False,
"zeroline": False,
"title": "Number of Samples",
"fixedrange": True,
},
autosize=True,
bargap=0.01,
bargroupgap=0,
hovermode="closest",
legend={
"orientation": "h",
"yanchor": "bottom",
"xanchor": "center",
"y": 1,
"x": 0.5,
},
shapes=[
{
"xref": "x",
"yref": "y",
"y1": int(max(bin_val_max, y_val_max)) + 0.5,
"y0": 0,
"x0": avg_val,
"x1": avg_val,
"type": "line",
"line": {"dash": "dash", "color": "#2E5266", "width": 5},
},
{
"xref": "x",
"yref": "y",
"y1": int(max(bin_val_max, y_val_max)) + 0.5,
"y0": 0,
"x0": median_val,
"x1": median_val,
"type": "line",
"line": {"dash": "dot", "color": "#BD9391", "width": 5},
},
],
)
return dict(data=[trace, scatter_data[0], scatter_data[1], trace3], layout=layout)
@app.callback(
Output("bin-auto", "value"),
[Input("bin-slider", "value")],
[State("iot-measure", "figure")],
)
def deselect_auto(slider_value, iot_speed_figure):
""" Toggle the auto checkbox. """
# prevent update if graph has no data
if "data" not in iot_speed_figure:
raise PreventUpdate
if not len(iot_speed_figure["data"]):
raise PreventUpdate
if iot_speed_figure is not None and len(iot_speed_figure["data"][0]["y"]) > 5:
return [""]
return ["Auto"]
@app.callback(
Output("bin-size", "children"),
[Input("bin-auto", "value")],
[State("bin-slider", "value")],
)
def show_num_bins(autoValue, slider_value):
""" Display the number of bins. """
if "Auto" in autoValue:
return "# of Bins: Auto"
return "# of Bins: " + str(int(slider_value))
if __name__ == "__main__":
app.run_server(debug=True)

view raw

app.py

hosted with ❤ by GitHub

Here are the key snippets –

html.Div(
        [
            html.Div(
                [html.H6("SERVO METER (IOT)", className="graph__title")]
            ),
            dcc.Graph(
                id="iot-measure",
                figure=dict(
                    layout=dict(
                        plot_bgcolor=app_color["graph_bg"],
                        paper_bgcolor=app_color["graph_bg"],
                    )
                ),
            ),
            dcc.Interval(
                id="iot-measure-update",
                interval=int(GRAPH_INTERVAL),
                n_intervals=0,
            ),
            # Second Panel
            html.Div(
                [html.H6("DC-MOTOR (IOT)", className="graph__title")]
            ),
            dcc.Graph(
                id="iot-measure-1",
                figure=dict(
                    layout=dict(
                        plot_bgcolor=app_color["graph_bg"],
                        paper_bgcolor=app_color["graph_bg"],
                    )
                ),
            ),
            dcc.Interval(
                id="iot-measure-update-1",
                interval=int(GRAPH_INTERVAL),
                n_intervals=0,
            )
        ],
        className="two-thirds column motor__speed__container",

The following line creates two panels, where the application will consume the streaming data by the app’s call-back feature & refresh the data & graphs as & when the application receives the streaming data.

A similar approach was adopted for other vital aspects/components inside the dashboard.

def getData(var, Ind):
    try:
        # Let's pass this to our map section
        df = x1.conStream(var, Ind)

        df['ServoMeterNew'] = df.apply(lambda row: toPositiveInflated(row, 'ServoMeter'), axis=1)
        df['ServoMotorNew'] = df.apply(lambda row: toPositive(row, 'ServoMeter'), axis=1)
        df['DCMotor'] = df.apply(lambda row: toPositiveInflated(row, 'DCMotor'), axis=1)
        df['DCMeterNew'] = df.apply(lambda row: toPositive(row, 'DCMotor'), axis=1)

        # Dropping old columns
        df.drop(columns=['ServoMeter','ServoMotor','DCMeter'], axis=1, inplace=True)

        #Rename New Columns to Old Columns
        df.rename(columns={'ServoMeterNew':'ServoMeter'}, inplace=True)
        df.rename(columns={'ServoMotorNew':'ServoMotor'}, inplace=True)
        df.rename(columns={'DCMeterNew':'DCMeter'}, inplace=True)

        return df
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

The application is extracting streaming data & consuming it from the Ably queue.

@app.callback(
    Output("iot-measure", "figure"), [Input("iot-measure-update", "n_intervals")]
)
def gen_iot_speed(interval):
    """
    Generate the Motor Speed graph.

    :params interval: update the graph based on an interval
    """

    # Let's pass this to our map section
    df = getData(var1, DInd)

    trace = dict(
        type="scatter",
        y=df["ServoMeter"],
        line={"color": "#42C4F7"},
        hoverinfo="skip",
        error_y={
            "type": "data",
            "array": df["ServoMotor"],
            "thickness": 1.5,
            "width": 2,
            "color": "#B4E8FC",
        },
        mode="lines",
    )

    layout = dict(
        plot_bgcolor=app_color["graph_bg"],
        paper_bgcolor=app_color["graph_bg"],
        font={"color": "#fff"},
        height=400,
        xaxis={
            "range": [0, 200],
            "showline": True,
            "zeroline": False,
            "fixedrange": True,
            "tickvals": [0, 50, 100, 150, 200],
            "ticktext": ["200", "150", "100", "50", "0"],
            "title": "Time Elapsed (sec)",
        },
        yaxis={
            "range": [
                min(0, min(df["ServoMeter"])),
                max(100, max(df["ServoMeter"]) + max(df["ServoMotor"])),
            ],
            "showgrid": True,
            "showline": True,
            "fixedrange": True,
            "zeroline": False,
            "gridcolor": app_color["graph_line"],
            "nticks": max(6, round(df["ServoMeter"].iloc[-1] / 10)),
        },
    )

    return dict(data=[trace], layout=layout)

Capturing all the relevant columns & transform them into a graph, where the application will consume data into both the axis (x-axis & y-axis).

There are many other useful snippets, which creates separate useful widgets inside the dashboard.


Run:

Let us run the application –

Dashboard-View

So, we’ve done it.

You will get the complete codebase in the following Github link.

There is an excellent resource from the dash framework, which you should explore. The following link would be handy for developers who want to get some meaningful pre-built dashboard template, which you can customize as per your need through Python or R. Please find the link here.


I’ll bring some more exciting topic in the coming days from the Python verse.

Till then, Happy Avenging! 😀


Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only.

One more thing you need to understand is that this prediction based on limited data points. The actual event may happen differently. Ideally, countries are taking a cue from this kind of analysis & are initiating appropriate measures to avoid the high-curve. And, that is one of the main objective of time series analysis.

There is always a room for improvement of this kind of models & the solution associated with it. I’ve shown the basic ways to achieve the same for the education purpose only.

Predicting real-time Covid-19 forecast by analyzing time-series data using Facebook machine-learning API

Hello Guys,

Today, I’ll share one of the important posts on predicting data using facebook’s relatively new machine-learning-based API. I find this API is interesting as to how it can build & anticipate the outcome.

We’ll be using one of the most acceptable API-based sources for Covid-19 & I’ll be sharing the link over here.

We’ll be using the prophet-API developed by Facebook to predict the data. You will get the details from this link.

Architecture

Now, let’s explore the architecture shared above.

As you can see that the application will consume the data from the third-party API named “about-corona,” & the python application will clean & transform the data. The application will send the clean data to the Facebook API (prophet) built on the machine-learning algorithm. This API is another effective time-series analysis platform given to the data scientist community.

Once the application receives the predicted model, it will visualize them using plotly & matplotlib.


I would request you to please check the demo of this application just for your reference.

Demo Run

We’ll do a time series analysis. Let us understand the basic concept of time series.

Time series is a series of data points indexed (or listed or graphed) in time order.

Therefore, the data organized by relatively deterministic timestamps and potentially compared with random sample data contain additional information that we can leverage for our business use case to make a better decision.

To use the prophet API, one needs to use & transform their data cleaner & should contain two fields (ds & y).

Let’s check one such use case since our source has plenty of good data points to decide. We’ve daily data of newly infected covid patients based on countries, as shown below –

Covid Cases

And, our clean class will transform the data into two fields –

Transformed Data

Once we fit the data into the prophet model, it will generate some additional columns, which will be used for prediction as shown below –

Generated data from prophet-api

And, a sample prediction based on a similar kind of data would be identical to this –

Sample Prediction

Let us understand what packages we need to install to prepare this application –

Installing Dependency Packages – I
Installing Dependency Packages – II

And, here is the packages –

pip install pandas
pip install matplotlib
pip install prophet

Let us now revisit the code –

1. clsConfig.py ( This native Python script contains the configuration entries. )


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### for Prophet API. Application will ####
#### process these information & perform ####
#### the call to our newly developed with ####
#### APIs developed by Facebook & a open-source ####
#### project called "About-Corona". ####
#####################################################
import os
import platform as pl
class clsConfig(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
"URL":"https://corona-api.com/countries/",
"appType":"application/json",
"conType":"keep-alive",
"limRec": 10,
"CACHE":"no-cache",
"coList": "DE, IN, US, CA, GB, ID, BR",
"LOG_PATH":Curr_Path + sep + 'log' + sep,
"MAX_RETRY": 3,
"FNC": "NewConfirmed",
"TMS": "ReportedDate",
"FND": "NewDeaths"
}

view raw

clsConfig.py

hosted with ❤ by GitHub

We’re not going to discuss anything specific to this script.

2. clsL.py ( This native Python script logs the application. )


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### ####
#### Objective: This script is a log ####
#### file, that is useful for debugging purpose. ####
#### ####
#####################################################
import pandas as p
import os
import platform as pl
class clsL(object):
def __init__(self):
self.path = os.path.dirname(os.path.realpath(__file__))
def logr(self, Filename, Ind, df, subdir=None, write_mode='w', with_index='N'):
try:
x = p.DataFrame()
x = df
sd = subdir
os_det = pl.system()
if sd == None:
if os_det == "windows":
fullFileName = self.path + '\\' + Filename
else:
fullFileName = self.path + '/' + Filename
else:
if os_det == "windows":
fullFileName = self.path + '\\' + sd + '\\' + Filename
else:
fullFileName = self.path + '/' + sd + '/' + Filename
if (with_index == 'N'):
if ((Ind == 'Y') & (write_mode == 'w')):
x.to_csv(fullFileName, index=False)
else:
x.to_csv(fullFileName, index=False, mode=write_mode, header=None)
else:
if ((Ind == 'Y') & (write_mode == 'w')):
x.to_csv(fullFileName)
else:
x.to_csv(fullFileName, mode=write_mode, header=None)
return 0
except Exception as e:
y = str(e)
print(y)
return 3

view raw

clsL.py

hosted with ❤ by GitHub

Based on the operating system, the log class will capture potential information under the “log” directory in the form of csv for later reference purposes.

3. clsForecast.py ( This native Python script will clean & transform the data. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### Modified On 26-Jul-2021 ####
#### ####
#### Objective: Calling Data Cleaning API ####
##############################################
import json
from clsConfig import clsConfig as cf
import requests
import logging
import time
import pandas as p
import clsL as cl
from prophet import Prophet
class clsForecast:
def __init__(self):
self.fnc = cf.conf['FNC']
self.fnd = cf.conf['FND']
self.tms = cf.conf['TMS']
def forecastNewConfirmed(self, srcDF, debugInd, varVa):
try:
fnc = self.fnc
tms = self.tms
var = varVa
debug_ind = debugInd
countryISO = ''
df_M = p.DataFrame()
dfWork = srcDF
# Initiating Log class
l = cl.clsL()
#Extracting the unique country name
unqCountry = dfWork['CountryCode'].unique()
for i in unqCountry:
countryISO = i.strip()
print('Country Name: ' + countryISO)
df_Comm = dfWork[[tms, fnc]]
l.logr('13.df_Comm_' + var + '.csv', debug_ind, df_Comm, 'log')
# Aligning as per Prophet naming convention
df_Comm.columns = ['ds', 'y']
l.logr('14.df_Comm_Mod_' + var + '.csv', debug_ind, df_Comm, 'log')
return df_Comm
except Exception as e:
x = str(e)
print(x)
logging.info(x)
df = p.DataFrame()
return df
def forecastNewDead(self, srcDF, debugInd, varVa):
try:
fnd = self.fnd
tms = self.tms
var = varVa
debug_ind = debugInd
countryISO = ''
df_M = p.DataFrame()
dfWork = srcDF
# Initiating Log class
l = cl.clsL()
#Extracting the unique country name
unqCountry = dfWork['CountryCode'].unique()
for i in unqCountry:
countryISO = i.strip()
print('Country Name: ' + countryISO)
df_Comm = dfWork[[tms, fnd]]
l.logr('17.df_Comm_' + var + '.csv', debug_ind, df_Comm, 'log')
# Aligning as per Prophet naming convention
df_Comm.columns = ['ds', 'y']
l.logr('18.df_Comm_Mod_' + var + '.csv', debug_ind, df_Comm, 'log')
return df_Comm
except Exception as e:
x = str(e)
print(x)
logging.info(x)
df = p.DataFrame()
return df

view raw

clsForecast.py

hosted with ❤ by GitHub

Let’s explore the critical snippet out of this script –

df_Comm = dfWork[[tms, fnc]]

Now, the application will extract only the relevant columns to proceed.

df_Comm.columns = ['ds', 'y']

It is now assigning specific column names, which is a requirement for prophet API.

4. clsCovidAPI.py ( This native Python script will call the Covid-19 API. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### Modified On 26-Jul-2021 ####
#### ####
#### Objective: Calling Covid-19 API ####
##############################################
import json
from clsConfig import clsConfig as cf
import requests
import logging
import time
import pandas as p
import clsL as cl
class clsCovidAPI:
def __init__(self):
self.url = cf.conf['URL']
self.azure_cache = cf.conf['CACHE']
self.azure_con = cf.conf['conType']
self.type = cf.conf['appType']
self.typVal = cf.conf['coList']
self.max_retries = cf.conf['MAX_RETRY']
def searchQry(self, varVa, debugInd):
try:
url = self.url
api_cache = self.azure_cache
api_con = self.azure_con
type = self.type
typVal = self.typVal
max_retries = self.max_retries
var = varVa
debug_ind = debugInd
cnt = 0
df_M = p.DataFrame()
# Initiating Log class
l = cl.clsL()
payload = {}
strMsg = 'Input Countries: ' + str(typVal)
logging.info(strMsg)
headers = {}
countryList = typVal.split(',')
for i in countryList:
# Failed case retry
retries = 1
success = False
val = ''
try:
while not success:
# Getting response from web service
try:
df_conv = p.DataFrame()
strCountryUrl = url + str(i).strip()
print('Country: ' + str(i).strip())
print('Url: ' + str(strCountryUrl))
str1 = 'Url: ' + str(strCountryUrl)
logging.info(str1)
response = requests.request("GET", strCountryUrl, headers=headers, params=payload)
ResJson = response.text
#jdata = json.dumps(ResJson)
RJson = json.loads(ResJson)
df_conv = p.io.json.json_normalize(RJson)
df_conv.drop(['data.timeline'], axis=1, inplace=True)
df_conv['DummyKey'] = 1
df_conv.set_index('DummyKey')
l.logr('1.df_conv_' + var + '.csv', debug_ind, df_conv, 'log')
# Extracting timeline part separately
Rjson_1 = RJson['data']['timeline']
df_conv2 = p.io.json.json_normalize(Rjson_1)
df_conv2['DummyKey'] = 1
df_conv2.set_index('DummyKey')
l.logr('2.df_conv_timeline_' + var + '.csv', debug_ind, df_conv2, 'log')
# Doing Cross Join
df_fin = df_conv.merge(df_conv2, on='DummyKey', how='outer')
l.logr('3.df_fin_' + var + '.csv', debug_ind, df_fin, 'log')
# Merging with the previous Country Code data
if cnt == 0:
df_M = df_fin
else:
d_frames = [df_M, df_fin]
df_M = p.concat(d_frames)
cnt += 1
strCountryUrl = ''
if str(response.status_code)[:1] == '2':
success = True
else:
wait = retries * 2
print("retries Fail! Waiting " + str(wait) + " seconds and retrying!")
str_R1 = "retries Fail! Waiting " + str(wait) + " seconds and retrying!"
logging.info(str_R1)
time.sleep(wait)
retries += 1
# Checking maximum retries
if retries == max_retries:
success = True
raise Exception
except Exception as e:
x = str(e)
print(x)
logging.info(x)
pass
except Exception as e:
pass
l.logr('4.df_M_' + var + '.csv', debug_ind, df_M, 'log')
return df_M
except Exception as e:
x = str(e)
print(x)
logging.info(x)
df = p.DataFrame()
return df

view raw

clsCovidAPI.py

hosted with ❤ by GitHub

Let us explore the key snippet –

countryList = typVal.split(',')

The application will fetch individual country names into a list based on the input lists from the configure script.

response = requests.request("GET", strCountryUrl, headers=headers, params=payload)
ResJson = response.text

RJson = json.loads(ResJson)

df_conv = p.io.json.json_normalize(RJson)
df_conv.drop(['data.timeline'], axis=1, inplace=True)
df_conv['DummyKey'] = 1
df_conv.set_index('DummyKey')

The application will extract the elements & normalize the JSON & convert that to a pandas dataframe & also added one dummy column, which will use for the later purpose to merge the data from another set.

# Extracting timeline part separately
Rjson_1 = RJson['data']['timeline']

df_conv2 = p.io.json.json_normalize(Rjson_1)
df_conv2['DummyKey'] = 1
df_conv2.set_index('DummyKey')

Now, the application will take the nested element & normalize that as per granular level. Also, it added the dummy column to join both of these data together.

# Doing Cross Join
df_fin = df_conv.merge(df_conv2, on='DummyKey', how='outer')

The application will Merge both the data sets to get the complete denormalized data for our use cases.

# Merging with the previous Country Code data
if cnt == 0:
    df_M = df_fin
else:
    d_frames = [df_M, df_fin]
    df_M = p.concat(d_frames)

This entire deserializing execution happens per country. Hence, the above snippet will create an individual sub-group based on the country & later does union to all the sets.

if str(response.status_code)[:1] == '2':
    success = True
else:
    wait = retries * 2
    print("retries Fail! Waiting " + str(wait) + " seconds and retrying!")
    str_R1 = "retries Fail! Waiting " + str(wait) + " seconds and retrying!"
    logging.info(str_R1)
    time.sleep(wait)
    retries += 1

# Checking maximum retries
if retries == max_retries:
    success = True
    raise  Exception

If any calls to source API fails, the application will retrigger after waiting for a specific time until it reaches its maximum capacity.

5. callPredictCovidAnalysis.py ( This native Python script is the main one to predict the Covid. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### Modified On 26-Jul-2021 ####
#### ####
#### Objective: Calling multiple API's ####
#### that including Prophet-API developed ####
#### by Facebook for future prediction of ####
#### Covid-19 situations in upcoming days ####
#### for world's major hotspots. ####
##############################################
import json
import clsCovidAPI as ca
from clsConfig import clsConfig as cf
import datetime
import logging
import clsL as cl
import clsForecast as f
from prophet import Prophet
from prophet.plot import plot_plotly, plot_components_plotly
import matplotlib.pyplot as plt
import pandas as p
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
# Initiating Log class
l = cl.clsL()
# Helper Function that removes underscores
def countryDet(inputCD):
try:
countryCD = inputCD
if str(countryCD) == 'DE':
cntCD = 'Germany'
elif str(countryCD) == 'BR':
cntCD = 'Brazil'
elif str(countryCD) == 'GB':
cntCD = 'United Kingdom'
elif str(countryCD) == 'US':
cntCD = 'United States'
elif str(countryCD) == 'IN':
cntCD = 'India'
elif str(countryCD) == 'CA':
cntCD = 'Canada'
elif str(countryCD) == 'ID':
cntCD = 'Indonesia'
else:
cntCD = 'N/A'
return cntCD
except:
cntCD = 'N/A'
return cntCD
def plot_picture(inputDF, debug_ind, var, countryCD, stat):
try:
iDF = inputDF
# Lowercase the column names
iDF.columns = [c.lower() for c in iDF.columns]
# Determine which is Y axis
y_col = [c for c in iDF.columns if c.startswith('y')][0]
# Determine which is X axis
x_col = [c for c in iDF.columns if c.startswith('ds')][0]
# Data Conversion
iDF['y'] = iDF[y_col].astype('float')
iDF['ds'] = iDF[x_col].astype('datetime64[ns]')
# Forecast calculations
# Decreasing the changepoint_prior_scale to 0.001 to make the trend less flexible
m = Prophet(n_changepoints=20, yearly_seasonality=True, changepoint_prior_scale=0.001)
m.fit(iDF)
forecastDF = m.make_future_dataframe(periods=365)
forecastDF = m.predict(forecastDF)
l.logr('15.forecastDF_' + var + '_' + countryCD + '.csv', debug_ind, forecastDF, 'log')
df_M = forecastDF[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]
l.logr('16.df_M_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')
#m.plot_components(df_M)
# Getting Full Country Name
cntCD = countryDet(countryCD)
# Draw forecast results
lbl = str(cntCD) + ' – Covid – ' + stat
m.plot(df_M, xlabel = 'Date', ylabel = lbl)
# Combine all graps in the same page
plt.title(f'Covid Forecasting')
plt.title(lbl)
plt.ylabel('Millions')
plt.show()
return 0
except Exception as e:
x = str(e)
print(x)
return 1
def countrySpecificDF(counryDF, val):
try:
countryName = val
df = counryDF
df_lkpFile = df[(df['CountryCode'] == val)]
return df_lkpFile
except:
df = p.DataFrame()
return df
def main():
try:
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' *60)
DInd = 'Y'
NC = 'New Confirmed'
ND = 'New Dead'
SM = 'data process Successful!'
FM = 'data process Failure!'
print("Calling the custom Package for large file splitting..")
print('Start Time: ' + str(var1))
countryList = str(cf.conf['coList']).split(',')
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'CovidAPI.log', level=logging.INFO)
# Create the instance of the Covid API Class
x1 = ca.clsCovidAPI()
# Let's pass this to our map section
retDF = x1.searchQry(var1, DInd)
retVal = int(retDF.shape[0])
if retVal > 0:
print('Successfully Covid Data Extracted from the API-source.')
else:
print('Something wrong with your API-source!')
# Extracting Skeleton Data
df = retDF[['data.code', 'date', 'deaths', 'confirmed', 'recovered', 'new_confirmed', 'new_recovered', 'new_deaths', 'active']]
df.columns = ['CountryCode', 'ReportedDate', 'TotalReportedDead', 'TotalConfirmedCase', 'TotalRecovered', 'NewConfirmed', 'NewRecovered', 'NewDeaths', 'ActiveCaases']
df.dropna()
print('Returned Skeleton Data Frame: ')
print(df)
l.logr('5.df_' + var1 + '.csv', DInd, df, 'log')
# Working with forecast
# Create the instance of the Forecast API Class
x2 = f.clsForecast()
# Fetching each country name & then get the details
cnt = 6
for i in countryList:
try:
cntryIndiv = i.strip()
print('Country Porcessing: ' + str(cntryIndiv))
# Creating dataframe for each country
# Germany Main DataFrame
dfCountry = countrySpecificDF(df, cntryIndiv)
l.logr(str(cnt) + '.df_' + cntryIndiv + '_' + var1 + '.csv', DInd, dfCountry, 'log')
# Let's pass this to our map section
retDFGenNC = x2.forecastNewConfirmed(dfCountry, DInd, var1)
statVal = str(NC)
a1 = plot_picture(retDFGenNC, DInd, var1, cntryIndiv, statVal)
retDFGenNC_D = x2.forecastNewDead(dfCountry, DInd, var1)
statVal = str(ND)
a2 = plot_picture(retDFGenNC_D, DInd, var1, cntryIndiv, statVal)
cntryFullName = countryDet(cntryIndiv)
if (a1 + a2) == 0:
oprMsg = cntryFullName + ' ' + SM
print(oprMsg)
else:
oprMsg = cntryFullName + ' ' + FM
print(oprMsg)
# Resetting the dataframe value for the next iteration
dfCountry = p.DataFrame()
cntryIndiv = ''
oprMsg = ''
cntryFullName = ''
a1 = 0
a2 = 0
statVal = ''
cnt += 1
except Exception as e:
x = str(e)
print(x)
var2 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var2))
print('*' *60)
except Exception as e:
x = str(e)
if __name__ == "__main__":
main()

Let us explore the key snippet –

def countryDet(inputCD):
    try:
        countryCD = inputCD

        if str(countryCD) == 'DE':
            cntCD = 'Germany'
        elif str(countryCD) == 'BR':
            cntCD = 'Brazil'
        elif str(countryCD) == 'GB':
            cntCD = 'United Kingdom'
        elif str(countryCD) == 'US':
            cntCD = 'United States'
        elif str(countryCD) == 'IN':
            cntCD = 'India'
        elif str(countryCD) == 'CA':
            cntCD = 'Canada'
        elif str(countryCD) == 'ID':
            cntCD = 'Indonesia'
        else:
            cntCD = 'N/A'

        return cntCD
    except:
        cntCD = 'N/A'

        return cntCD

The application is extracting the full country name based on ISO country code.

# Lowercase the column names
iDF.columns = [c.lower() for c in iDF.columns]
# Determine which is Y axis
y_col = [c for c in iDF.columns if c.startswith('y')][0]
# Determine which is X axis
x_col = [c for c in iDF.columns if c.startswith('ds')][0]

# Data Conversion
iDF['y'] = iDF[y_col].astype('float')
iDF['ds'] = iDF[x_col].astype('datetime64[ns]')

The above script will convert all the column names in lower letters & then convert & cast them with the appropriate data type.

# Forecast calculations
# Decreasing the changepoint_prior_scale to 0.001 to make the trend less flexible
m = Prophet(n_changepoints=20, yearly_seasonality=True, changepoint_prior_scale=0.001)
m.fit(iDF)

forecastDF = m.make_future_dataframe(periods=365)

forecastDF = m.predict(forecastDF)

l.logr('15.forecastDF_' + var + '_' + countryCD + '.csv', debug_ind, forecastDF, 'log')

df_M = forecastDF[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]

l.logr('16.df_M_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')

The above snippet will use the machine-learning driven prophet-API, where the application will fit the model & then predict based on the existing data for a year. Also, we’ve identified the number of changepoints. By default, the prophet-API adds 25 changepoints to the initial 80% of the data set that trend is less flexible. 

Prophet allows you to adjust the trend in case there is an overfit or underfit. changepoint_prior_scale helps adjust the strength of the movement & decreasing the changepoint_prior_scale to 0.001 to make it less flexible.

def countrySpecificDF(counryDF, val):
    try:
        countryName = val
        df = counryDF

        df_lkpFile = df[(df['CountryCode'] == val)]

        return df_lkpFile
    except:
        df = p.DataFrame()

        return df

The application is fetching & creating the country-specific dataframe.

for i in countryList:
    try:
        cntryIndiv = i.strip()

        print('Country Porcessing: ' + str(cntryIndiv))

        # Creating dataframe for each country
        # Germany Main DataFrame
        dfCountry = countrySpecificDF(df, cntryIndiv)
        l.logr(str(cnt) + '.df_' + cntryIndiv + '_' + var1 + '.csv', DInd, dfCountry, 'log')

        # Let's pass this to our map section
        retDFGenNC = x2.forecastNewConfirmed(dfCountry, DInd, var1)

        statVal = str(NC)

        a1 = plot_picture(retDFGenNC, DInd, var1, cntryIndiv, statVal)

        retDFGenNC_D = x2.forecastNewDead(dfCountry, DInd, var1)

        statVal = str(ND)

        a2 = plot_picture(retDFGenNC_D, DInd, var1, cntryIndiv, statVal)

        cntryFullName = countryDet(cntryIndiv)

        if (a1 + a2) == 0:
            oprMsg = cntryFullName + ' ' + SM
            print(oprMsg)
        else:
            oprMsg = cntryFullName + ' ' + FM
            print(oprMsg)

        # Resetting the dataframe value for the next iteration
        dfCountry = p.DataFrame()
        cntryIndiv = ''
        oprMsg = ''
        cntryFullName = ''
        a1 = 0
        a2 = 0
        statVal = ''

        cnt += 1
    except Exception as e:
        x = str(e)
        print(x)

The above snippet will call the function to predict the data & then predict the visual representation based on plotting the data points.


Let us run the application –

Application Run

And, it will generate the visual representation as follows –

Application Run – Continue

And, here is the folder structure –

Directory Structure

Let’s explore the comparison study & try to find out the outcome –

Option – 1
Option – 2
Option – 3
Option -4

Let us analyze from the above visual data-point.


Conclusion:

Let’s explore the comparison study & try to find out the outcome –

  1. India may see a rise of new covid cases & it might cross the mark 400,000 during June 2022 & would be the highest among the countries that we’ve considered here including India, Indonesia, Germany, US, UK, Canada & Brazil. The second worst affected country might be the US during the same period. The third affected country will be Indonesia during the same period.
  2. Canada will be the least affected country during June 2022. The figure should be within 12,000.
  3. However, death case wise India is not only the leading country. The US, India & Brazil will see almost 4000 or slightly over the 4000 marks.

So, we’ve done it.


You will get the complete codebase in the following Github link.

I’ll bring some more exciting topic in the coming days from the Python verse.

Till then, Happy Avenging! 😀


Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only.

One more thing you need to understand is that this prediction based on limited data points. The actual event may happen differently. Ideally, countries are taking a cue from this kind of analysis & are initiating appropriate measures to avoid the high-curve. And, that is one of the main objective of time series analysis.

There is always a room for improvement of this kind of models & the solution associated with it. I’ve shown the basic ways to achieve the same for the education purpose only.

Displaying real-time trade data in a dashboard using Python & third-party API & Streaming

Today, We want to make our use case a little bit harder & more realistic. We want to consume real-time live trade-data consuming through FinnHub API & displaying them into our dashboard using another brilliant H2O-Wave API with the help of native Python.

The use-case mentioned above is extremely useful & for that, we’ll be using the following Third-Party APIs to achieve the same –

  1. FinnHub: For more information, please click the following link.
  2. Ably: For more information, please click the following link.
  3. H2O-Wave: For more information, please click the following link.

I’m not going to discuss these topics more, as I’ve already discussed them in separate earlier posts. Please refer to the following threads for detailed level information –

creating-a-real-time-dashboard-from-streaming-data-using-python


In this post, we will address the advanced concept compared to the previous post mentioned above. Let us first look at how the run looks before we start exploring the details –

Real-time trade dashboard

Let us explore the architecture of this implementation –

Architecture Diagram

This application will talk to the FinnHub websocket & consume real-time trade data from it. And this will be temporarily stored in our Ably channels. The dashboard will pick the message & display that as soon as there is new data for that trading company.


For this use case, you need to install the following packages –

STEP – 1:

Main Packages

STEP – 2:

Main Packages – Continue

STEP – 3:

Main Packages – Continue

STEP – 4:

Main Packages – End

You can copy the following commands to install the above-mentioned packages –

pip install ably 
pip install h2o-wave
pip install pandas
pip install websocket
pip install websocket-client

Let’s explore the important data-point that you need to capture from the FinnHub portal to consume the real-time trade data –

FinnHub Portal

We’ve two main scripts. The first script will consume the streaming data into a message queue & the other one will be extracting the data from the queue & transform the data & publish it into the real-time dashboard.

1. dashboard_finnhub.py ( This native Python script will consume streaming data & create the live trade dashboard. )


###############################################################
#### Template Written By: H2O Wave ####
#### Enhanced with Streaming Data By: Satyaki De ####
#### Base Version Enhancement On: 20-Dec-2020 ####
#### Modified On 27-Jun-2021 ####
#### ####
#### Objective: This script will consume real-time ####
#### streaming data coming out from a hosted API ####
#### sources (Finnhub) using another popular third-party ####
#### service named Ably. Ably mimics pubsub Streaming ####
#### concept, which might be extremely useful for ####
#### any start-ups. ####
#### ####
#### Note: This is an enhancement of my previous post of ####
#### H2O Wave. In this case, the application will consume ####
#### streaming trade data from a live host & not generated ####
#### out of the mock data. Thus, it is more useful for the ####
#### start-ups. ####
###############################################################
import time
from h2o_wave import site, data, ui
from ably import AblyRest
import pandas as p
import json
import datetime
import logging
import platform as pl
from clsConfig import clsConfig as cf
import clsL as cl
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
# Lookup functions from
# Azure cloud SQL DB
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# Global Area
## Global Class
# Initiating Log Class
l = cl.clsL()
# Global Variables
# Moving previous day log files to archive directory
log_dir = cf.config['LOG_PATH']
path = cf.config['INIT_PATH']
subdir = cf.config['SUBDIR']
## End Of Global Part
class DaSeries:
def __init__(self, inputDf):
self.Df = inputDf
self.count_row = inputDf.shape[0]
self.start_pos = 0
self.end_pos = 0
self.interval = 1
def next(self):
try:
# Getting Individual Element & convert them to Series
if ((self.start_pos + self.interval) <= self.count_row):
self.end_pos = self.start_pos + self.interval
else:
self.end_pos = self.start_pos + (self.count_row self.start_pos)
split_df = self.Df.iloc[self.start_pos:self.end_pos]
if ((self.start_pos > self.count_row) | (self.start_pos == self.count_row)):
pass
else:
self.start_pos = self.start_pos + self.interval
x = float(split_df.iloc[0]['CurrentExchange'])
dx = float(split_df.iloc[0]['Change'])
# Emptying the exisitng dataframe
split_df = p.DataFrame(None)
return x, dx
except:
x = 0
dx = 0
return x, dx
class CategoricalSeries:
def __init__(self, sourceDf):
self.series = DaSeries(sourceDf)
self.i = 0
def next(self):
x, dx = self.series.next()
self.i += 1
return f'C{self.i}', x, dx
light_theme_colors = '$red $pink $purple $violet $indigo $blue $azure $cyan $teal $mint $green $amber $orange $tangerine'.split()
dark_theme_colors = '$red $pink $blue $azure $cyan $teal $mint $green $lime $yellow $amber $orange $tangerine'.split()
_color_index = 1
colors = dark_theme_colors
def next_color():
global _color_index
_color_index += 1
return colors[_color_index % len(colors)]
_curve_index = 1
curves = 'linear smooth step step-after step-before'.split()
def next_curve():
global _curve_index
_curve_index += 1
return curves[_curve_index % len(curves)]
def calc_p(row):
try:
str_calc_s1 = str(row['s_x'])
str_calc_s2 = str(row['s_y'])
if str_calc_s1 == str_calc_s2:
calc_p_val = float(row['p_y'])
else:
calc_p_val = float(row['p_x'])
return calc_p_val
except:
return 0.0
def calc_v(row):
try:
str_calc_s1 = str(row['s_x'])
str_calc_s2 = str(row['s_y'])
if str_calc_s1 == str_calc_s2:
calc_v_val = float(row['v_y'])
else:
calc_v_val = float(row['v_x'])
return calc_v_val
except:
return 0.0
def process_DF(inputDF, inputDFUnq):
try:
# Core Business logic
# The application will show default value to any
# trade-in stock in case that data doesn't consume
# from the source.
df_conv = inputDF
df_unique_fin = inputDFUnq
df_conv['max_count'] = df_conv.groupby('default_rank')['default_rank'].transform('count')
l.logr('3. max_df.csv', 'Y', df_conv, subdir)
# Sorting the output
sorted_df = df_conv.sort_values(by=['default_rank','s'], ascending=True)
# New Column List Orders
column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
df_fin = sorted_df.reindex(column_order, axis=1)
l.logr('4. sorted_df.csv', 'Y', df_fin, subdir)
# Now splitting the sorted df into two sets
lkp_max_count = 4
df_fin_na = df_fin[(df_fin['max_count'] == lkp_max_count)]
l.logr('5. df_fin_na.csv', 'Y', df_fin_na, subdir)
df_fin_req = df_fin[(df_fin['max_count'] != lkp_max_count)]
l.logr('6. df_fin_req.csv', 'Y', df_fin_req, subdir)
# Now to perform cross join, we will create
# a key column in both the DataFrames to
# merge on that key.
df_unique_fin['key'] = 1
df_fin_req['key'] = 1
# Dropping unwanted columns
df_unique_fin.drop(columns=['t'], axis=1, inplace=True)
l.logr('7. df_unique_slim.csv', 'Y', df_unique_fin, subdir)
# Padding with dummy key values
#merge_df = p.merge(df_unique_fin,df_fin_req,on=['s'],how='left')
merge_df = p.merge(df_unique_fin,df_fin_req,on=['key']).drop("key", 1)
l.logr('8. merge_df.csv', 'Y', merge_df, subdir)
# Sorting the output
sorted_merge_df = merge_df.sort_values(by=['default_rank_y','s_x'], ascending=True)
l.logr('9. sorted_merge_df.csv', 'Y', sorted_merge_df, subdir)
# Calling new derived logic
sorted_merge_df['derived_p'] = sorted_merge_df.apply(lambda row: calc_p(row), axis=1)
sorted_merge_df['derived_v'] = sorted_merge_df.apply(lambda row: calc_v(row), axis=1)
l.logr('10. sorted_merge_derived.csv', 'Y', sorted_merge_df, subdir)
# Dropping unwanted columns
sorted_merge_df.drop(columns=['default_rank_x', 'p_x', 'v_x', 's_y', 'p_y', 'v_y'], axis=1, inplace=True)
#Renaming the columns
sorted_merge_df.rename(columns={'s_x':'s'}, inplace=True)
sorted_merge_df.rename(columns={'default_rank_y':'default_rank'}, inplace=True)
sorted_merge_df.rename(columns={'derived_p':'p'}, inplace=True)
sorted_merge_df.rename(columns={'derived_v':'v'}, inplace=True)
l.logr('11. org_merge_derived.csv', 'Y', sorted_merge_df, subdir)
# Aligning columns
column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
merge_fin_df = sorted_merge_df.reindex(column_order, axis=1)
l.logr('12. merge_fin_df.csv', 'Y', merge_fin_df, subdir)
# Finally, appending these two DataFrame (df_fin_na & merge_fin_df)
frames = [df_fin_na, merge_fin_df]
fin_df = p.concat(frames, keys=["s", "default_rank", "max_count"])
l.logr('13. fin_df.csv', 'Y', fin_df, subdir)
# Final clearance & organization
fin_df.drop(columns=['default_rank', 'max_count'], axis=1, inplace=True)
l.logr('14. Final.csv', 'Y', fin_df, subdir)
# Adjusting key columns
fin_df.rename(columns={'s':'Company'}, inplace=True)
fin_df.rename(columns={'p':'CurrentExchange'}, inplace=True)
fin_df.rename(columns={'v':'Change'}, inplace=True)
l.logr('15. TransormedFinal.csv', 'Y', fin_df, subdir)
return fin_df
except Exception as e:
print('$' * 120)
x = str(e)
print(x)
print('$' * 120)
df = p.DataFrame()