Live visual reading using Convolutional Neural Network (CNN) through Python-based machine-learning application.

This week we’re planning to touch on one of the exciting posts of visually reading characters from WebCAM & predict the letters using CNN methods. Before we dig deep, why don’t we see the demo run first?

Demo

Isn’t it fascinating? As we can see, the computer can record events and read like humans. And, thanks to the brilliant packages available in Python, which can help us predict the correct letter out of an Image.


What do we need to test it out?

  1. Preferably an external WebCAM.
  2. A moderate or good Laptop to test out this.
  3. Python 
  4. And a few other packages that we’ll mention next block.

What Python packages do we need?

Some of the critical packages that we must need to test out this application are –

cmake==3.22.1
dlib==19.19.0
face-recognition==1.3.0
face-recognition-models==0.3.0
imutils==0.5.3
jsonschema==4.4.0
keras==2.7.0
Keras-Preprocessing==1.1.2
matplotlib==3.5.1
matplotlib-inline==0.1.3
oauthlib==3.1.1
opencv-contrib-python==4.1.2.30
opencv-contrib-python-headless==4.4.0.46
opencv-python==4.5.5.62
opencv-python-headless==4.5.5.62
pickleshare==0.7.5
Pillow==9.0.0
python-dateutil==2.8.2
requests==2.27.1
requests-oauthlib==1.3.0
scikit-image==0.19.1
scikit-learn==1.0.2
tensorboard==2.7.0
tensorboard-data-server==0.6.1
tensorboard-plugin-wit==1.8.1
tensorflow==2.7.0
tensorflow-estimator==2.7.0
tensorflow-io-gcs-filesystem==0.23.1
tqdm==4.62.3

What is CNN?

In deep learning, a convolutional neural network (CNN/ConvNet) is a class of deep neural networks most commonly applied to analyze visual imagery.

Different Steps of CNN

We can understand from the above picture that a CNN generally takes an image as input. The neural network analyzes each pixel separately. The weights and biases of the model are then tweaked to detect the desired letters (In our use case) from the image. Like other algorithms, the data also has to pass through pre-processing stage. However, a CNN needs relatively less pre-processing than most other Deep Learning algorithms.

If you want to know more about this, there is an excellent article on CNN with some on-point animations explaining this concept. Please read it here.

Where do we get the data sets for our testing?

For testing, we are fortunate enough to have Kaggle with us. We have received a wide variety of sample data, which you can get from here.


Our use-case:

Architecture

From the above diagram, one can see that the python application will consume a live video feed of any random letters (both printed & handwritten) & predict the character as part of the machine learning model that we trained.


Code:

  1. clsConfig.py (Configuration file for the entire application.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 28-Dec-2021 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning & streaming dashboard.####
#### ####
################################################
import os
import platform as pl
class clsConfig(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'REPORT_PATH': Curr_Path + sep + 'report',
'FILE_NAME': Curr_Path + sep + 'Data' + sep + 'A_Z_Handwritten_Data.csv',
'SRC_PATH': Curr_Path + sep + 'data' + sep,
'APP_DESC_1': 'Old Video Enhancement!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'SUBDIR': 'data',
'SEP': sep,
'testRatio':0.2,
'valRatio':0.2,
'epochsVal':8,
'activationType':'relu',
'activationType2':'softmax',
'numOfClasses':26,
'kernelSize'😦3, 3),
'poolSize'😦2, 2),
'filterVal1':32,
'filterVal2':64,
'filterVal3':128,
'stridesVal':2,
'monitorVal':'val_loss',
'paddingVal1':'same',
'paddingVal2':'valid',
'reshapeVal':28,
'reshapeVal1'😦28,28),
'patienceVal1':1,
'patienceVal2':2,
'sleepTime':3,
'sleepTime1':6,
'factorVal':0.2,
'learningRateVal':0.001,
'minDeltaVal':0,
'minLrVal':0.0001,
'verboseFlag':0,
'modeInd':'auto',
'shuffleVal':100,
'DenkseVal1':26,
'DenkseVal2':64,
'DenkseVal3':128,
'predParam':9,
'word_dict':{0:'A',1:'B',2:'C',3:'D',4:'E',5:'F',6:'G',7:'H',8:'I',9:'J',10:'K',11:'L',12:'M',13:'N',14:'O',15:'P',16:'Q',17:'R',18:'S',19:'T',20:'U',21:'V',22:'W',23:'X', 24:'Y',25:'Z'},
'width':640,
'height':480,
'imgSize': (32,32),
'threshold': 0.45,
'imgDimension': (400, 440),
'imgSmallDim': (7, 7),
'imgMidDim': (28, 28),
'reshapeParam1':1,
'reshapeParam2':28,
'colorFeed'😦0,0,130),
'colorPredict'😦0,25,255)
}

view raw

clsConfig.py

hosted with ❤ by GitHub

Important parameters that we need to follow from the above snippets are –

'testRatio':0.2,
'valRatio':0.2,
'epochsVal':8,
'activationType':'relu',
'activationType2':'softmax',
'numOfClasses':26,
'kernelSize':(3, 3),
'poolSize':(2, 2),
'word_dict':{0:'A',1:'B',2:'C',3:'D',4:'E',5:'F',6:'G',7:'H',8:'I',9:'J',10:'K',11:'L',12:'M',13:'N',14:'O',15:'P',16:'Q',17:'R',18:'S',19:'T',20:'U',21:'V',22:'W',23:'X', 24:'Y',25:'Z'},

Since we have 26 letters, we have classified it as 26 in the numOfClasses.

Since we are talking about characters, we had to come up with a process of identifying each character as numbers & then processing our entire logic. Hence, the above parameter named word_dict captured all the characters in a python dictionary & stored them. Moreover, the application translates the final number output to more appropriate characters as the prediction.

2. clsAlphabetReading.py (Main training class to teach the model to predict alphabets from visual reader.)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 17-Jan-2022 ####
#### Modified On 17-Jan-2022 ####
#### ####
#### Objective: This python script will ####
#### teach & perfect the model to read ####
#### visual alphabets using Convolutional ####
#### Neural Network (CNN). ####
###############################################
from keras.datasets import mnist
import matplotlib.pyplot as plt
import cv2
import numpy as np
from keras.models import Sequential
from keras.layers import Dense, Flatten, Conv2D, MaxPool2D, Dropout
from tensorflow.keras.optimizers import SGD, Adam
from keras.callbacks import ReduceLROnPlateau, EarlyStopping
from keras.utils.np_utils import to_categorical
import pandas as p
import numpy as np
from sklearn.model_selection import train_test_split
from keras.utils import np_utils
import matplotlib.pyplot as plt
from tqdm import tqdm_notebook
from sklearn.utils import shuffle
import pickle
import os
import platform as pl
from clsConfig import clsConfig as cf
class clsAlphabetReading:
def __init__(self):
self.sep = str(cf.conf['SEP'])
self.Curr_Path = str(cf.conf['INIT_PATH'])
self.fileName = str(cf.conf['FILE_NAME'])
self.testRatio = float(cf.conf['testRatio'])
self.valRatio = float(cf.conf['valRatio'])
self.epochsVal = int(cf.conf['epochsVal'])
self.activationType = str(cf.conf['activationType'])
self.activationType2 = str(cf.conf['activationType2'])
self.numOfClasses = int(cf.conf['numOfClasses'])
self.kernelSize = cf.conf['kernelSize']
self.poolSize = cf.conf['poolSize']
self.filterVal1 = int(cf.conf['filterVal1'])
self.filterVal2 = int(cf.conf['filterVal2'])
self.filterVal3 = int(cf.conf['filterVal3'])
self.stridesVal = int(cf.conf['stridesVal'])
self.monitorVal = str(cf.conf['monitorVal'])
self.paddingVal1 = str(cf.conf['paddingVal1'])
self.paddingVal2 = str(cf.conf['paddingVal2'])
self.reshapeVal = int(cf.conf['reshapeVal'])
self.reshapeVal1 = cf.conf['reshapeVal1']
self.patienceVal1 = int(cf.conf['patienceVal1'])
self.patienceVal2 = int(cf.conf['patienceVal2'])
self.sleepTime = int(cf.conf['sleepTime'])
self.sleepTime1 = int(cf.conf['sleepTime1'])
self.factorVal = float(cf.conf['factorVal'])
self.learningRateVal = float(cf.conf['learningRateVal'])
self.minDeltaVal = int(cf.conf['minDeltaVal'])
self.minLrVal = float(cf.conf['minLrVal'])
self.verboseFlag = int(cf.conf['verboseFlag'])
self.modeInd = str(cf.conf['modeInd'])
self.shuffleVal = int(cf.conf['shuffleVal'])
self.DenkseVal1 = int(cf.conf['DenkseVal1'])
self.DenkseVal2 = int(cf.conf['DenkseVal2'])
self.DenkseVal3 = int(cf.conf['DenkseVal3'])
self.predParam = int(cf.conf['predParam'])
self.word_dict = cf.conf['word_dict']
def applyCNN(self, X_Train, Y_Train_Catg, X_Validation, Y_Validation_Catg):
try:
testRatio = self.testRatio
epochsVal = self.epochsVal
activationType = self.activationType
activationType2 = self.activationType2
numOfClasses = self.numOfClasses
kernelSize = self.kernelSize
poolSize = self.poolSize
filterVal1 = self.filterVal1
filterVal2 = self.filterVal2
filterVal3 = self.filterVal3
stridesVal = self.stridesVal
monitorVal = self.monitorVal
paddingVal1 = self.paddingVal1
paddingVal2 = self.paddingVal2
reshapeVal = self.reshapeVal
patienceVal1 = self.patienceVal1
patienceVal2 = self.patienceVal2
sleepTime = self.sleepTime
sleepTime1 = self.sleepTime1
factorVal = self.factorVal
learningRateVal = self.learningRateVal
minDeltaVal = self.minDeltaVal
minLrVal = self.minLrVal
verboseFlag = self.verboseFlag
modeInd = self.modeInd
shuffleVal = self.shuffleVal
DenkseVal1 = self.DenkseVal1
DenkseVal2 = self.DenkseVal2
DenkseVal3 = self.DenkseVal3
model = Sequential()
model.add(Conv2D(filters=filterVal1, kernel_size=kernelSize, activation=activationType, input_shape=(28,28,1)))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))
model.add(Conv2D(filters=filterVal2, kernel_size=kernelSize, activation=activationType, padding = paddingVal1))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))
model.add(Conv2D(filters=filterVal3, kernel_size=kernelSize, activation=activationType, padding = paddingVal2))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))
model.add(Flatten())
model.add(Dense(DenkseVal2,activation = activationType))
model.add(Dense(DenkseVal3,activation = activationType))
model.add(Dense(DenkseVal1,activation = activationType2))
model.compile(optimizer = Adam(learning_rate=learningRateVal), loss='categorical_crossentropy', metrics=['accuracy'])
reduce_lr = ReduceLROnPlateau(monitor=monitorVal, factor=factorVal, patience=patienceVal1, min_lr=minLrVal)
early_stop = EarlyStopping(monitor=monitorVal, min_delta=minDeltaVal, patience=patienceVal2, verbose=verboseFlag, mode=modeInd)
fittedModel = model.fit(X_Train, Y_Train_Catg, epochs=epochsVal, callbacks=[reduce_lr, early_stop], validation_data = (X_Validation,Y_Validation_Catg))
return (model, fittedModel)
except Exception as e:
x = str(e)
model = Sequential()
print('Error: ', x)
return (model, model)
def trainModel(self, debugInd, var):
try:
sep = self.sep
Curr_Path = self.Curr_Path
fileName = self.fileName
epochsVal = self.epochsVal
valRatio = self.valRatio
predParam = self.predParam
testRatio = self.testRatio
reshapeVal = self.reshapeVal
numOfClasses = self.numOfClasses
sleepTime = self.sleepTime
sleepTime1 = self.sleepTime1
shuffleVal = self.shuffleVal
reshapeVal1 = self.reshapeVal1
# Dictionary for getting characters from index values
word_dict = self.word_dict
print('File Name: ', str(fileName))
# Read the data
df_HW_Alphabet = p.read_csv(fileName).astype('float32')
# Sample Data
print('Sample Data: ')
print(df_HW_Alphabet.head())
# Split data the (x – Our data) & (y – the prdict label)
x = df_HW_Alphabet.drop('0',axis = 1)
y = df_HW_Alphabet['0']
# Reshaping the data in csv file to display as an image
X_Train, X_Test, Y_Train, Y_Test = train_test_split(x, y, test_size = testRatio)
X_Train, X_Validation, Y_Train, Y_Validation = train_test_split(X_Train, Y_Train, test_size = valRatio)
X_Train = np.reshape(X_Train.values, (X_Train.shape[0], reshapeVal, reshapeVal))
X_Test = np.reshape(X_Test.values, (X_Test.shape[0], reshapeVal, reshapeVal))
X_Validation = np.reshape(X_Validation.values, (X_Validation.shape[0], reshapeVal, reshapeVal))
print("Train Data Shape: ", X_Train.shape)
print("Test Data Shape: ", X_Test.shape)
print("Validation Data shape: ", X_Validation.shape)
# Plotting the number of alphabets in the dataset
Y_Train_Num = np.int0(y)
count = np.zeros(numOfClasses, dtype='int')
for i in Y_Train_Num:
count[i] +=1
alphabets = []
for i in word_dict.values():
alphabets.append(i)
fig, ax = plt.subplots(1,1, figsize=(7,7))
ax.barh(alphabets, count)
plt.xlabel("Number of elements ")
plt.ylabel("Alphabets")
plt.grid()
plt.show(block=False)
plt.pause(sleepTime)
plt.close()
# Shuffling the data
shuff = shuffle(X_Train[:shuffleVal])
# Model reshaping the training & test dataset
X_Train = X_Train.reshape(X_Train.shape[0],X_Train.shape[1],X_Train.shape[2],1)
print("Shape of Train Data: ", X_Train.shape)
X_Test = X_Test.reshape(X_Test.shape[0], X_Test.shape[1], X_Test.shape[2],1)
print("Shape of Test Data: ", X_Test.shape)
X_Validation = X_Validation.reshape(X_Validation.shape[0], X_Validation.shape[1], X_Validation.shape[2],1)
print("Shape of Validation data: ", X_Validation.shape)
# Converting the labels to categorical values
Y_Train_Catg = to_categorical(Y_Train, num_classes = numOfClasses, dtype='int')
print("Shape of Train Labels: ", Y_Train_Catg.shape)
Y_Test_Catg = to_categorical(Y_Test, num_classes = numOfClasses, dtype='int')
print("Shape of Test Labels: ", Y_Test_Catg.shape)
Y_Validation_Catg = to_categorical(Y_Validation, num_classes = numOfClasses, dtype='int')
print("Shape of validation labels: ", Y_Validation_Catg.shape)
model, history = self.applyCNN(X_Train, Y_Train_Catg, X_Validation, Y_Validation_Catg)
print('Model Summary: ')
print(model.summary())
# Displaying the accuracies & losses for train & validation set
print("Validation Accuracy :", history.history['val_accuracy'])
print("Training Accuracy :", history.history['accuracy'])
print("Validation Loss :", history.history['val_loss'])
print("Training Loss :", history.history['loss'])
# Displaying the Loss Graph
plt.figure(1)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.legend(['training','validation'])
plt.title('Loss')
plt.xlabel('epoch')
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()
# Dsiplaying the Accuracy Graph
plt.figure(2)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.legend(['training','validation'])
plt.title('Accuracy')
plt.xlabel('epoch')
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()
# Making the model to predict
pred = model.predict(X_Test[:predParam])
print('Test Details::')
print('X_Test: ', X_Test.shape)
print('Y_Test_Catg: ', Y_Test_Catg.shape)
try:
score = model.evaluate(X_Test, Y_Test_Catg, verbose=0)
print('Test Score = ', score[0])
print('Test Accuracy = ', score[1])
except Exception as e:
x = str(e)
print('Error: ', x)
# Displaying some of the test images & their predicted labels
fig, ax = plt.subplots(3,3, figsize=(8,9))
axes = ax.flatten()
for i in range(9):
axes[i].imshow(np.reshape(X_Test[i], reshapeVal1), cmap="Greys")
pred = word_dict[np.argmax(Y_Test_Catg[i])]
print('Prediction: ', pred)
axes[i].set_title("Test Prediction: " + pred)
axes[i].grid()
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()
fileName = Curr_Path + sep + 'Model' + sep + 'model_trained_' + str(epochsVal) + '.p'
print('Model Name: ', str(fileName))
pickle_out = open(fileName, 'wb')
pickle.dump(model, pickle_out)
pickle_out.close()
return 0
except Exception as e:
x = str(e)
print('Error: ', x)
return 1

Some of the key snippets from the above scripts are –

x = df_HW_Alphabet.drop('0',axis = 1)
y = df_HW_Alphabet['0']

In the above snippet, we have split the data into images & their corresponding labels.

X_Train, X_Test, Y_Train, Y_Test = train_test_split(x, y, test_size = testRatio)
X_Train, X_Validation, Y_Train, Y_Validation = train_test_split(X_Train, Y_Train, test_size = valRatio)

X_Train = np.reshape(X_Train.values, (X_Train.shape[0], reshapeVal, reshapeVal))
X_Test = np.reshape(X_Test.values, (X_Test.shape[0], reshapeVal, reshapeVal))
X_Validation = np.reshape(X_Validation.values, (X_Validation.shape[0], reshapeVal, reshapeVal))


print("Train Data Shape: ", X_Train.shape)
print("Test Data Shape: ", X_Test.shape)
print("Validation Data shape: ", X_Validation.shape)

We are splitting the data into Train, Test & Validation sets to get more accurate predictions and reshaping the raw data into the image by consuming the 784 data columns to 28×28 pixel images.

Since we are talking about characters, we had to come up with a process of identifying The following snippet will plot the character equivalent number into a matplotlib chart & showcase the overall distribution trend after splitting.

Y_Train_Num = np.int0(y)
count = np.zeros(numOfClasses, dtype='int')
for i in Y_Train_Num:
    count[i] +=1

alphabets = []
for i in word_dict.values():
    alphabets.append(i)

fig, ax = plt.subplots(1,1, figsize=(7,7))
ax.barh(alphabets, count)

plt.xlabel("Number of elements ")
plt.ylabel("Alphabets")
plt.grid()
plt.show(block=False)
plt.pause(sleepTime)
plt.close()

Note that we have tweaked the plt.show property with (block=False). This property will enable us to continue execution without human interventions after the initial pause.

# Model reshaping the training & test dataset
X_Train = X_Train.reshape(X_Train.shape[0],X_Train.shape[1],X_Train.shape[2],1)
print("Shape of Train Data: ", X_Train.shape)

X_Test = X_Test.reshape(X_Test.shape[0], X_Test.shape[1], X_Test.shape[2],1)
print("Shape of Test Data: ", X_Test.shape)

X_Validation = X_Validation.reshape(X_Validation.shape[0], X_Validation.shape[1], X_Validation.shape[2],1)
print("Shape of Validation data: ", X_Validation.shape)

# Converting the labels to categorical values
Y_Train_Catg = to_categorical(Y_Train, num_classes = numOfClasses, dtype='int')
print("Shape of Train Labels: ", Y_Train_Catg.shape)

Y_Test_Catg = to_categorical(Y_Test, num_classes = numOfClasses, dtype='int')
print("Shape of Test Labels: ", Y_Test_Catg.shape)

Y_Validation_Catg = to_categorical(Y_Validation, num_classes = numOfClasses, dtype='int')
print("Shape of validation labels: ", Y_Validation_Catg.shape)

In the above diagram, the application did reshape all three categories of data before calling the primary CNN function.

model = Sequential()

model.add(Conv2D(filters=filterVal1, kernel_size=kernelSize, activation=activationType, input_shape=(28,28,1)))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))

model.add(Conv2D(filters=filterVal2, kernel_size=kernelSize, activation=activationType, padding = paddingVal1))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))

model.add(Conv2D(filters=filterVal3, kernel_size=kernelSize, activation=activationType, padding = paddingVal2))
model.add(MaxPool2D(pool_size=poolSize, strides=stridesVal))

model.add(Flatten())

model.add(Dense(DenkseVal2,activation = activationType))
model.add(Dense(DenkseVal3,activation = activationType))

model.add(Dense(DenkseVal1,activation = activationType2))

model.compile(optimizer = Adam(learning_rate=learningRateVal), loss='categorical_crossentropy', metrics=['accuracy'])
reduce_lr = ReduceLROnPlateau(monitor=monitorVal, factor=factorVal, patience=patienceVal1, min_lr=minLrVal)
early_stop = EarlyStopping(monitor=monitorVal, min_delta=minDeltaVal, patience=patienceVal2, verbose=verboseFlag, mode=modeInd)


fittedModel = model.fit(X_Train, Y_Train_Catg, epochs=epochsVal, callbacks=[reduce_lr, early_stop],  validation_data = (X_Validation,Y_Validation_Catg))

return (model, fittedModel)

In the above snippet, the convolution layers are followed by maxpool layers, which reduce the number of features extracted. The output of the maxpool layers and convolution layers are flattened into a vector of a single dimension and supplied as an input to the Dense layer—the CNN model prepared for training the model using the training dataset.

We have used optimization parameters like Adam, RMSProp & the application we trained for eight epochs for better accuracy & predictions.

# Displaying the accuracies & losses for train & validation set
print("Validation Accuracy :", history.history['val_accuracy'])
print("Training Accuracy :", history.history['accuracy'])
print("Validation Loss :", history.history['val_loss'])
print("Training Loss :", history.history['loss'])

# Displaying the Loss Graph
plt.figure(1)
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.legend(['training','validation'])
plt.title('Loss')
plt.xlabel('epoch')
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()

# Dsiplaying the Accuracy Graph
plt.figure(2)
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.legend(['training','validation'])
plt.title('Accuracy')
plt.xlabel('epoch')
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()

Also, we have captured the validation Accuracy & Loss & plot them into two separate graphs for better understanding.

try:
    score = model.evaluate(X_Test, Y_Test_Catg, verbose=0)
    print('Test Score = ', score[0])
    print('Test Accuracy = ', score[1])
except Exception as e:
    x = str(e)
    print('Error: ', x)

Also, the application is trying to get the accuracy of the model that we trained & validated with the training & validation data. This time we have used test data to predict the confidence score.

# Displaying some of the test images & their predicted labels
fig, ax = plt.subplots(3,3, figsize=(8,9))
axes = ax.flatten()

for i in range(9):
    axes[i].imshow(np.reshape(X_Test[i], reshapeVal1), cmap="Greys")
    pred = word_dict[np.argmax(Y_Test_Catg[i])]
    print('Prediction: ', pred)
    axes[i].set_title("Test Prediction: " + pred)
    axes[i].grid()
plt.show(block=False)
plt.pause(sleepTime1)
plt.close()

Finally, the application testing with some random test data & tried to plot the output & prediction assessment.

Testing with Random Test Data
fileName = Curr_Path + sep + 'Model' + sep + 'model_trained_' + str(epochsVal) + '.p'
print('Model Name: ', str(fileName))

pickle_out = open(fileName, 'wb')
pickle.dump(model, pickle_out)
pickle_out.close()

As a part of the last step, the application will generate the models using a pickle package & save them under a specific location, which the reader application will use.

3. trainingVisualDataRead.py (Main application that will invoke the training class to predict alphabet through WebCam using Convolutional Neural Network (CNN).)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 17-Jan-2022 ####
#### Modified On 17-Jan-2022 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsAlhpabetReading class to initiate ####
#### teach & perfect the model to read ####
#### visual alphabets using Convolutional ####
#### Neural Network (CNN). ####
###############################################
# We keep the setup code in a different class as shown below.
import clsAlphabetReading as ar
from clsConfig import clsConfig as cf
import datetime
import logging
###############################################
### Global Section ###
###############################################
# Instantiating all the three classes
x1 = ar.clsAlphabetReading()
###############################################
### End of Global Section ###
###############################################
def main():
try:
# Other useful variables
debugInd = 'Y'
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
var1 = datetime.datetime.now()
print('Start Time: ', str(var))
# End of useful variables
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'restoreVideo.log', level=logging.INFO)
print('Started Transformation!')
# Execute all the pass
r1 = x1.trainModel(debugInd, var)
if (r1 == 0):
print('Successfully Visual Alphabet Training Completed!')
else:
print('Failed to complete the Visual Alphabet Training!')
var2 = datetime.datetime.now()
c = var2 var1
minutes = c.total_seconds() / 60
print('Total difference in minutes: ', str(minutes))
print('End Time: ', str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

And the core snippet from the above script is –

x1 = ar.clsAlphabetReading()

Instantiate the main class.

r1 = x1.trainModel(debugInd, var)

The python application will invoke the class & capture the returned value inside the r1 variable.

4. readingVisualData.py (Reading the model to predict Alphabet using WebCAM.)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 18-Jan-2022 ####
#### Modified On 18-Jan-2022 ####
#### ####
#### Objective: This python script will ####
#### scan the live video feed from the ####
#### web-cam & predict the alphabet that ####
#### read it. ####
###############################################
# We keep the setup code in a different class as shown below.
from clsConfig import clsConfig as cf
import datetime
import logging
import cv2
import pickle
import numpy as np
###############################################
### Global Section ###
###############################################
sep = str(cf.conf['SEP'])
Curr_Path = str(cf.conf['INIT_PATH'])
fileName = str(cf.conf['FILE_NAME'])
epochsVal = int(cf.conf['epochsVal'])
numOfClasses = int(cf.conf['numOfClasses'])
word_dict = cf.conf['word_dict']
width = int(cf.conf['width'])
height = int(cf.conf['height'])
imgSize = cf.conf['imgSize']
threshold = float(cf.conf['threshold'])
imgDimension = cf.conf['imgDimension']
imgSmallDim = cf.conf['imgSmallDim']
imgMidDim = cf.conf['imgMidDim']
reshapeParam1 = int(cf.conf['reshapeParam1'])
reshapeParam2 = int(cf.conf['reshapeParam2'])
colorFeed = cf.conf['colorFeed']
colorPredict = cf.conf['colorPredict']
###############################################
### End of Global Section ###
###############################################
def main():
try:
# Other useful variables
debugInd = 'Y'
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
var1 = datetime.datetime.now()
print('Start Time: ', str(var))
# End of useful variables
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'restoreVideo.log', level=logging.INFO)
print('Started Live Streaming!')
cap = cv2.VideoCapture(0)
cap.set(3, width)
cap.set(4, height)
fileName = Curr_Path + sep + 'Model' + sep + 'model_trained_' + str(epochsVal) + '.p'
print('Model Name: ', str(fileName))
pickle_in = open(fileName, 'rb')
model = pickle.load(pickle_in)
while True:
status, img = cap.read()
if status == False:
break
img_copy = img.copy()
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = cv2.resize(img, imgDimension)
img_copy = cv2.GaussianBlur(img_copy, imgSmallDim, 0)
img_gray = cv2.cvtColor(img_copy, cv2.COLOR_BGR2GRAY)
bin, img_thresh = cv2.threshold(img_gray, 100, 255, cv2.THRESH_BINARY_INV)
img_final = cv2.resize(img_thresh, imgMidDim)
img_final = np.reshape(img_final, (reshapeParam1,reshapeParam2,reshapeParam2,reshapeParam1))
img_pred = word_dict[np.argmax(model.predict(img_final))]
# Extracting Probability Values
Predict_X = model.predict(img_final)
probVal = round(np.amax(Predict_X) * 100)
cv2.putText(img, "Live Feed : (" + str(probVal) + "%) ", (20,25), cv2.FONT_HERSHEY_TRIPLEX, 0.7, color = colorFeed)
cv2.putText(img, "Prediction: " + img_pred, (20,410), cv2.FONT_HERSHEY_DUPLEX, 1.3, color = colorPredict)
cv2.imshow("Original Image", img)
if cv2.waitKey(1) & 0xFF == ord('q'):
r1=0
break
if (r1 == 0):
print('Successfully Alphabets predicted!')
else:
print('Failed to predict alphabet!')
var2 = datetime.datetime.now()
c = var2 var1
minutes = c.total_seconds() / 60
print('Total Run Time in minutes: ', str(minutes))
print('End Time: ', str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

And the key snippet from the above code is –

cap = cv2.VideoCapture(0)
cap.set(3, width)
cap.set(4, height)

The application is reading the live video data from WebCAM. Also, set out the height & width for the video output.

fileName = Curr_Path + sep + 'Model' + sep + 'model_trained_' + str(epochsVal) + '.p'
print('Model Name: ', str(fileName))

pickle_in = open(fileName, 'rb')
model = pickle.load(pickle_in)

The application reads the model output generated as part of the previous script using the pickle package.

while True:
    status, img = cap.read()

    if status == False:
        break

The application will read the WebCAM & it exits if there is an end of video transmission or some kind of corrupt video frame.

img_copy = img.copy()

img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = cv2.resize(img, imgDimension)

img_copy = cv2.GaussianBlur(img_copy, imgSmallDim, 0)
img_gray = cv2.cvtColor(img_copy, cv2.COLOR_BGR2GRAY)
bin, img_thresh = cv2.threshold(img_gray, 100, 255, cv2.THRESH_BINARY_INV)

img_final = cv2.resize(img_thresh, imgMidDim)
img_final = np.reshape(img_final, (reshapeParam1,reshapeParam2,reshapeParam2,reshapeParam1))


img_pred = word_dict[np.argmax(model.predict(img_final))]

We have initially cloned the original video frame & then it converted from BGR2GRAYSCALE while applying the threshold on it doe better prediction outcomes. Then the image has resized & reshaped for model input. Finally, the np.argmax function extracted the class index with the highest predicted probability. Furthermore, it is translated using the word_dict dictionary to an Alphabet & displayed on top of the Live View.

# Extracting Probability Values
Predict_X = model.predict(img_final)
probVal = round(np.amax(Predict_X) * 100)

Also, derive the confidence score of that probability & display that on top of the Live View.

if cv2.waitKey(1) & 0xFF == ord('q'):
    r1=0
    break

The above code will let the developer exit from this application by pressing the “Esc” or “q”-key from the keyboard & the program will terminate.


So, we’ve done it.

You will get the complete codebase in the following Github link.

I’ll bring some more exciting topic in the coming days from the Python verse. Please share & subscribe my post & let me know your feedback.

Till then, Happy Avenging! 😀

Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim the ownership of these images. There is an always room for improvement & especially the prediction quality of Alphabet.

Restoring old video’s with python-based application

Hi Guys!

Today, I’ll be demonstrating a primary way to improve the quality of old video using the Open-CV package. This post is the first of such a series of Open-CV that I’ll be posting in the coming years.

Let me tell you one thing – there are many brilliant papers on this, especially image enhancement with OpenCV, Pillow & many more valuable libraries. I’ll share some of the fascinating links later at the end of my blog.


What are we planning here?

We’ll de-noise the old video.
Slightly bright the video.

What kind of video should be the ideal candidate for this test?

Any video with more noise with low light will be an ideal candidate for this use case.


Why don’t we see the demo?

Demo

Architecture:

Let us find the basic architecture –

Flow of executions

Code:

Let us explore the the key code base as follows –

  1. clsVideo2Frame.py (This will convert the supplied video into multiple frames. It will also extract the audio from the source file, which will later merge with the enhanced frames.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 12-Nov-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels & captured IoT ####
#### events from the simulator & publish ####
#### them in Kivy-I/OS App through ####
#### measured KPIs. ####
#### ####
##############################################
import av
import os
import platform as pl
import subprocess
import sys
from clsConfig import clsConfig as cf
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
class clsVideo2Frame:
def __init__(self):
self.fileNm = str(cf.conf['FILE_NAME'])
self.base_path = str(cf.conf['INIT_PATH'])
def convert_video_to_audio_ffmpeg(self, video_file, output_ext="mp3"):
try:
"""Converts video to audio directly using `ffmpeg` command
with the help of subprocess module"""
filename, ext = os.path.splitext(video_file)
subprocess.call(["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"],
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT)
return 0
except Exception as e:
x = str(e)
print('Error: ', x)
return 1
def genFrame(self, dInd, var):
try:
base_path = self.base_path
fileNm = self.fileNm
path_to_src_video = base_path + sep + 'Source' + sep + fileNm + '.mp4'
temp_path = base_path + sep + 'Temp' + sep
print('Path: ', path_to_src_video)
x = self.convert_video_to_audio_ffmpeg(path_to_src_video)
if x == 0:
print('Successfully Audio extracted from the source file!')
else:
print('Failed to extract the source audio!')
container = av.open(path_to_src_video)
for frame in container.decode(video=0):
frame.to_image().save(temp_path + 'frame-%04d.jpg' % frame.index)
print('Successfully Converted to Frames!')
return 0
except Exception as e:
x = str(e)
print('Error: ', x)
return 1

Let us understand some of the key snippet below –

    def convert_video_to_audio_ffmpeg(self, video_file, output_ext="mp3"):
        try:
            """Converts video to audio directly using `ffmpeg` command
            with the help of subprocess module"""
            filename, ext = os.path.splitext(video_file)
            subprocess.call(["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"],
                            stdout=subprocess.DEVNULL,
                            stderr=subprocess.STDOUT)

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

In the above step, using the FFmpeg package python application is extracting the source audio & storing that into the source directory itself.

for frame in container.decode(video=0):
    frame.to_image().save(temp_path + 'frame-%04d.jpg' % frame.index)

From the above snippet, we can say that the application is splitting the videos into multiple frames & storing them into the temp directory, which will require later enhancement by another class.

2. clsFrameEnhance.py (This will enhance the frames as per your logic & upscale them with the parameters provided by you.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 12-Nov-2021 ####
#### ####
#### Objective: This python script will ####
#### enhance the old existing frame by ####
#### applying machine-learning algorithm ####
#### to improve their quality one at a ####
#### time. ####
#### ####
##############################################
import av
import os
import platform as pl
import numpy as np
import cv2
import glob
from PIL import Image
from numpy import asarray
import numpy as np
from clsConfig import clsConfig as cf
import sys
# Global Variable
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
class clsFrameEnhance:
def __init__(self):
self.fileNm = str(cf.conf['FILE_NAME'])
self.base_path = str(cf.conf['INIT_PATH'])
def show(self, enhanced_path, fileNameOnly, buff):
cv2.imwrite(enhanced_path + fileNameOnly, buff)
def unsharp_mask(self, image, kernel_size=(3, 3), sigma=1.0, amount=2.0, threshold=2):
"""Return a sharpened version of the image, using an unsharp mask."""
blurred = cv2.GaussianBlur(image, kernel_size, sigma)
sharpened = float(amount + 1) * image float(amount) * blurred
sharpened = np.maximum(sharpened, np.zeros(sharpened.shape))
sharpened = np.minimum(sharpened, 255 * np.ones(sharpened.shape))
sharpened = sharpened.round().astype(np.uint8)
if threshold > 0:
low_contrast_mask = np.absolute(image blurred) < threshold
np.copyto(sharpened, image, where=low_contrast_mask)
return sharpened
def doEnhance(self, dInd, var):
try:
base_path = self.base_path
temp_path = base_path + sep + 'Temp' + sep
enhanced_path = base_path + sep + 'Enhanced' + sep
for filename in sorted(glob.glob(temp_path + '*.jpg')):
print('Full File Name: ', str(filename))
img = cv2.imread(filename)
if img is None:
print('Failed to load image file:', filename)
sys.exit(1)
sharpened_image = self.unsharp_mask(img)
img = np.asarray(sharpened_image)
dst = cv2.fastNlMeansDenoising(img,None,7,7,21)
Inten_matrix = np.ones(dst.shape, dtype='uint8')*20
bright_img = cv2.add(dst, Inten_matrix)
head, tail = os.path.split(filename)
self.show(enhanced_path, tail, bright_img)
# Remove Files
os.remove(filename)
print('Successfully Enhanced the Frames!')
return 0
except Exception as e:
x = str(e)
print('Error: ', x)
return 1

Let us understand some of the key snippet below –

    def unsharp_mask(self, image, kernel_size=(3, 3), sigma=1.0, amount=2.0, threshold=2):
        """Return a sharpened version of the image, using an unsharp mask."""
        blurred = cv2.GaussianBlur(image, kernel_size, sigma)
        sharpened = float(amount + 1) * image - float(amount) * blurred
        sharpened = np.maximum(sharpened, np.zeros(sharpened.shape))
        sharpened = np.minimum(sharpened, 255 * np.ones(sharpened.shape))
        sharpened = sharpened.round().astype(np.uint8)
        if threshold > 0:
            low_contrast_mask = np.absolute(image - blurred) < threshold
            np.copyto(sharpened, image, where=low_contrast_mask)
        return sharpened

This will provide the sharpen version of the image. If you want to know more about this. Please refer the following link.

img = np.asarray(sharpened_image)

dst = cv2.fastNlMeansDenoising(img,None,7,7,21)

Inten_matrix = np.ones(dst.shape, dtype='uint8')*20
bright_img = cv2.add(dst, Inten_matrix)

As you can see, the image has further enhanced with the use of de-noise & the addition of brightest pixels using Inten_matrix.

3. clsFrame2Video.py (This will combine the frames along with the audio & produce the final video.)


###############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 17-Dec-2021 ####
#### ####
#### Objective: This script will convert ####
#### enhanced frames to restored better ####
#### quality videos & merge it with source ####
#### audio. ####
#### ####
###############################################
import os
import platform as pl
import cv2
import numpy as np
import glob
import re
import ffmpeg
from clsConfig import clsConfig as cf
import logging
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
class clsFrame2Video:
def __init__(self):
self.fileNm = str(cf.conf['FILE_NAME'])
self.base_path = str(cf.conf['INIT_PATH'])
def convert2Vid(self, dInd, var):
try:
img_array = []
fileNm = self.fileNm
base_path = self.base_path
enhanced_path = base_path + sep + 'Enhanced' + sep
target_path = base_path + sep + 'Target' + sep
path_to_src_audio = base_path + sep + 'Source' + sep + fileNm + '.mp3'
files = glob.glob(enhanced_path + '*.jpg')
for filename in sorted(files, key=lambda x:float(re.findall("(-\d+)",x)[0].replace('-',''))):
print('Processing… ', str(filename))
img = cv2.imread(filename)
height, width, layers = img.shape
size = (width,height)
img_array.append(img)
# Deleting Frames
os.remove(filename)
print('Successfully Removed Old Enhanced Frames!')
out = cv2.VideoWriter(target_path + 'Temp.avi',cv2.VideoWriter_fourcc(*'DIVX'), 23, size)
for i in range(len(img_array)):
out.write(img_array[i])
out.release()
print('Temporary File generated!')
Temp_Target_File = str(target_path + 'Temp.avi')
print('Temporary Video File Name: ', Temp_Target_File)
print('Temporary Audio File Name: ', str(path_to_src_audio))
infile1 = ffmpeg.input(Temp_Target_File)
infile2 = ffmpeg.input(path_to_src_audio)
ffmpeg.concat(infile1, infile2, v=1, a=1).output(target_path + fileNm + '.mp4').run()
# Deleting Frames
os.remove(Temp_Target_File)
print('Successfully Converted to Videos!')
return 0
except Exception as e:
x = str(e)
print('Error: ', x)
return 1

Let’s explore the key snippets –

files = glob.glob(enhanced_path + '*.jpg')

for filename in sorted(files, key=lambda x:float(re.findall("(-\d+)",x)[0].replace('-',''))):
    print('Processing... ', str(filename))
    img = cv2.imread(filename)
    height, width, layers = img.shape
    size = (width,height)
    img_array.append(img)

    # Deleting Frames
    os.remove(filename)

print('Successfully Removed Old Enhanced Frames!')

In the above snippet, the application first stitched frames together to form a temporary video without the audio.

Temp_Target_File = str(target_path + 'Temp.avi')
print('Temporary Video File Name: ', Temp_Target_File)
print('Temporary Audio File Name: ', str(path_to_src_audio))

infile1 = ffmpeg.input(Temp_Target_File)
infile2 = ffmpeg.input(path_to_src_audio)

ffmpeg.concat(infile1, infile2, v=1, a=1).output(target_path + fileNm + '.mp4').run()


out = cv2.VideoWriter(target_path + 'Temp.avi',cv2.VideoWriter_fourcc(*'DIVX'), 23, size)

Finally, merge the audio with the video to produce the final enriched video.

4. restoreOldVideo.py (This is the main application, which will invoke all the python class.)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 17-Dec-2021 ####
#### Modified On 17-Dec-2021 ####
#### ####
#### Objective: This python script will ####
#### convert the old B&W video & restore ####
#### them to relatively better quality. ####
###############################################
# We keep the setup code in a different class as shown below.
import clsVideo2Frame as vf
import clsFrameEnhance as fe
import clsFrame2Video as fv
from clsConfig import clsConfig as cf
import datetime
import logging
###############################################
### Global Section ###
###############################################
# Instantiating all the three classes
x1 = vf.clsVideo2Frame()
x2 = fe.clsFrameEnhance()
x3 = fv.clsFrame2Video()
###############################################
### End of Global Section ###
###############################################
def main():
try:
# Other useful variables
debugInd = 'Y'
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
var1 = datetime.datetime.now()
print('Start Time: ', str(var))
# End of useful variables
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'restoreVideo.log', level=logging.INFO)
print('Started Transformation!')
# Execute all the pass
r1 = x1.genFrame(debugInd, var)
r2 = x2.doEnhance(debugInd, var)
r3 = x3.convert2Vid(debugInd, var)
if ((r1 == 0) and (r2 == 0) and (r3 == 0)):
print('Successfully File Enhanced!')
else:
print('Failed to enhance the source file!')
var2 = datetime.datetime.now()
c = var2 var1
minutes = c.total_seconds() / 60
print('Total difference in minutes: ', str(minutes))
print('End Time: ', str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

Let us understand the final script –

x1 = vf.clsVideo2Frame()
x2 = fe.clsFrameEnhance()
x3 = fv.clsFrame2Video()

The above lines will instantiate the main python class.

# Execute all the pass
r1 = x1.genFrame(debugInd, var)
r2 = x2.doEnhance(debugInd, var)
r3 = x3.convert2Vid(debugInd, var)

Invoking all the functions with parameters to perform the video upscale.


So, we’ve done it.

You will get the complete codebase in the following Github link.

I’ll bring some more exciting topic in the coming days from the Python verse. Please share & subscribe my post & let me know your feedback.

Till then, Happy Avenging!


Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim the ownership of these images. There is an always room for improvement & especially the quality of video. At this moment, the enhancement class working on a serial manner. You can implement threading or multiprocessing to make it more faster.

Real-Time Matplotlib view from a streaming data built using Python & Kivy-based iOS App

Today, I’ll be sharing one of the most exciting posts I’ve ever shared. This post is rare as you cannot find the most relevant working solution easily over the net.

So, what are we talking about here? We’re going to build a Python-based iOS App using the Kivy framework. You get plenty of videos & documents on this as well. However, nowhere you’ll find the capability that I’m about to disclose. We’ll consume live IoT streaming data from a dummy application & then plot them in a MatplotLib dashboard inside the mobile App. And that’s where this post is seriously different from the rest of the available white papers.


But, before we dig into more details, let us see a quick demo of our iOS App.

Demo:

Demo

Isn’t it exciting? Great! Now, let’s dig into the details.


Let’s understand the architecture as to how we want to proceed with the solution here.

Architecture:

Broad-level design

The above diagram shows that the Kive-based iOS application that will consume streaming data from the Ably queue. The initial dummy IoT application will push the real-time events to the same Ably queue.

So, now we understand the architecture. Fantastic!

Let’s deep dive into the code that we specifically built for this use case.


Code:

  1. IoTDataGen.py (Publishing Streaming data to Ably channels & captured IoT events from the simulator & publish them in Dashboard through measured KPIs.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 12-Nov-2021 ####
#### ####
#### Objective: Publishing Streaming data ####
#### to Ably channels & captured IoT ####
#### events from the simulator & publish ####
#### them in Dashboard through measured ####
#### KPIs. ####
#### ####
##############################################
import random
import time
import json
import clsPublishStream as cps
import datetime
from clsConfig import clsConfig as cf
import logging
# Invoking the IoT Device Generator.
def main():
###############################################
### Global Section ###
###############################################
# Initiating Ably class to push events
x1 = cps.clsPublishStream()
###############################################
### End of Global Section ###
###############################################
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
msgSize = int(cf.conf['limRec'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'IoTDevice.log', level=logging.INFO)
# Other useful variables
cnt = 1
idx = 0
debugInd = 'Y'
x_value = 0
total_1 = 100
total_2 = 100
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# End of usefull variables
while True:
srcJson = {
"x_value": x_value,
"total_1": total_1,
"total_2": total_2
}
x_value += 1
total_1 = total_1 + random.randint(6, 8)
total_2 = total_2 + random.randint(5, 6)
tmpJson = str(srcJson)
if cnt == 1:
srcJsonMast = '{' + '"' + str(idx) + '":'+ tmpJson
elif cnt == msgSize:
srcJsonMast = srcJsonMast + '}'
print('JSON: ')
print(str(srcJsonMast))
# Pushing both the Historical Confirmed Cases
retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)
if retVal_1 == 0:
print('Successfully IoT event pushed!')
else:
print('Failed to push IoT events!')
srcJsonMast = ''
tmpJson = ''
cnt = 0
idx = 1
srcJson = {}
retVal_1 = 0
else:
srcJsonMast = srcJsonMast + ',' + '"' + str(idx) + '":'+ tmpJson
cnt += 1
idx += 1
time.sleep(1)
if __name__ == "__main__":
main()

view raw

IoTDataGen.py

hosted with ❤ by GitHub

Let’s explore the key snippets from the above script.

# Initiating Ably class to push events
x1 = cps.clsPublishStream()

The I-OS App is calling the main class to publish the JSON events to Ably Queue.

if cnt == 1:
    srcJsonMast = '{' + '"' + str(idx) + '":'+ tmpJson
elif cnt == msgSize:
    srcJsonMast = srcJsonMast + '}'
    print('JSON: ')
    print(str(srcJsonMast))

    # Pushing both the Historical Confirmed Cases
    retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)

    if retVal_1 == 0:
        print('Successfully IoT event pushed!')
    else:
        print('Failed to push IoT events!')

    srcJsonMast = ''
    tmpJson = ''
    cnt = 0
    idx = -1
    srcJson = {}
    retVal_1 = 0
else:
    srcJsonMast = srcJsonMast + ',' + '"' + str(idx) + '":'+ tmpJson

In the above snippet, we’re forming the payload dynamically & then calling the “pushEvents” to push all the random generated IoT mock-events to the Ably queue.

2. custom.kv (Publishing Streaming data to Ably channels & captured IoT events from the simulator & publish them in Dashboard through measured KPIs.)


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 12-Nov-2021 ####
#### ####
#### Objective: This Kivy design file contains all the ####
#### graphical interface of our I-OS App. This including ####
#### the functionalities of buttons. ####
#### ####
#### Note: If you think this file is not proeprly read by ####
#### the program, then remove this entire comment block & ####
#### then run the application. It should work. ####
###############################################################
MainInterface:
<MainInterface>:
ScreenManager:
id: sm
size: root.width, root.height
Screen:
name: "background_1"
Image:
source: "Background/Background_1.png"
allow_stretch: True
keep_ratio: True
size_hint_y: None
size_hint_x: None
width: self.parent.width
height: self.parent.width/self.image_ratio
FloatLayout:
orientation: 'vertical'
Label:
text: "This is an application, which will consume the live streaming data inside a Kivy-based IOS-App by using Matplotlib to capture the KPIs."
text_size: self.width + 350, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':2.9,'center_y':6.5}
Image:
id: homesc
pos_hint: {'right':6, 'top':5.4}
size_hint: None, None
size: 560, 485
source: "Background/FP.jpeg"
Screen:
name: "background_2"
Image:
source: "Background/Background_2.png"
allow_stretch: True
keep_ratio: True
size_hint_y: None
size_hint_x: None
width: self.parent.width
height: self.parent.width/self.image_ratio
FloatLayout:
Label:
text: "Please find the realtime IoT-device Live Statistics:"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':3.0,'center_y':7.0}
Label:
text: "DC to Servo Min Ratio:"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':3.0,'center_y':6.2}
Label:
id: dynMin
text: "100"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.2,'center_y':6.2}
Label:
text: "DC Motor:"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':5.4}
Label:
text: "(MAX)"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':5.0}
Label:
id: dynDC
text: "100"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':4.6}
Label:
text: " ——- Vs ——- "
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':4.0}
Label:
text: "Servo Motor:"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':3.4}
Label:
text: "(MAX)"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':3.0}
Label:
id: dynServo
text: "100"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':2.6}
FloatLayout:
id: box
size: 400, 550
pos: 200, 300
Screen:
name: "background_3"
Image:
source: "Background/Background_3.png"
allow_stretch: True
keep_ratio: True
size_hint_y: None
size_hint_x: None
width: self.parent.width
height: self.parent.width/self.image_ratio
FloatLayout:
orientation: 'vertical'
Label:
text: "Please find the live like status."
text_size: self.width + 350, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':2.6,'center_y':7.2}
Label:
id: dynVal
text: "100"
text_size: self.width + 350, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':4.1,'center_y':6.4}
Image:
id: lk_img_1
pos_hint: {'center_x':3.2, 'center_y':6.4}
size_hint: None, None
size: 460, 285
source: "Background/Likes_Btn_R.png"
Label:
text: "Want to know more about the Developer? Here is the detail ->"
text_size: self.width + 450, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':3.1,'center_y':5.5}
Label:
text: "I love to find out new technologies that is emerging as a driving force & shape our future!"
text_size: self.width + 290, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':2.3,'center_y':3.8}
Label:
text: "For more information view the website to know more on Python-Kivy along with Matplotlib Live Streaming."
text_size: self.width + 450, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':3.1,'center_y':1.9}
Image:
id: avatar
pos_hint: {'right':6.8, 'top':5.4}
size_hint: None, None
size: 460, 285
source: "Background/Me.jpeg"
Label:
text: "https://www.satyakide.com&quot;
text_size: self.width + 350, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':3.4,'center_y':0.9}
Image:
source: "Background/Top_Bar.png"
size: 620, 175
pos: 0, root.height – 535
Button:
#: set val 'Start'
size: 112.5, 75
pos: root.width/2-190, root.height-120
background_color: 1,1,1,0
on_press: root.pressed(self, val, sm)
on_release: root.released(self, val)
Image:
id: s_img
text: val
source: "Background/Start_Btn.png"
center_x: self.parent.center_x – 260
center_y: self.parent.center_y – 415
Button:
#: set val2 'Stats'
size: 112.5, 75
pos: root.width/2-55, root.height-120
background_color: 1,1,1,0
on_press: root.pressed(self, val2, sm)
on_release: root.released(self, val2)
Image:
id: st_img
text: val2
source: "Background/Stats_Btn.png"
center_x: self.parent.center_x – 250
center_y: self.parent.center_y – 415
Button:
#: set val3 'Likes'
size: 112.5, 75
pos: root.width/2+75, root.height-120
background_color: 1,1,1,0
on_press: root.pressed(self, val3, sm)
on_release: root.released(self, val3)
Image:
id: lk_img
text: val3
source: "Background/Likes_Btn.png"
center_x: self.parent.center_x – 240
center_y: self.parent.center_y – 415

view raw

custom.kv

hosted with ❤ by GitHub

To understand this, one needs to learn how to prepare a Kivy design layout using the KV-language. You can develop the same using native-python code as well. However, I wanted to explore this language & not to mention that this is the preferred way of doing a front-end GUI design in Kivy.

Like any graphical interface, one needs to understand the layouts & the widgets that you are planning to use or build. For that, please go through the following critical documentation link on Kivy Layouts. Please go through this if you are doing this for the first time.

To pinpoint the conversation, I would like to present the documentation segment from the official site in the given picture –

Official Kivy-refernce

Since we’ve used our custom buttons & top bars, the most convenient GUI layouts will be FloatLayout for our use case. By using that layout, we can conveniently position our widgets at any random place as per our needs. At the same time, one can use nested layouts by combining different types of arrangements under another.

Some of the key lines from the above scripting files will be –

Screen:
  name: "background_1"
  Image:
      source: "Background/Background_1.png"
      allow_stretch: True
      keep_ratio: True
      size_hint_y: None
      size_hint_x: None
      width: self.parent.width
      height: self.parent.width/self.image_ratio
      FloatLayout:
          orientation: 'vertical'
          Label:
              text: "This is an application, which will consume the live streaming data inside a Kivy-based IOS-App by using Matplotlib to capture the KPIs."
              text_size: self.width + 350, None
              height: self.texture_size[1]
              halign: "left"
              valign: "bottom"
              pos_hint: {'center_x':2.9,'center_y':6.5}
          Image:
              id: homesc
              pos_hint: {'right':6, 'top':5.4}
              size_hint: None, None
              size: 560, 485
              source: "Background/FP.jpeg"

Let us understand what we discussed here & try to map that with the image.

Part of GUI defined in KV file

From the above image now, you can understand how we placed the label & image into our custom positions to create a lean & clean interface.

Image:
      source: "Background/Top_Bar.png"
      size: 620, 175
      pos: 0, root.height - 535

  Button:
      #: set val 'Start'
      size: 112.5, 75
      pos: root.width/2-190, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val, sm)
      on_release: root.released(self, val)
      Image:
          id: s_img
          text: val
          source: "Background/Start_Btn.png"
          center_x: self.parent.center_x - 260
          center_y: self.parent.center_y - 415

  Button:
      #: set val2 'Stats'
      size: 112.5, 75
      pos: root.width/2-55, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val2, sm)
      on_release: root.released(self, val2)
      Image:
          id: st_img
          text: val2
          source: "Background/Stats_Btn.png"
          center_x: self.parent.center_x - 250
          center_y: self.parent.center_y - 415

  Button:
      #: set val3 'Likes'
      size: 112.5, 75
      pos: root.width/2+75, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val3, sm)
      on_release: root.released(self, val3)
      Image:
          id: lk_img
          text: val3
          source: "Background/Likes_Btn.png"
          center_x: self.parent.center_x - 240
          center_y: self.parent.center_y - 415

Let us understand the custom buttons mapped in our Apps.

So, these are custom buttons. We placed them into specific positions & sizes by mentioning the appropriate size & position coordinates & then assigned the button methods (on_press & on_release).

However, these button methods will be present inside the main python script, which we’ll discuss after this segment.

3. main.py (Consuming Streaming data from Ably channels & captured IoT events from the simulator & publish them in Kivy-based iOS App through measured KPIs.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 12-Nov-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels & captured IoT ####
#### events from the simulator & publish ####
#### them in Kivy-I/OS App through ####
#### measured KPIs. ####
#### ####
##############################################
from kivy.app import App
from kivy.uix.widget import Widget
from kivy.lang import Builder
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.floatlayout import FloatLayout
from kivy.clock import Clock
from kivy.core.window import Window
from kivymd.app import MDApp
import datetime as dt
import datetime
from kivy.properties import StringProperty
from kivy.vector import Vector
import regex as re
import os
os.environ["KIVY_IMAGE"]="pil"
import platform as pl
import matplotlib.pyplot as plt
import pandas as p
from matplotlib.patches import Rectangle
from matplotlib import use as mpl_use
mpl_use('module://kivy.garden.matplotlib.backend_kivy')
plt.style.use('fivethirtyeight')
# Consuming data from Ably Queue
from ably import AblyRest
# Main Class to consume streaming
import clsStreamConsume as ca
# Create the instance of the Covid API Class
x1 = ca.clsStreamConsume()
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' *60)
DInd = 'Y'
Window.size = (310, 460)
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
def getRealTimeIoT():
try:
# Let's pass this to our map section
df = x1.conStream(var1, DInd)
print('Data:')
print(str(df))
return df
except Exception as e:
x = str(e)
print(x)
df = p.DataFrame()
return df
class MainInterface(FloatLayout):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.data = getRealTimeIoT()
self.likes = 0
self.dcMotor = 0
self.servoMotor = 0
self.minRatio = 0
plt.subplots_adjust(bottom=0.19)
#self.fig, self.ax = plt.subplots(1,1, figsize=(6.5,10))
self.fig, self.ax = plt.subplots()
self.mpl_canvas = self.fig.canvas
def on_data(self, *args):
self.ax.clear()
self.data = getRealTimeIoT()
self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
self.likes = self.getMaxLike(self.data)
self.ids.dynVal.text = str(self.likes)
self.ids.lk_img_1.source = ''
self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn_R.png"
self.dcMotor = self.getMaxDCMotor(self.data)
self.ids.dynDC.text = str(self.dcMotor)
self.servoMotor = self.getMaxServoMotor(self.data)
self.ids.dynServo.text = str(self.servoMotor)
self.minRatio = self.getDc2ServoMinRatio(self.data)
self.ids.dynMin.text = str(self.minRatio)
x = self.data['x_value']
y1 = self.data['total_1']
y2 = self.data['total_2']
self.ax.plot(x, y1, label='Channel 1', linewidth=5.0)
self.ax.plot(x, y2, label='Channel 2', linewidth=5.0)
self.mpl_canvas.draw_idle()
box = self.ids.box
box.clear_widgets()
box.add_widget(self.mpl_canvas)
return self.data
def getMaxLike(self, df):
payload = df['x_value']
a1 = str(payload.agg(['max']))
max_val = int(re.search(r'\d+', a1)[0])
return max_val
def getMaxDCMotor(self, df):
payload = df['total_1']
a1 = str(payload.agg(['max']))
max_val = int(re.search(r'\d+', a1)[0])
return max_val
def getMaxServoMotor(self, df):
payload = df['total_2']
a1 = str(payload.agg(['max']))
max_val = int(re.search(r'\d+', a1)[0])
return max_val
def getMinDCMotor(self, df):
payload = df['total_1']
a1 = str(payload.agg(['min']))
min_val = int(re.search(r'\d+', a1)[0])
return min_val
def getMinServoMotor(self, df):
payload = df['total_2']
a1 = str(payload.agg(['min']))
min_val = int(re.search(r'\d+', a1)[0])
return min_val
def getDc2ServoMinRatio(self, df):
minDC = self.getMinDCMotor(df)
minServo = self.getMinServoMotor(df)
min_ratio = round(float(minDC/minServo), 5)
return min_ratio
def update(self, *args):
self.data = self.on_data(self.data)
def pressed(self, instance, inText, SM):
if str(inText).upper() == 'START':
instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Start_Btn.png"
print('In Pressed: ', str(instance.parent.ids.s_img.text).upper())
if ((SM.current == "background_2") or (SM.current == "background_3")):
SM.transition.direction = "right"
SM.current= "background_1"
Clock.unschedule(self.update)
self.remove_widget(self.mpl_canvas)
elif str(inText).upper() == 'STATS':
instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Stats_Btn.png"
print('In Pressed: ', str(instance.parent.ids.st_img.text).upper())
if (SM.current == "background_1"):
SM.transition.direction = "left"
elif (SM.current == "background_3"):
SM.transition.direction = "right"
SM.current= "background_2"
Clock.schedule_interval(self.update, 0.1)
else:
instance.parent.ids.lk_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Likes_Btn.png"
print('In Pressed: ', str(instance.parent.ids.lk_img.text).upper())
if ((SM.current == "background_1") or (SM.current == "background_2")):
SM.transition.direction = "left"
SM.current= "background_3"
Clock.schedule_interval(self.update, 0.1)
instance.parent.ids.dynVal.text = str(self.likes)
instance.parent.ids.dynDC.text = str(self.dcMotor)
instance.parent.ids.dynServo.text = str(self.servoMotor)
instance.parent.ids.dynMin.text = str(self.minRatio)
self.remove_widget(self.mpl_canvas)
def released(self, instance, inrText):
if str(inrText).upper() == 'START':
instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Start_Btn.png"
print('Released: ', str(instance.parent.ids.s_img.text).upper())
elif str(inrText).upper() == 'STATS':
instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Stats_Btn.png"
print('Released: ', str(instance.parent.ids.st_img.text).upper())
else:
instance.parent.ids.lk_img.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
print('Released: ', str(instance.parent.ids.lk_img.text).upper())
class CustomApp(MDApp):
def build(self):
return MainInterface()
if __name__ == "__main__":
custApp = CustomApp()
custApp.run()

view raw

main.py

hosted with ❤ by GitHub

Let us explore the main script now.

def getRealTimeIoT():
    try:
        # Let's pass this to our map section
        df = x1.conStream(var1, DInd)

        print('Data:')
        print(str(df))

        return df
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

The above function will invoke the streaming class to consume the mock IoT live events as a pandas dataframe from the Ably queue.

class MainInterface(FloatLayout):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.data = getRealTimeIoT()
        self.likes = 0
        self.dcMotor = 0
        self.servoMotor = 0
        self.minRatio = 0
        plt.subplots_adjust(bottom=0.19)

        #self.fig, self.ax = plt.subplots(1,1, figsize=(6.5,10))
        self.fig, self.ax = plt.subplots()
        self.mpl_canvas = self.fig.canvas

Application is instantiating the main class & assignments of all the critical variables, including the matplotlib class.

    def pressed(self, instance, inText, SM):

        if str(inText).upper() == 'START':
            instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Start_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.s_img.text).upper())
            if ((SM.current == "background_2") or (SM.current == "background_3")):
                SM.transition.direction = "right"
            SM.current= "background_1"
            Clock.unschedule(self.update)
            self.remove_widget(self.mpl_canvas)

We’ve taken one of the button events & captured how the application will behave once someone clicks the Start button & how it will bring all the corresponding elements of a static page. It also explained the transition type between screens.

        elif str(inText).upper() == 'STATS':

            instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Stats_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.st_img.text).upper())
            if (SM.current == "background_1"):
                SM.transition.direction = "left"
            elif (SM.current == "background_3"):
                SM.transition.direction = "right"
            SM.current= "background_2"
            Clock.schedule_interval(self.update, 0.1)

The next screen invokes the dynamic & real-time content. So, please pay extra attention to the following line –

Clock.schedule_interval(self.update, 0.1)

This line will invoke the update function, which looks like –

    def update(self, *args):
        self.data = self.on_data(self.data)

Here is the logic for the update function, which will invoke another function named – “on_data“.

    def on_data(self, *args):
        self.ax.clear()
        self.data = getRealTimeIoT()

        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
        self.likes = self.getMaxLike(self.data)
        self.ids.dynVal.text = str(self.likes)
        self.ids.lk_img_1.source = ''
        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn_R.png"

        self.dcMotor = self.getMaxDCMotor(self.data)
        self.ids.dynDC.text = str(self.dcMotor)

        self.servoMotor = self.getMaxServoMotor(self.data)
        self.ids.dynServo.text = str(self.servoMotor)

        self.minRatio = self.getDc2ServoMinRatio(self.data)
        self.ids.dynMin.text = str(self.minRatio)

        x = self.data['x_value']
        y1 = self.data['total_1']
        y2 = self.data['total_2']

        self.ax.plot(x, y1, label='Channel 1', linewidth=5.0)
        self.ax.plot(x, y2, label='Channel 2', linewidth=5.0)

        self.mpl_canvas.draw_idle()

        box = self.ids.box
        box.clear_widgets()
        box.add_widget(self.mpl_canvas)

        return self.data

The above crucial line shows how we capture the live calculation & assign them into matplotlib plots & finally assign that figure canvas of matplotlib to a box widget as per our size & display the change content whenever it invokes this method.

Rests of the functions are pretty self-explanatory. So, I’m not going to discuss them.


Run:

Let’s run the app & see the output –

STEP – 1

Triggering the mock IoT App

STEP – 2

Triggering the iOS App

STEP – 3


So, we’ve done it.

You will get the complete codebase in the following Github link.

I’ll bring some more exciting topic in the coming days from the Python verse. Please share & subscribe my post & let me know your feedback.

Till then, Happy Avenging!


Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim the ownership of these images. There is an always room for improvement & especially all the GUI components size & position that will be dynamic in nature by defining self.width along with some constant values.

Projecting real-time KPIs by ingesting streaming events from emulated IoT-device

Today, I am planning to demonstrate an IoT use case implemented in Python. I was waiting for my Raspberry Pi to arrive. However, the product that I received was not working as expected. Perhaps, some hardware malfunction. Hence, I was looking for a way to continue with my installment even without the hardware.

I was looking for an alternative way to use an online Raspberry Pi emulator. Recently, Microsoft has introduced integrated Raspberry Pi, which you can directly integrate with Azure IoT. However, I couldn’t find any API, which I could leverage on my Python application.

So, I explored all the possible options & finally come-up with the idea of creating my own IoT-Emulator, which can integrate with any application. With the help from the online materials, I have customized & enhanced them as per my use case & finally come up with this clean application that will demonstrate this use case with clarity.

We’ll showcase this real-time use case, where we would try to capture the events generated by IoT in a real-time dashboard, where the values in the visual display points will be affected as soon as the source data changes.


However, I would like to share the run before we dig deep into this.

Demo

Isn’t this exciting? How we can use our custom-built IoT emulator & captures real-time events to Ably Queue, then transform those raw events into more meaningful KPIs. Let’s deep dive then.


Architecture:

Let’s explore the architecture –

Fig – 1

As you can see, the green box is a demo IoT application that generates events & pushes them into the Ably Queue. At the same time, Dashboard consumes the events & transforms them into more meaningful metrics.


Package Installation:

Let us understand the sample packages that require for this task.

Step – 1:

Installation

Step – 2:

Installation – Continue

And, here is the command to install those packages –

pip install dash==1.0.0
pip install numpy==1.16.4
pip install pandas==0.24.2
pip install scipy==1.3.0
pip install gunicorn==19.9.0
pip install ably==1.1.1
pip install tkgpio==0.1

Code:

Since this is an extension to our previous post, we’re not going to discuss other scripts, which we’ve already discussed over there. Instead, we will talk about the enhanced scripts & the new scripts that require for this use case.

1. clsConfig.py (This native Python script contains the configuration entries.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 25-Sep-2021 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning & streaming dashboard.####
#### ####
################################################
import os
import platform as pl
class clsConfig(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'REPORT_PATH': Curr_Path + sep + 'report',
'FILE_NAME': Curr_Path + sep + 'data' + sep + 'TradeIn.csv',
'SRC_PATH': Curr_Path + sep + 'data' + sep,
'JSONFileNameWithPath': Curr_Path + sep + 'GUI_Config' + sep + 'CircuitConfiguration.json',
'APP_DESC_1': 'Dash Integration with Ably!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'SUBDIR' : 'data',
'ABLY_ID': 'WWP309489.93jfkT:32kkdhdJjdued79e',
"URL":"https://corona-api.com/countries/&quot;,
"appType":"application/json",
"conType":"keep-alive",
"limRec": 50,
"CACHE":"no-cache",
"MAX_RETRY": 3,
"coList": "DE, IN, US, CA, GB, ID, BR",
"FNC": "NewConfirmed",
"TMS": "ReportedDate",
"FND": "NewDeaths",
"FinData": "Cache.csv"
}

view raw

clsConfig.py

hosted with ❤ by GitHub

A few of the new entries, which are essential to this task are -> ABLY_ID, FinData & JSONFileNameWithPath.

2. clsPublishStream.py (This script will publish real-time streaming data coming out from a hosted API sources using another popular third-party service named Ably. Ably mimics pubsub Streaming concept, which might be extremely useful for any start-ups.)


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 26-Jul-2021 ####
#### Modified Date: 08-Sep-2021 ####
#### ####
#### Objective: This script will publish real-time ####
#### streaming data coming out from a hosted API ####
#### sources using another popular third-party service ####
#### named Ably. Ably mimics pubsub Streaming concept, ####
#### which might be extremely useful for any start-ups. ####
#### ####
###############################################################
from ably import AblyRest
import logging
import json
from random import seed
from random import random
import json
import math
import random
from clsConfig import clsConfig as cf
seed(1)
# Global Section
logger = logging.getLogger('ably')
logger.addHandler(logging.StreamHandler())
ably_id = str(cf.conf['ABLY_ID'])
ably = AblyRest(ably_id)
channel = ably.channels.get('sd_channel')
# End Of Global Section
class clsPublishStream:
def __init__(self):
self.msgSize = cf.conf['limRec']
def pushEvents(self, srcJSON, debugInd, varVa):
try:
msgSize = self.msgSize
# Capturing the inbound dataframe
jdata_fin = json.dumps(srcJSON)
print('IOT Events: ')
print(str(jdata_fin))
# Publish rest of the messages to the sd_channel channel
channel.publish('event', jdata_fin)
jdata_fin = ''
return 0
except Exception as e:
x = str(e)
print(x)
logging.info(x)
return 1

We’re not going to discuss this as we’ve already discussed in my previous post.

3. clsStreamConsume.py (Consuming Streaming data from Ably channels.)


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### Modified On 08-Sep-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels published by the ####
#### playIOTDevice.py ####
#### ####
##############################################
import json
from clsConfig import clsConfig as cf
import requests
import logging
import time
import pandas as p
import clsL as cl
from ably import AblyRest
# Initiating Log class
l = cl.clsL()
class clsStreamConsume:
def __init__(self):
self.ably_id = str(cf.conf['ABLY_ID'])
self.fileName = str(cf.conf['FinData'])
def conStream(self, varVa, debugInd):
try:
ably_id = self.ably_id
fileName = self.fileName
var = varVa
debug_ind = debugInd
# Fetching the data
client = AblyRest(ably_id)
channel = client.channels.get('sd_channel')
message_page = channel.history()
# Counter Value
cnt = 0
# Declaring Global Data-Frame
df_conv = p.DataFrame()
for i in message_page.items:
print('Last Msg: {}'.format(i.data))
json_data = json.loads(i.data)
#jdata = json.dumps(json_data)
# Converting String to Dictionary
dict_json = eval(json_data)
# Converting JSON to Dataframe
#df = p.json_normalize(json_data)
#df.columns = df.columns.map(lambda x: x.split(".")[-1])
df = p.DataFrame.from_dict(dict_json, orient='index')
#print('DF Inside:')
#print(df)
if cnt == 0:
df_conv = df
else:
d_frames = [df_conv, df]
df_conv = p.concat(d_frames)
cnt += 1
# Resetting the Index Value
df_conv.reset_index(drop=True, inplace=True)
# This will check whether the current load is happening
# or not. Based on that, it will capture the old events
# from cache.
if df_conv.empty:
df_conv = p.read_csv(fileName, index = True)
else:
l.logr(fileName, debug_ind, df_conv, 'log')
return df_conv
except Exception as e:
x = str(e)
print('Error: ', x)
logging.info(x)
# This will handle the error scenaio as well.
# Based on that, it will capture the old events
# from cache.
try:
df_conv = p.read_csv(fileName, index = True)
except:
df = p.DataFrame()
return df

We’re not going to discuss this as we’ve already discussed in my previous post.

4. CircuitConfiguration.json (Configuration file for GUI Interface for IoT Simulator.)


{
"name":"Analog Device",
"width":700,
"height":350,
"leds":[
{
"x":105,
"y":80,
"name":"LED",
"pin":21
}
],
"motors":[
{
"x":316,
"y":80,
"name":"DC Motor",
"forward_pin":22,
"backward_pin":23
}
],
"servos":[
{
"x":537,
"y":80,
"name":"Servo Motor",
"pin":24,
"min_angle":-180,
"max_angle":180,
"initial_angle":20
}
],
"adc":{
"mcp_chip":3008,
"potenciometers":[
{
"x":40,
"y":200,
"name":"Brightness Potentiometer",
"channel":0
},
{
"x":270,
"y":200,
"name":"Speed Potentiometer",
"channel":2
},
{
"x":500,
"y":200,
"name":"Angle Potentiometer",
"channel":6
}
]
},
"toggles":[
{
"x":270,
"y":270,
"name":"Direction Toggle Switch",
"pin":15,
"off_label":"backward",
"on_label":"forward",
"is_on":false
}
],
"labels":[
{
"x":15,
"y":35,
"width":25,
"height":18,
"borderwidth":2,
"relief":"solid"
},
{
"x":56,
"y":26,
"text":"Brightness Control"
},
{
"x":245,
"y":35,
"width":25,
"height":18,
"borderwidth":2,
"relief":"solid"
},
{
"x":298,
"y":26,
"text":"Speed Control"
},
{
"x":475,
"y":35,
"width":25,
"height":18,
"borderwidth":2,
"relief":"solid"
},
{
"x":531,
"y":26,
"text":"Angle Control"
}
]
}

This json configuration will be used by the next python class.

5. clsBuildCircuit.py (Calling Tk Circuit API.)


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 25-Sep-2021 ####
#### Modified On 25-Sep-2021 ####
#### ####
#### Objective: Calling Tk Circuit API ####
##############################################
from tkgpio import TkCircuit
from json import load
from clsConfig import clsConfig as cf
fileName = str(cf.conf['JSONFileNameWithPath'])
print('File Name: ', str(fileName))
# initialize the circuit inside the GUI
with open(fileName, "r") as file:
config = load(file)
class clsBuildCircuit:
def __init__(self):
self.config = config
def genCir(self, main_function):
try:
config = self.config
circuit = TkCircuit(config)
circuit.run(main_function)
return circuit
except Exception as e:
x = str(e)
print(x)
return ''

Key snippets from the above script –

config = self.config
circuit = TkCircuit(config)
circuit.run(main_function)

The above lines will create an instance of simulated IoT circuits & then it will use the json file to start the GUI class.

6. playIOTDevice.py (Main Circuit GUI script to create an IoT Device to generate the events, which will consumed.)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 25-Sep-2021 ####
#### Modified On 25-Sep-2021 ####
#### ####
#### Objective: Main Tk Circuit GUI script ####
#### to create an IOT Device to generate ####
#### the events, which will consumed. ####
###############################################
# We keep the setup code in a different class as shown below.
import clsBuildCircuit as csb
import json
import clsPublishStream as cps
import datetime
from clsConfig import clsConfig as cf
import logging
###############################################
### Global Section ###
###############################################
# Initiating Ably class to push events
x1 = cps.clsPublishStream()
# Create the instance of the Tk Circuit API Class.
circuit = csb.clsBuildCircuit()
###############################################
### End of Global Section ###
###############################################
# Invoking the IOT Device Generator.
@circuit.genCir
def main():
from gpiozero import PWMLED, Motor, Servo, MCP3008, Button
from time import sleep
# Circuit Components
ledAlert = PWMLED(21)
dcMotor = Motor(22, 23)
servoMotor = Servo(24)
ioMeter1 = MCP3008(0)
ioMeter2 = MCP3008(2)
ioMeter3 = MCP3008(6)
switch = Button(15)
# End of circuit components
# Other useful variables
cnt = 1
idx = 0
debugInd = 'Y'
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# End of useful variables
# Initiating Log Class
general_log_path = str(cf.conf['LOG_PATH'])
msgSize = int(cf.conf['limRec'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'IOTDevice.log', level=logging.INFO)
while True:
ledAlert.value = ioMeter1.value
if switch.is_pressed:
dcMotor.forward(ioMeter2.value)
xVal = 'Motor Forward'
else:
dcMotor.backward(ioMeter2.value)
xVal = 'Motor Backward'
servoMotor.value = 1 2 * ioMeter3.value
srcJson = {
"LedMeter": ledAlert.value,
"DCMeter": ioMeter2.value,
"ServoMeter": ioMeter3.value,
"SwitchStatus": switch.is_pressed,
"DCMotorPos": xVal,
"ServoMotor": servoMotor.value
}
tmpJson = str(srcJson)
if cnt == 1:
srcJsonMast = '{' + '"' + str(idx) + '":'+ tmpJson
elif cnt == msgSize:
srcJsonMast = srcJsonMast + '}'
print('JSON: ')
print(str(srcJsonMast))
# Pushing both the Historical Confirmed Cases
retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)
if retVal_1 == 0:
print('Successfully IOT event pushed!')
else:
print('Failed to push IOT events!')
srcJsonMast = ''
tmpJson = ''
cnt = 0
idx = 1
srcJson = {}
retVal_1 = 0
else:
srcJsonMast = srcJsonMast + ',' + '"' + str(idx) + '":'+ tmpJson
cnt += 1
idx += 1
sleep(0.05)

Lets’ explore the key snippets –

ledAlert = PWMLED(21)
dcMotor = Motor(22, 23)
servoMotor = Servo(24)

It defines three motors that include Servo, DC & LED.

Now, we can see the following sets of the critical snippet –

ledAlert.value = ioMeter1.value

if switch.is_pressed:
    dcMotor.forward(ioMeter2.value)
    xVal = 'Motor Forward'
else:
    dcMotor.backward(ioMeter2.value)
    xVal = 'Motor Backward'

servoMotor.value = 1 - 2 * ioMeter3.value

srcJson = {
"LedMeter": ledAlert.value,
"DCMeter": ioMeter2.value,
"ServoMeter": ioMeter3.value,
"SwitchStatus": switch.is_pressed,
"DCMotorPos": xVal,
"ServoMotor": servoMotor.value
}

Following lines will dynamically generates JSON that will be passed into the Ably queue –

tmpJson = str(srcJson)

if cnt == 1:
    srcJsonMast = '{' + '"' + str(idx) + '":'+ tmpJson
elif cnt == msgSize:
    srcJsonMast = srcJsonMast + '}'
    print('JSON: ')
    print(str(srcJsonMast))

Final line from the above script –

# Pushing both the Historical Confirmed Cases
retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)

This code will now push the events into the Ably Queue.

7. app.py (Consuming Streaming data from Ably channels & captured IOT events from the simulator & publish them in Dashboard through measured KPIs.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 02-Oct-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels & captured IOT ####
#### events from the simulator & publish ####
#### them in Dashboard through measured ####
#### KPIs. ####
#### ####
##############################################
import os
import pathlib
import numpy as np
import datetime as dt
import dash
from dash import dcc
from dash import html
import datetime
import dash_daq as daq
from dash.exceptions import PreventUpdate
from dash.dependencies import Input, Output, State
from scipy.stats import rayleigh
# Consuming data from Ably Queue
from ably import AblyRest
# Main Class to consume streaming
import clsStreamConsume as ca
# Create the instance of the Covid API Class
x1 = ca.clsStreamConsume()
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' *60)
DInd = 'Y'
GRAPH_INTERVAL = os.environ.get("GRAPH_INTERVAL", 5000)
app = dash.Dash(
__name__,
meta_tags=[{"name": "viewport", "content": "width=device-width, initial-scale=1"}],
)
app.title = "IOT Device Dashboard"
server = app.server
app_color = {"graph_bg": "#082255", "graph_line": "#007ACE"}
app.layout = html.Div(
[
# header
html.Div(
[
html.Div(
[
html.H4("IOT DEVICE STREAMING", className="app__header__title"),
html.P(
"This app continually consumes streaming data from IOT-Device and displays live charts of various metrics & KPI associated with it.",
className="app__header__title–grey",
),
],
className="app__header__desc",
),
html.Div(
[
html.A(
html.Button("SOURCE CODE", className="link-button"),
href="https://github.com/SatyakiDe2019/IOTStream&quot;,
),
html.A(
html.Button("VIEW DEMO", className="link-button"),
href="https://github.com/SatyakiDe2019/IOTStream/blob/main/demo.gif&quot;,
),
html.A(
html.Img(
src=app.get_asset_url("dash-new-logo.png"),
className="app__menu__img",
),
href="https://plotly.com/dash/&quot;,
),
],
className="app__header__logo",
),
],
className="app__header",
),
html.Div(
[
# Motor Speed
html.Div(
[
html.Div(
[html.H6("SERVO METER (IOT)", className="graph__title")]
),
dcc.Graph(
id="iot-measure",
figure=dict(
layout=dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
)
),
),
dcc.Interval(
id="iot-measure-update",
interval=int(GRAPH_INTERVAL),
n_intervals=0,
),
# Second Panel
html.Div(
[html.H6("DC-MOTOR (IOT)", className="graph__title")]
),
dcc.Graph(
id="iot-measure-1",
figure=dict(
layout=dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
)
),
),
dcc.Interval(
id="iot-measure-update-1",
interval=int(GRAPH_INTERVAL),
n_intervals=0,
)
],
className="two-thirds column motor__speed__container",
),
html.Div(
[
# histogram
html.Div(
[
html.Div(
[
html.H6(
"MOTOR POWER HISTOGRAM",
className="graph__title",
)
]
),
html.Div(
[
dcc.Slider(
id="bin-slider",
min=1,
max=60,
step=1,
value=20,
updatemode="drag",
marks={
20: {"label": "20"},
40: {"label": "40"},
60: {"label": "60"},
},
)
],
className="slider",
),
html.Div(
[
dcc.Checklist(
id="bin-auto",
options=[
{"label": "Auto", "value": "Auto"}
],
value=["Auto"],
inputClassName="auto__checkbox",
labelClassName="auto__label",
),
html.P(
"# of Bins: Auto",
id="bin-size",
className="auto__p",
),
],
className="auto__container",
),
dcc.Graph(
id="motor-histogram",
figure=dict(
layout=dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
)
),
),
],
className="graph__container first",
),
# motor direction
html.Div(
[
html.Div(
[
html.H6(
"SERVO MOTOR DIRECTION", className="graph__title"
)
]
),
dcc.Graph(
id="servo-motor-direction",
figure=dict(
layout=dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
)
),
),
],
className="graph__container second",
),
],
className="one-third column histogram__direction",
),
],
className="app__content",
),
],
className="app__container",
)
def toPositive(row, flag):
try:
if flag == 'ServoMeter':
x_val = abs(float(row['ServoMotor']))
elif flag == 'DCMotor':
x_val = abs(float(row['DCMotor'])) * 0.001
return x_val
except Exception as e:
x = str(e)
print(x)
val = 0
return val
def toPositiveInflated(row, flag):
try:
if flag == 'ServoMeter':
x_val = abs(float(row['ServoMeter'])) * 100
elif flag == 'DCMotor':
x_val = abs(float(row['DCMeter'])) * 100
return x_val
except Exception as e:
x = str(e)
print(x)
val = 0
return val
def getData(var, Ind):
try:
# Let's pass this to our map section
df = x1.conStream(var, Ind)
df['ServoMeterNew'] = df.apply(lambda row: toPositiveInflated(row, 'ServoMeter'), axis=1)
df['ServoMotorNew'] = df.apply(lambda row: toPositive(row, 'ServoMeter'), axis=1)
df['DCMotor'] = df.apply(lambda row: toPositiveInflated(row, 'DCMotor'), axis=1)
df['DCMeterNew'] = df.apply(lambda row: toPositive(row, 'DCMotor'), axis=1)
# Dropping old columns
df.drop(columns=['ServoMeter','ServoMotor','DCMeter'], axis=1, inplace=True)
#Rename New Columns to Old Columns
df.rename(columns={'ServoMeterNew':'ServoMeter'}, inplace=True)
df.rename(columns={'ServoMotorNew':'ServoMotor'}, inplace=True)
df.rename(columns={'DCMeterNew':'DCMeter'}, inplace=True)
return df
except Exception as e:
x = str(e)
print(x)
df = p.DataFrame()
return df
@app.callback(
Output("iot-measure-1", "figure"), [Input("iot-measure-update", "n_intervals")]
)
def gen_iot_speed(interval):
"""
Generate the DC Meter graph.
:params interval: update the graph based on an interval
"""
# Let's pass this to our map section
df = getData(var1, DInd)
trace = dict(
type="scatter",
y=df["DCMotor"],
line={"color": "#42C4F7"},
hoverinfo="skip",
error_y={
"type": "data",
"array": df["DCMeter"],
"thickness": 1.5,
"width": 2,
"color": "#B4E8FC",
},
mode="lines",
)
layout = dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
font={"color": "#fff"},
height=400,
xaxis={
"range": [0, 200],
"showline": True,
"zeroline": False,
"fixedrange": True,
"tickvals": [0, 50, 100, 150, 200],
"ticktext": ["200", "150", "100", "50", "0"],
"title": "Time Elapsed (sec)",
},
yaxis={
"range": [
min(0, min(df["DCMotor"])),
max(100, max(df["DCMotor"]) + max(df["DCMeter"])),
],
"showgrid": True,
"showline": True,
"fixedrange": True,
"zeroline": False,
"gridcolor": app_color["graph_line"],
"nticks": max(6, round(df["DCMotor"].iloc[1] / 10)),
},
)
return dict(data=[trace], layout=layout)
@app.callback(
Output("iot-measure", "figure"), [Input("iot-measure-update", "n_intervals")]
)
def gen_iot_speed(interval):
"""
Generate the Motor Speed graph.
:params interval: update the graph based on an interval
"""
# Let's pass this to our map section
df = getData(var1, DInd)
trace = dict(
type="scatter",
y=df["ServoMeter"],
line={"color": "#42C4F7"},
hoverinfo="skip",
error_y={
"type": "data",
"array": df["ServoMotor"],
"thickness": 1.5,
"width": 2,
"color": "#B4E8FC",
},
mode="lines",
)
layout = dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
font={"color": "#fff"},
height=400,
xaxis={
"range": [0, 200],
"showline": True,
"zeroline": False,
"fixedrange": True,
"tickvals": [0, 50, 100, 150, 200],
"ticktext": ["200", "150", "100", "50", "0"],
"title": "Time Elapsed (sec)",
},
yaxis={
"range": [
min(0, min(df["ServoMeter"])),
max(100, max(df["ServoMeter"]) + max(df["ServoMotor"])),
],
"showgrid": True,
"showline": True,
"fixedrange": True,
"zeroline": False,
"gridcolor": app_color["graph_line"],
"nticks": max(6, round(df["ServoMeter"].iloc[1] / 10)),
},
)
return dict(data=[trace], layout=layout)
@app.callback(
Output("servo-motor-direction", "figure"), [Input("iot-measure-update", "n_intervals")]
)
def gen_motor_direction(interval):
"""
Generate the Servo direction graph.
:params interval: update the graph based on an interval
"""
df = getData(var1, DInd)
val = df["ServoMeter"].iloc[1]
direction = [0, (df["ServoMeter"][0]*100 20), (df["ServoMeter"][0]*100 + 20), 0]
traces_scatterpolar = [
{"r": [0, val, val, 0], "fillcolor": "#084E8A"},
{"r": [0, val * 0.65, val * 0.65, 0], "fillcolor": "#B4E1FA"},
{"r": [0, val * 0.3, val * 0.3, 0], "fillcolor": "#EBF5FA"},
]
data = [
dict(
type="scatterpolar",
r=traces["r"],
theta=direction,
mode="lines",
fill="toself",
fillcolor=traces["fillcolor"],
line={"color": "rgba(32, 32, 32, .6)", "width": 1},
)
for traces in traces_scatterpolar
]
layout = dict(
height=350,
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
font={"color": "#fff"},
autosize=False,
polar={
"bgcolor": app_color["graph_line"],
"radialaxis": {"range": [0, 45], "angle": 45, "dtick": 10},
"angularaxis": {"showline": False, "tickcolor": "white"},
},
showlegend=False,
)
return dict(data=data, layout=layout)
@app.callback(
Output("motor-histogram", "figure"),
[Input("iot-measure-update", "n_intervals")],
[
State("iot-measure", "figure"),
State("bin-slider", "value"),
State("bin-auto", "value"),
],
)
def gen_motor_histogram(interval, iot_speed_figure, slider_value, auto_state):
"""
Genererate iot histogram graph.
:params interval: upadte the graph based on an interval
:params iot_speed_figure: current Motor Speed graph
:params slider_value: current slider value
:params auto_state: current auto state
"""
motor_val = []
try:
print('Inside gen_motor_histogram:')
print('iot_speed_figure::')
print(iot_speed_figure)
# Check to see whether iot-measure has been plotted yet
if iot_speed_figure is not None:
motor_val = iot_speed_figure["data"][0]["y"]
if "Auto" in auto_state:
bin_val = np.histogram(
motor_val,
bins=range(int(round(min(motor_val))), int(round(max(motor_val)))),
)
else:
bin_val = np.histogram(motor_val, bins=slider_value)
except Exception as error:
raise PreventUpdate
avg_val = float(sum(motor_val)) / len(motor_val)
median_val = np.median(motor_val)
pdf_fitted = rayleigh.pdf(
bin_val[1], loc=(avg_val) * 0.55, scale=(bin_val[1][1] bin_val[1][0]) / 3
)
y_val = (pdf_fitted * max(bin_val[0]) * 20,)
y_val_max = max(y_val[0])
bin_val_max = max(bin_val[0])
trace = dict(
type="bar",
x=bin_val[1],
y=bin_val[0],
marker={"color": app_color["graph_line"]},
showlegend=False,
hoverinfo="x+y",
)
traces_scatter = [
{"line_dash": "dash", "line_color": "#2E5266", "name": "Average"},
{"line_dash": "dot", "line_color": "#BD9391", "name": "Median"},
]
scatter_data = [
dict(
type="scatter",
x=[bin_val[int(len(bin_val) / 2)]],
y=[0],
mode="lines",
line={"dash": traces["line_dash"], "color": traces["line_color"]},
marker={"opacity": 0},
visible=True,
name=traces["name"],
)
for traces in traces_scatter
]
trace3 = dict(
type="scatter",
mode="lines",
line={"color": "#42C4F7"},
y=y_val[0],
x=bin_val[1][: len(bin_val[1])],
name="Rayleigh Fit",
)
layout = dict(
height=350,
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
font={"color": "#fff"},
xaxis={
"title": "Motor Power",
"showgrid": False,
"showline": False,
"fixedrange": True,
},
yaxis={
"showgrid": False,
"showline": False,
"zeroline": False,
"title": "Number of Samples",
"fixedrange": True,
},
autosize=True,
bargap=0.01,
bargroupgap=0,
hovermode="closest",
legend={
"orientation": "h",
"yanchor": "bottom",
"xanchor": "center",
"y": 1,
"x": 0.5,
},
shapes=[
{
"xref": "x",
"yref": "y",
"y1": int(max(bin_val_max, y_val_max)) + 0.5,
"y0": 0,
"x0": avg_val,
"x1": avg_val,
"type": "line",
"line": {"dash": "dash", "color": "#2E5266", "width": 5},
},
{
"xref": "x",
"yref": "y",
"y1": int(max(bin_val_max, y_val_max)) + 0.5,
"y0": 0,
"x0": median_val,
"x1": median_val,
"type": "line",
"line": {"dash": "dot", "color": "#BD9391", "width": 5},
},
],
)
return dict(data=[trace, scatter_data[0], scatter_data[1], trace3], layout=layout)
@app.callback(
Output("bin-auto", "value"),
[Input("bin-slider", "value")],
[State("iot-measure", "figure")],
)
def deselect_auto(slider_value, iot_speed_figure):
""" Toggle the auto checkbox. """
# prevent update if graph has no data
if "data" not in iot_speed_figure:
raise PreventUpdate
if not len(iot_speed_figure["data"]):
raise PreventUpdate
if iot_speed_figure is not None and len(iot_speed_figure["data"][0]["y"]) > 5:
return [""]
return ["Auto"]
@app.callback(
Output("bin-size", "children"),
[Input("bin-auto", "value")],
[State("bin-slider", "value")],
)
def show_num_bins(autoValue, slider_value):
""" Display the number of bins. """
if "Auto" in autoValue:
return "# of Bins: Auto"
return "# of Bins: " + str(int(slider_value))
if __name__ == "__main__":
app.run_server(debug=True)

view raw

app.py

hosted with ❤ by GitHub

Here are the key snippets –

html.Div(
        [
            html.Div(
                [html.H6("SERVO METER (IOT)", className="graph__title")]
            ),
            dcc.Graph(
                id="iot-measure",
                figure=dict(
                    layout=dict(
                        plot_bgcolor=app_color["graph_bg"],
                        paper_bgcolor=app_color["graph_bg"],
                    )
                ),
            ),
            dcc.Interval(
                id="iot-measure-update",
                interval=int(GRAPH_INTERVAL),
                n_intervals=0,
            ),
            # Second Panel
            html.Div(
                [html.H6("DC-MOTOR (IOT)", className="graph__title")]
            ),
            dcc.Graph(
                id="iot-measure-1",
                figure=dict(
                    layout=dict(
                        plot_bgcolor=app_color["graph_bg"],
                        paper_bgcolor=app_color["graph_bg"],
                    )
                ),
            ),
            dcc.Interval(
                id="iot-measure-update-1",
                interval=int(GRAPH_INTERVAL),
                n_intervals=0,
            )
        ],
        className="two-thirds column motor__speed__container",

The following line creates two panels, where the application will consume the streaming data by the app’s call-back feature & refresh the data & graphs as & when the application receives the streaming data.

A similar approach was adopted for other vital aspects/components inside the dashboard.

def getData(var, Ind):
    try:
        # Let's pass this to our map section
        df = x1.conStream(var, Ind)

        df['ServoMeterNew'] = df.apply(lambda row: toPositiveInflated(row, 'ServoMeter'), axis=1)
        df['ServoMotorNew'] = df.apply(lambda row: toPositive(row, 'ServoMeter'), axis=1)
        df['DCMotor'] = df.apply(lambda row: toPositiveInflated(row, 'DCMotor'), axis=1)
        df['DCMeterNew'] = df.apply(lambda row: toPositive(row, 'DCMotor'), axis=1)

        # Dropping old columns
        df.drop(columns=['ServoMeter','ServoMotor','DCMeter'], axis=1, inplace=True)

        #Rename New Columns to Old Columns
        df.rename(columns={'ServoMeterNew':'ServoMeter'}, inplace=True)
        df.rename(columns={'ServoMotorNew':'ServoMotor'}, inplace=True)
        df.rename(columns={'DCMeterNew':'DCMeter'}, inplace=True)

        return df
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

The application is extracting streaming data & consuming it from the Ably queue.

@app.callback(
    Output("iot-measure", "figure"), [Input("iot-measure-update", "n_intervals")]
)
def gen_iot_speed(interval):
    """
    Generate the Motor Speed graph.

    :params interval: update the graph based on an interval
    """

    # Let's pass this to our map section
    df = getData(var1, DInd)

    trace = dict(
        type="scatter",
        y=df["ServoMeter"],
        line={"color": "#42C4F7"},
        hoverinfo="skip",
        error_y={
            "type": "data",
            "array": df["ServoMotor"],
            "thickness": 1.5,
            "width": 2,
            "color": "#B4E8FC",
        },
        mode="lines",
    )

    layout = dict(
        plot_bgcolor=app_color["graph_bg"],
        paper_bgcolor=app_color["graph_bg"],
        font={"color": "#fff"},
        height=400,
        xaxis={
            "range": [0, 200],
            "showline": True,
            "zeroline": False,
            "fixedrange": True,
            "tickvals": [0, 50, 100, 150, 200],
            "ticktext": ["200", "150", "100", "50", "0"],
            "title": "Time Elapsed (sec)",
        },
        yaxis={
            "range": [
                min(0, min(df["ServoMeter"])),
                max(100, max(df["ServoMeter"]) + max(df["ServoMotor"])),
            ],
            "showgrid": True,
            "showline": True,
            "fixedrange": True,
            "zeroline": False,
            "gridcolor": app_color["graph_line"],
            "nticks": max(6, round(df["ServoMeter"].iloc[-1] / 10)),
        },
    )

    return dict(data=[trace], layout=layout)

Capturing all the relevant columns & transform them into a graph, where the application will consume data into both the axis (x-axis & y-axis).

There are many other useful snippets, which creates separate useful widgets inside the dashboard.


Run:

Let us run the application –

Dashboard-View

So, we’ve done it.

You will get the complete codebase in the following Github link.

There is an excellent resource from the dash framework, which you should explore. The following link would be handy for developers who want to get some meaningful pre-built dashboard template, which you can customize as per your need through Python or R. Please find the link here.


I’ll bring some more exciting topic in the coming days from the Python verse.

Till then, Happy Avenging! 😀


Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only.

One more thing you need to understand is that this prediction based on limited data points. The actual event may happen differently. Ideally, countries are taking a cue from this kind of analysis & are initiating appropriate measures to avoid the high-curve. And, that is one of the main objective of time series analysis.

There is always a room for improvement of this kind of models & the solution associated with it. I’ve shown the basic ways to achieve the same for the education purpose only.

Python-based dash framework visualizing real-time covid-19 trend.

Hi Team,

We’ll enhance our last post on Covid-19 prediction & try to capture them in a real-time dashboard, where the values in the visual display points will be affected as soon as the source data changes. In short, this is genuinely a real-time visual dashboard displaying all the graphs, trends depending upon the third-party API source data change.

However, I would like to share the run before we dig deep into this.

Demo Run

Architecture:

Let us understand the architecture for this solution –

Streaming Architecture

From the above diagram, one can see that we’re maintaining a similar approach compared to our last initiative. However, we’ve used a different framework to display the data live.

To achieve this, we’ve used a compelling python-based framework called Dash. Other than that, we’ve used Ably, Plotly & Prophet API.

If you need to know more about our last post, please visit this link.


Package Installation:

Let us understand the sample packages that require for