Today, I’m going to discuss a common use case: on many occasions, different teams need almost similar kinds of data through an API, yet the requests are not identical. Creating a fresh API/Microservice for each variation, after following up with many processes, takes significant time.
What if we could create one API in such a way that it returns the response dynamically, without needing to build another? In this post, we’ll demonstrate a similar approach.
I’ll be using the open-source Covid-API, which will be useful for several posts starting with this one.
Let’s take two cases. In one, a service might need to access all the elements; in another, some other service requires only specific details.
Let’s explore the code base first –
init.py (This native Python-based Azure function consumes the source API data & serves the dynamic API response.)
###########################################
#### Written By: SATYAKI DE ####
#### Written On: 06-Feb-2021 ####
#### Package Flask package needs to ####
#### install in order to run this ####
#### script. ####
#### ####
#### Objective: Main Calling scripts. ####
#### ####
#### However, to meet the functionality####
#### we've enhanced as per our logic. ####
###########################################
import logging
import json
import requests
import os
import pandas as p
import numpy as np
import azure.functions as func
def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Dynamic-Covid-Status HTTP trigger function processed a request.')

    try:
        # Application Variable
        url = os.environ['URL']
        appType = os.environ['appType']
        conType = os.environ['conType']

        # API-Configuration
        payload = {}
        headers = {
            "Connection": conType,
            "Content-Type": appType
        }

        # Validating input parameters
        typeSel = req.params.get('typeSel')
        if not typeSel:
            try:
                req_body = req.get_json()
            except ValueError:
                pass
            else:
                typeSel = req_body.get('typeSel')

        typeVal = req.params.get('typeVal')
        if not typeVal:
            try:
                req_body = req.get_json()
            except ValueError:
                pass
            else:
                typeVal = req_body.get('typeVal')

        # Printing Key-Element Values
        str1 = 'typeSel: ' + str(typeSel)
        logging.info(str1)

        str2 = 'typeVal: ' + str(typeVal)
        logging.info(str2)

        # End of API-Inputs

        # Getting Covid data from the REST-API
        response = requests.request("GET", url, headers=headers, data=payload)
        ResJson = response.text

        if typeSel == '*':
            if typeVal != '':
                # Converting it to Json
                jdata = json.loads(ResJson)

                df_ret = p.io.json.json_normalize(jdata)
                df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

                rJson = df_ret.to_json(orient='records')

                return func.HttpResponse(rJson, status_code=200)
            else:
                x_stat = 'Failed'
                x_msg = 'Important information is missing for all values!'

                rJson = {
                    "status": x_stat,
                    "details": x_msg
                }

                xval = json.dumps(rJson)
                return func.HttpResponse(xval, status_code=200)
        elif typeSel == 'Cols':
            if typeVal != '':
                # Converting it to Json
                jdata = json.loads(ResJson)

                df_ret = p.io.json.json_normalize(jdata)
                df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

                # Fetching for the selected columns
                # Extracting the columns from the list
                lstHead = []
                listX = typeVal.split(",")

                for i in listX:
                    lstHead.append(str(i).strip())

                str3 = 'Main List: ' + str(lstHead)
                logging.info(str3)

                slice_df = df_ret[np.intersect1d(df_ret.columns, lstHead)]
                rJson = slice_df.to_json(orient='records')

                return func.HttpResponse(rJson, status_code=200)
            else:
                x_stat = 'Failed'
                x_msg = 'Important information is missing for selected values!'

                rJson = {
                    "status": x_stat,
                    "details": x_msg
                }

                xval = json.dumps(rJson)
                return func.HttpResponse(xval, status_code=200)
        else:
            x_stat = 'Failed'
            x_msg = 'Important information is missing for typeSel!'

            rJson = {
                "status": x_stat,
                "details": x_msg
            }

            xval = json.dumps(rJson)
            return func.HttpResponse(xval, status_code=200)
    except Exception as e:
        x_msg = str(e)
        x_stat = 'Failed'

        rJson = {
            "status": x_stat,
            "details": x_msg
        }

        xval = json.dumps(rJson)
        return func.HttpResponse(xval, status_code=200)
In the above lines, we’re converting the response into a pandas dataframe, flattening the nested column names & then converting the dataframe back to JSON before returning it.
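To illustrate that flattening step in isolation, here is a minimal sketch with a made-up record (not the actual Covid-API payload). Note that newer pandas versions expose this function as p.json_normalize rather than p.io.json.json_normalize –

import json
import pandas as p

# Illustrative nested record, standing in for the real API response
sample = '[{"country": "IN", "stats": {"confirmed": 100, "recovered": 90}}]'
jdata = json.loads(sample)

df_ret = p.json_normalize(jdata)                       # columns: country, stats.confirmed, stats.recovered
df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

print(df_ret.to_json(orient='records'))                # [{"country":"IN","confirmed":100,"recovered":90}]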
# Fetching for the selected columns
# Extracting the columns from the list
lstHead = []
listX = typeVal.split(",")

for i in listX:
    lstHead.append(str(i).strip())

str3 = 'Main List: ' + str(lstHead)
logging.info(str3)

#slice_df = df_ret[df_ret.columns.intersection(lstHead)]
slice_df = df_ret[np.intersect1d(df_ret.columns, lstHead)]
For the second case, the additional logic above plays the significant part. Based on the column names supplied in the typeVal attribute, the response will now contain only those selected columns.
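For a quick sense of how a client would call the two variants, here is a hedged example using the requests library. The local function URL & the column names are illustrative assumptions; they will depend on your own function name, deployment & the shape of the Covid-API response –

import requests

# Assumed local endpoint of the Azure function (adjust host/port/function name to your deployment)
api_url = 'http://localhost:7071/api/Dynamic-Covid-Status'

# Case 1: all columns
resp_all = requests.get(api_url, params={'typeSel': '*', 'typeVal': '*'})
print(resp_all.json())

# Case 2: selected columns only (illustrative column names)
resp_cols = requests.get(api_url, params={'typeSel': 'Cols', 'typeVal': 'country, cases, deaths'})
print(resp_cols.json())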
Let’s see how it looks –
Azure function in Visual Studio Code
Let’s test it using Postman –
Case 1 (For all the columns):
For all elements
And, the formatted output is as follows –
Formatted output for all elements
Case 2 (For selected columns):
For selected elements
And, the formatted output is as follows –
Formatted output of the selected-element case
You can find the code on GitHub using the following link.
So, finally, we have done it.
I’ll bring some more exciting topics from the Python verse in the coming days.
Till then, Happy Avenging! 😀
Note: All the data & scenarios posted here are representational, available over the internet & intended for educational purposes only.
In this post, our objective is to combine a traditional RDBMS from the cloud with Azure’s NoSQL offering, which in this case is Cosmos DB, and then project some blended information that can be aggregated further.
Examining Source Data.
NoSQL Data from Cosmos:
Let’s check one more time the No SQL data created in our last post.
In total, we created 6 records in our last post.
As you can see in the red-marked areas, the Items view shows the total number of records created. You can also filter out a specific record using the Edit Filter button (highlighted with the blue box) by providing the WHERE clause inside it, for example WHERE c.subscriberId = '<some id>'.
Azure SQL DB:
Let’s create some data in Azure SQL DB.
But, before that, you need to create a SQL DB in the Azure cloud. You can refer to the official Microsoft link for creating a DB in Azure here.
I won’t discuss the detailed steps of creating DB here.
From Azure portal, it looks like –
Let’s see how the data looks in Azure DB. For our case, we’ll be using the hrMaster DB.
Let’s create the table & some sample data aligned as per our cosmos data.
We will join both datasets on subscriberId & then extract our required columns in the final output, as sketched below.
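As a rough preview of that join, here is a minimal pandas sketch with made-up sample frames. The column names mirror the ones used in the final output later in this post, but the values are purely illustrative –

import pandas as p

# Illustrative Cosmos DB records (NoSQL side)
df_cosmos = p.DataFrame([
    {'subscriberId': 'S001', 'orderDate': '2019-06-02T10:12:33.123', 'orderNo': 'O-1001', 'sender': 'user1@example.com'}
])

# Illustrative Azure SQL DB records (RDBMS side)
df_sql = p.DataFrame([
    {'subscriberId': 'S001', 'state': 'NY', 'country': 'USA', 'customerType': 'Regular'}
])

# Inner join on the common key, then keep only the columns we need
df_fin = p.merge(df_cosmos, df_sql, on='subscriberId', how='inner')
df_fin = df_fin[['orderDate', 'orderNo', 'sender', 'state', 'country', 'customerType']]
print(df_fin)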
Good. Now, we’re ready for python scripts.
Python Scripts:
In this installment, we’ll be reusing the following python scripts, which were already discussed in my earlier post –
clsL.py
clsColMgmt.py
clsCosmosDBDet.py
So, I’m not going to discuss these scripts.
Before we discuss our scripts, let’s look at the directory structures –
Here is the detailed directory structure between the Windows & MAC O/S.
1. clsConfig.py (This script will create the split csv files or final merge file after the corresponding process. However, this can be used as usual verbose debug logging as well. Hence, the name comes into the picture.)
After establishing a successful connection, our application will read the SQL, fetch the data into a pandas dataframe & return the output to the primary calling function.
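Since pyodbc is listed in the package requirements further below, that lookup most likely follows a pattern similar to this hedged sketch. The driver string, server, credentials & query are placeholders, not the actual clsDBLookup.py implementation –

import pyodbc
import pandas as p

def azure_sqldb_read(sql_query):
    # Placeholder connection details - replace with your own Azure SQL DB settings
    conn = pyodbc.connect(
        'DRIVER={ODBC Driver 17 for SQL Server};'
        'SERVER=<your-server>.database.windows.net;'
        'DATABASE=hrMaster;UID=<user>;PWD=<password>'
    )
    try:
        # Fetch the result set straight into a pandas dataframe
        df = p.read_sql(sql_query, conn)
    finally:
        conn.close()
    return df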
3. callCosmosAPI.py (This is the main script, which will call all the methods to blend the data. Hence, the name comes into the picture.)
##############################################
#### Written By: SATYAKI DE ####
#### Written On: 25-May-2019 ####
#### Modified On 02-Jun-2019 ####
#### ####
#### Objective: Main calling scripts. ####
##############################################
import clsColMgmt as cm
import clsCosmosDBDet as cmdb
from clsConfig import clsConfig as cf
import pandas as p
import clsLog as cl
import logging
import datetime
import json
import clsDBLookup as dbcon
# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

def getDate(row):
    try:
        d1 = row['orderDate']
        d1_str = str(d1)
        d1_dt_part, sec = d1_str.split('.')
        dt_part1 = d1_dt_part.replace('T', ' ')

        return dt_part1
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = ''

        return dt_part1

# Lookup functions from
# Azure cloud SQL DB
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        df_ret = p.DataFrame()
        df_ret_2 = p.DataFrame()
        df_ret_2_Mod = p.DataFrame()

        debug_ind = 'Y'

        # Initiating Log Class
        l = cl.clsLog()

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidated.log', level=logging.INFO)

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']

        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)
        print("*" * 157)
        print("Testing COSMOS DB Connection!")
        print("*" * 157)

        # Checking Cosmos DB Azure
        y = cmdb.clsCosmosDBDet()
        ret_val = y.test_db_con()

        if ret_val == 0:
            print()
            print("Cosmos DB Connection Successful!")
            print("*" * 157)
        else:
            print()
            print("Cosmos DB Connection Failure!")
            print("*" * 157)
            raise Exception

        print("*" * 157)

        # Accessing from Azure SQL DB
        x1 = dbcon.clsDBLookup()
        act_df = x1.azure_sqldb_read(cf.config['AZURE_SQL_1'])

        print("Azure SQL DB::")
        print(act_df)
        print()
        print("-" * 157)

        # Calling the function 1
        print("RealtimeEmail::")

        # Fetching First collection data to dataframe
        print("Fetching Cosmos Collection Data!")

        sql_qry_1 = cf.config['SQL_QRY_1']
        msg = "Documents generated based on unique key"
        collection_flg = 1

        x = cm.clsColMgmt()
        df_ret = x.fetch_data(sql_qry_1, msg, collection_flg)

        l.logr('1.EmailFeedback_' + var + '.csv', debug_ind, df_ret, 'log')
        print('RealtimeEmail Data::')
        print(df_ret)
        print()

        # Checking execution status
        ret_val = int(df_ret.shape[0])

        if ret_val == 0:
            print("Cosmos DB hasn't returned any rows. Please check your queries!")
            print("*" * 157)
        else:
            print("Successfully fetched!")
            print("*" * 157)

        # Calling the 2nd Collection
        print("RealtimeTwitterFeedback::")

        # Fetching second collection data to dataframe
        print("Fetching Cosmos Collection Data!")

        # Query using parameters
        sql_qry_2 = cf.config['SQL_QRY_2']
        msg_2 = "Documents generated based on RealtimeTwitterFeedback feed!"
        collection_flg = 2

        val = 'crazyGo'
        param_det = [{"name": "@CrVal", "value": val}]
        add_param = 2

        x1 = cm.clsColMgmt()
        df_ret_2 = x1.fetch_data(sql_qry_2, msg_2, collection_flg, add_param, param_det)

        l.logr('2.TwitterFeedback_' + var + '.csv', debug_ind, df_ret_2, 'log')
        print('Realtime Twitter Data:: ')
        print(df_ret_2)
        print()

        # Checking execution status
        ret_val_2 = int(df_ret_2.shape[0])

        if ret_val_2 == 0:
            print("Cosmos DB hasn't returned any rows. Please check your queries!")
            print("*" * 157)
        else:
            print("Successfully fetched rows!")
            print("*" * 157)

        # Merging NoSQL Data (Cosmos DB) with Relational DB (Azure SQL DB)
        df_Fin_temp = p.merge(df_ret, act_df, on='subscriberId', how='inner')

        df_fin = df_Fin_temp[['orderDate', 'orderNo', 'sender', 'state', 'country', 'customerType']]

        print("Initial Combined Data (From Cosmos & Azure SQL DB) :: ")
        print(df_fin)

        l.logr('3.InitCombine_' + var + '.csv', debug_ind, df_fin, 'log')

        # Transforming the orderDate as per standard format
        df_fin['orderDateM'] = df_fin.apply(lambda row: getDate(row), axis=1)

        # Dropping the old column & renaming the new column to old column
        df_fin.drop(columns=['orderDate'], inplace=True)
        df_fin.rename(columns={'orderDateM': 'orderDate'}, inplace=True)

        print("*" * 157)
        print()
        print("Final Combined & Transformed result:: ")
        print(df_fin)

        l.logr('4.Final_Combine_' + var + '.csv', debug_ind, df_fin, 'log')
        print("*" * 157)
    except ValueError:
        print("No relevant data to proceed!")
    except Exception as e:
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()
The above lines call the Azure SQL DB method to retrieve the RDBMS data into our dataframe.
# Merging NoSQL Data (Cosmos DB) with Relational DB (Azure SQL DB)
df_Fin_temp = p.merge(df_ret, act_df, on='subscriberId', how='inner')

df_fin = df_Fin_temp[['orderDate', 'orderNo', 'sender', 'state', 'country', 'customerType']]
In these lines, we’re joining the data retrieved from two different kinds of databases to prepare our initial combined dataframe. Also, we’ve picked only the desired columns, which will be useful for our final output.
# Transforming the orderDate as per standard format
df_fin['orderDateM'] = df_fin.apply(lambda row: getDate(row), axis=1)

# Dropping the old column & renaming the new column to old column
df_fin.drop(columns=['orderDate'], inplace=True)
df_fin.rename(columns={'orderDateM': 'orderDate'}, inplace=True)
In the above lines, we’re transforming our date field by calling the getDate method, as shown in one of the previous images.
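For clarity, here is what that transformation does on one made-up, Cosmos-style timestamp value: the fractional seconds are dropped & the T separator is replaced with a space –

d1_str = '2019-06-02T10:12:33.123456'          # illustrative input value

d1_dt_part, sec = d1_str.split('.')            # '2019-06-02T10:12:33', '123456'
dt_part1 = d1_dt_part.replace('T', ' ')        # '2019-06-02 10:12:33'
print(dt_part1)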
Let’s see the directory structure of our program –
Let’s see how it looks when it runs –
Windows:
MAC:
So, finally, we’ve successfully blended the data & made a more meaningful data projection.
The following python packages are required to run this application –
pip install azure
pip install azure-cosmos
pip install pandas
pip install requests
pip install pyodbc
This application was tested on Python 3.7.1 & Python 3.7.2 as well. As per Microsoft, their officially supported version is Python 3.5.
I hope you’ll like this effort.
Wait for the next installment. Till then, Happy Avenging. 😀
[Note: All the sample data are available/prepared in the public domain for research & study.]
Today, we’ll discuss how to improve your pandas data processing power using multi-threading. Note that we are not going to use any third-party python package. Also, we’ll be using a couple of python scripts, which we’ve already discussed in our previous posts. Hence, this time, I won’t post them here.
Please refer to the following scripts –
a. callClient.py
b. callRunServer.py
c. clsConfigServer.py
d. clsEnDec.py
e. clsFlask.py
f. clsL.py
g. clsParam.py
h. clsSerial.py
i. clsWeb.py
Please find the above scripts described here with details.
So, today, we’ll be looking into how multi-threading really helps the application gain performance over the serial approach.
Let’s go through our existing old sample files –
And, we have four columns that are applicable for encryption. This file contains 10K records, which means the application will make 40K calls to the server, applying a different kind of encryption for each column.
Now, the serial approach, which I’ve already discussed here, will take significant time for data processing. However, if we club a few rows together as one block, we can create multiple blocks out of our data csv like this –
As you can see, the blocks are marked with different colors. If we now send each block of data for encryption in parallel, we will ideally be able to process the data much faster than the usual serial process. And this is what we’ll achieve with the help of python’s multi-threading & queue. Without the queue, this program wouldn’t be possible, as the queue maintains the data & process integrity.
One more thing we would like to explain here: whenever this application sends a block of data, it posts it packed into a (key, value) dictionary, where the key is the thread name. The reason is that the processed data may arrive back in some random order, wrapped in a dictionary as well. Once the application has received all the dictionaries with the encrypted/decrypted dataframes, the data is rearranged based on the key & then joined back with the rest of the data.
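Here is a heavily simplified, hedged sketch of that idea, independent of the actual clsParallel.py code: a dataframe is split into blocks, each block is pushed onto a queue with a key (this sketch uses a simple sequence key in place of the real thread names, but it plays the same role), a dummy transform stands in for the real encryption API call, & the results are reassembled in key order at the end –

import threading
import queue
import pandas as p

q = queue.Queue()
results = {}

def worker():
    # Keep consuming (key, block) pairs until the sentinel arrives
    while True:
        name, block = q.get()
        if name == 'TEND':
            q.task_done()
            break
        block = block.copy()
        block['acct'] = block['acct'].str.upper()   # stand-in for the real encryption call
        results[name] = block
        q.task_done()

df = p.DataFrame({'acct': ['a1', 'b2', 'c3', 'd4', 'e5'], 'val': [1, 2, 3, 4, 5]})
interval = 2
num_workers = 2

threads = [threading.Thread(target=worker) for _ in range(num_workers)]
for t in threads:
    t.start()

# Tag every block with a sequence key so it can be reassembled in order later
for seq, start in enumerate(range(0, len(df), interval)):
    q.put(('Block-' + str(seq), df.iloc[start:start + interval]))

# Sentinels: one per worker, so every thread can shut down cleanly
for _ in range(num_workers):
    q.put(('TEND', p.DataFrame()))

q.join()
for t in threads:
    t.join()

# Reassemble the chunks based on the key ordering
ordered = [results[k] for k in sorted(results, key=lambda k: int(k.split('-')[1]))]
print(p.concat(ordered))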
Let’s see one sample way of sending & receiving data across random threads –
On the left-hand side, the application splits the recordset into small chunks (groups). Once those groups are created, the application uses python multi-threading to push them into the queue for the producer to produce the encrypted/decrypted values. Similarly, after processing, the application pushes the final product into the queue for consuming the final output.
This is the pictorial representation of the dictionary ordering based on the key values, from which the application extracts the entire data to form the target csv file.
Let’s explore the script –
1. clsParallel.py (This script will consume the split csv files & send the data blocks in the form of the dictionary using multi-threading to the API for encryption in parallel. Hence, the name comes into the picture.)
We’ve already described the logic of these methods in our previous post.
# Checking total count of rows
count_row = df_input.shape[0]

print('Total number of records to process:: ', count_row)

interval = int(count_row / num_worker_threads) + 1
actual_worker_task = int(count_row / interval) + 1
Fetching the total number of rows from the dataframe. Based on the row count, the application derives the actual number of threads that will be used for parallelism.
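As a quick worked example, assuming num_worker_threads is configured as 5 (an assumption made only for illustration), the 10K-record file mentioned earlier would be split as follows –

count_row = 10000                                      # rows in the sample file
num_worker_threads = 5                                 # assumed configuration value

interval = int(count_row / num_worker_threads) + 1     # 2001 rows per block
actual_worker_task = int(count_row / interval) + 1     # 5 worker threads in total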
for i in range(actual_worker_task):
    t = threading.Thread(target=self.getEncryptWQ)
    t.start()
    threads.append(t)
    name = str(t.getName())

    if ((start_pos + interval) < count_row):
        end_pos = start_pos + interval
    else:
        end_pos = start_pos + (count_row - start_pos)

    split_df = df_input.iloc[start_pos:end_pos]
    l_dict[name] = split_df

    if ((start_pos > count_row) | (start_pos == count_row)):
        break
    else:
        start_pos = start_pos + interval

    q.put(l_dict)
    cnt += 1
Here, the application splits the data into multiple smaller data packs, combines them into (key, value) dictionaries & finally places them into the queue.
# block until all tasks are done
q.join()
This joins the queue process, ensuring that the main flow waits until every queued item has been consumed & marked done.
# stop workers
for i in range(actual_worker_task):
    c_dict['TEND'] = p.DataFrame()
    q.put(c_dict)

for t in threads:
    t.join()
The above lines are essential. They help the workers identify that no more data is left to be sent to the queue, and the main thread then waits until all the threads are done.
for k, v in fin_dict.items():
    min_val_list[int(k.replace('Thread-', ''))] = v

min_val = min(min_val_list, key=int)
Once all the jobs are done, the application finds the minimum thread value & based on that it can sequence all the data chunks, as explained in our previous image, & finally club them together to form the complete csv.
for k, v in sorted(fin_dict.items(), key=lambda k: int(k[0].replace('Thread-', ''))):
    if int(k.replace('Thread-', '')) == min_val:
        df_ret = fin_dict[k]
    else:
        d_frames = [df_ret, fin_dict[k]]
        df_ret = p.concat(d_frames)
As already explained, starting from the first dictionary element, the application clubs the data back together into the main csv.
Next method, which we’ll be explaining is –
getEncryptWQ
Please find the key lines –
while True:
    try:
        #item_dict = q.get()
        item_dict = q.get_nowait()

        for k, v in item_dict.items():
            # Assigning Target File Basic Name
            item = str(k)

        if ((item == 'TEND') | (item == '')):
            break

        if ((item != 'TEND') | (item != '')):
            self.getEncrypt(item_dict)

        q.task_done()
    except Exception:
        break
This method consumes the data & processes it for encryption or decryption. It will continue working until it receives the key value TEND or the queue is empty.
Let’s compare the statistics between Windows & MAC.
Let’s see the file structure first –
Windows (16 GB – Core 2) Vs Mac (10 GB – Core 2):
Windows (16 GB – Core 2):
Mac (10 GB – Core 2):
Find the complete directory from both the machine.
Windows (16 GB – Core 2):
Mac (10 GB – Core 2):
Here is the final output –
So, we’ve achieved our target goal.
Let me know – how do you like this post. Please share your suggestion & comments.
I’ll be back with another installment from the Python verse.
Today, we’ll be looking into another exciting installment of cross-over between Reality Stone & Timestone from the python verse.
We’ll be exploring Encryption/Decryption implemented using the Flask Framework’s server component. We would like to demonstrate these Encrypt/Decrypt features as a Server API & then call it from clients like Postman to view the response.
So, here our primary focus will be implementing this on the server-side rather than the client-side.
However, there is a catch. We would like to implement different kinds of encryption or decryption based on our source data.
Let’s look into the sample data first –
As you can see, we intend to encrypt the Account Number with a different salt compared to Name, Phone or Email. Hence, we would be using different salts to encrypt our sample data & get the desired encrypt/decrypt output.
From the above data, we can create the following types of JSON payload –
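Based on the element names the server captures later in this post (dataGroup, data & dataTemplate), one such payload might look like the example below; the account number value is made up purely for illustration –

# Illustrative encrypt request payload for the account-number template
payload = {
    "dataGroup": "GrDet",
    "data": "1234567890",
    "dataTemplate": "subGrAcct_Nbr"
}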
Let’s explore –
Before we start, we would like to show you the directory structure of Windows & MAC as we did the same in my earlier post as well.
Following are the scripts that we’re using to develop this server application –
1. clsConfigServer.py (This script contains all the parameters of the server.)
As mentioned, different salt keys are defined for different kinds of data.
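The full contents of clsConfigServer.py are not reproduced here, but given the keys referenced from clsFlask.py further below (DEF_SALT, ACCT_NBR_SALT, NAME_SALT, PHONE_SALT, EMAIL_SALT), a minimal hedged sketch of that part of the config could look like this. Each value must be a valid Fernet key; the placeholders are assumptions, not the actual configuration –

from cryptography.fernet import Fernet

# Generate each key once (e.g. print(Fernet.generate_key().decode()) on the command line)
# & paste the resulting strings below; hard-coding fixed keys is what allows
# decryption to keep working across server restarts.

class clsConfigServer(object):
    config = {
        'DEF_SALT': '<paste-a-generated-fernet-key-here>',
        'ACCT_NBR_SALT': '<paste-a-generated-fernet-key-here>',
        'NAME_SALT': '<paste-a-generated-fernet-key-here>',
        'PHONE_SALT': '<paste-a-generated-fernet-key-here>',
        'EMAIL_SALT': '<paste-a-generated-fernet-key-here>'
    }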
2. clsEnDec.py (This script is a lighter version of our previously discussed encryption & decryption script. Hence, we won’t discuss it in detail; you can refer to my earlier post to understand its logic.)
###############################################
#### Written By: SATYAKI DE                ####
#### Written On: 25-Jan-2019               ####
#### Package Cryptography needs to         ####
#### install in order to run this          ####
#### script.                               ####
####                                       ####
#### Objective: This script will           ####
#### encrypt/decrypt based on the          ####
#### hidden supplied salt value.           ####
###############################################

from cryptography.fernet import Fernet
class clsEnDec(object):

    def __init__(self, token):
        # Calculating Key
        self.token = token

    def encrypt_str(self, data):
        try:
            # Capturing the Salt Information
            salt = self.token

            # Checking Individual Types inside the Dataframe
            cipher = Fernet(salt)
            encr_val = str(cipher.encrypt(bytes(data, 'utf8'))).replace("b'", "").replace("'", "")

            return encr_val
        except Exception as e:
            x = str(e)
            print(x)
            encr_val = ''

            return encr_val

    def decrypt_str(self, data):
        try:
            # Capturing the Salt Information
            salt = self.token

            # Checking Individual Types inside the Dataframe
            cipher = Fernet(salt)
            decr_val = str(cipher.decrypt(bytes(data, 'utf8'))).replace("b'", "").replace("'", "")

            return decr_val
        except Exception as e:
            x = str(e)
            print(x)
            decr_val = ''

            return decr_val
3. clsFlask.py (This is the main server script that will use the encrypt/decrypt class from our previous script. This script will capture the requested JSON from the client, which could be posted from another python script or a third-party tool like Postman.)
###############################################
#### Written By: SATYAKI DE                ####
#### Written On: 25-Jan-2019               ####
#### Package Flask package needs to        ####
#### install in order to run this          ####
#### script.                               ####
####                                       ####
#### Objective: This script will           ####
#### encrypt/decrypt based on the          ####
#### supplied salt value. Also,            ####
#### this will capture the individual      ####
#### element & stored them into JSON       ####
#### variables using flask framework.      ####
###############################################

from clsConfigServer import clsConfigServer as csf
import clsEnDec as cen

class clsFlask(object):

    def __init__(self):
        self.xtoken = str(csf.config['DEF_SALT'])

    def getEncryptProcess(self, dGroup, input_data, dTemplate):
        try:
            # It is sending default salt value
            xtoken = self.xtoken

            # Capturing the individual element
            dGroup = dGroup
            input_data = input_data
            dTemplate = dTemplate

            # This will check the mandatory json elements
            if ((dGroup != '') & (dTemplate != '')):

                # Based on the Group & Element it will fetch the salt
                # Based on the specific salt it will encrypt the data
                if ((dGroup == 'GrDet') & (dTemplate == 'subGrAcct_Nbr')):
                    xtoken = str(csf.config['ACCT_NBR_SALT'])

                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)

                    x = cen.clsEnDec(xtoken)
                    ret_val = x.encrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrName')):
                    xtoken = str(csf.config['NAME_SALT'])

                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)

                    x = cen.clsEnDec(xtoken)
                    ret_val = x.encrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrPhone')):
                    xtoken = str(csf.config['PHONE_SALT'])

                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)

                    x = cen.clsEnDec(xtoken)
                    ret_val = x.encrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrEmail')):
                    xtoken = str(csf.config['EMAIL_SALT'])

                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)

                    x = cen.clsEnDec(xtoken)
                    ret_val = x.encrypt_str(input_data)
                else:
                    ret_val = ''
            else:
                ret_val = ''

            # Return value
            return ret_val
        except Exception as e:
            ret_val = ''
            # Return the valid json Error Response
            return ret_val

    def getDecryptProcess(self, dGroup, input_data, dTemplate):
        try:
            xtoken = self.xtoken

            # Capturing the individual element
            dGroup = dGroup
            input_data = input_data
            dTemplate = dTemplate

            # This will check the mandatory json elements
            if ((dGroup != '') & (dTemplate != '')):

                # Based on the Group & Element it will fetch the salt
                # Based on the specific salt it will decrypt the data
                if ((dGroup == 'GrDet') & (dTemplate == 'subGrAcct_Nbr')):
                    xtoken = str(csf.config['ACCT_NBR_SALT'])

                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)

                    x = cen.clsEnDec(xtoken)
                    ret_val = x.decrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrName')):
                    xtoken = str(csf.config['NAME_SALT'])

                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)

                    x = cen.clsEnDec(xtoken)
                    ret_val = x.decrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrPhone')):
                    xtoken = str(csf.config['PHONE_SALT'])

                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)

                    x = cen.clsEnDec(xtoken)
                    ret_val = x.decrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrEmail')):
                    xtoken = str(csf.config['EMAIL_SALT'])

                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)

                    x = cen.clsEnDec(xtoken)
                    ret_val = x.decrypt_str(input_data)
                else:
                    ret_val = ''
            else:
                ret_val = ''

            # Return value
            return ret_val
        except Exception as e:
            ret_val = ''
            # Return the valid Error Response
            return ret_val
Key lines to check –
# This will check the mandatory json elements
if ((dGroup != '') & (dTemplate != '')):
Encrypt & Decrypt will only work on the data when the key element contains valid values. In this case, we are looking for values stored in dGroup & dTemplate, which will denote the specific encryption type.
# Based on the Group & Element it will fetch the salt
# Based on the specific salt it will encrypt the data
if ((dGroup == 'GrDet') & (dTemplate == 'subGrAcct_Nbr')):
    xtoken = str(csf.config['ACCT_NBR_SALT'])
    print("xtoken: ", xtoken)
    print("Flask Input Data: ", input_data)
    x = cen.clsEnDec(xtoken)
    ret_val = x.encrypt_str(input_data)
elif ((dGroup == 'GrDet') & (dTemplate == 'subGrName')):
    xtoken = str(csf.config['NAME_SALT'])
    print("xtoken: ", xtoken)
    print("Flask Input Data: ", input_data)
    x = cen.clsEnDec(xtoken)
    ret_val = x.encrypt_str(input_data)
elif ((dGroup == 'GrDet') & (dTemplate == 'subGrPhone')):
    xtoken = str(csf.config['PHONE_SALT'])
    print("xtoken: ", xtoken)
    print("Flask Input Data: ", input_data)
    x = cen.clsEnDec(xtoken)
    ret_val = x.encrypt_str(input_data)
elif ((dGroup == 'GrDet') & (dTemplate == 'subGrEmail')):
    xtoken = str(csf.config['EMAIL_SALT'])
    print("xtoken: ", xtoken)
    print("Flask Input Data: ", input_data)
    x = cen.clsEnDec(xtoken)
    ret_val = x.encrypt_str(input_data)
Here, as you can see, based on dGroup & dTemplate, the application uses a specific salt to encrypt or decrypt the corresponding data; each branch picks the particular salt for its dGroup & dTemplate combination.
4. callRunServer.py (This script will create an instance of Flask Server & serve encrypt/decrypt facilities & act as an endpoint or server API & provide the response made to it by clients such as another python or any third-party application.)
Based on the path & method, this will trigger either encrypt or decrypt methods.
# If the server application doesn't receive
# valid json, it will throw a 400 error
if not request.get_json(silent=True):
    abort(400)
As the comments suggest, this checks whether the data sent to the server application is valid JSON or not. Based on that, it will proceed or abort the request & send the response back to the client.
# Capturing the individual element
content = request.get_json()

dGroup = content['dataGroup']
input_data = content['data']
dTemplate = content['dataTemplate']
Here, the application is capturing the json into individual elements.
if ((dGroup != '') & (dTemplate != '')):
    y = clf.clsFlask()
    ret_val = y.getEncryptProcess(dGroup, input_data, dTemplate)
else:
    abort(500)
The server will process the request only when both dGroup & dTemplate contain non-empty values. The same logic is applicable for both the encrypt & decrypt process.
    return jsonify({'status': 'success', 'encrypt_val': ret_val})
except Exception as e:
    x = str(e)
    return jsonify({'status': 'error', 'detail': x})
If the process is successful, it will send a json response; otherwise, it will return json with the error details. Similar logic is applicable for decrypt as well.
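For reference, the success & error responses would roughly take the shapes below; the encrypted value shown is an invented placeholder, not real Fernet output –

{"status": "success", "encrypt_val": "gAAAAAB<...rest-of-the-encrypted-token...>"}
{"status": "error", "detail": "<exception message text>"}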
Based on the IP address supplied in our configuration file, this server will create an instance on that specific IP address when triggered. Please refer to clsConfigServer.py for the particular parameter values.
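The full callRunServer.py isn't reproduced in this post, so here is a hedged, minimal skeleton of how such an endpoint could be wired together from the snippets above. The route path & the config key names HOST_IP_ADDR & PORT are illustrative assumptions, not the actual script –

from flask import Flask, request, jsonify, abort
import clsFlask as clf
from clsConfigServer import clsConfigServer as csf

app = Flask(__name__)

@app.route('/api/v1/encrypt', methods=['POST'])    # illustrative route path
def encrypt():
    try:
        # Reject requests that do not carry a valid JSON body
        if not request.get_json(silent=True):
            abort(400)

        # Capturing the individual element
        content = request.get_json()
        dGroup = content['dataGroup']
        input_data = content['data']
        dTemplate = content['dataTemplate']

        if ((dGroup != '') & (dTemplate != '')):
            y = clf.clsFlask()
            ret_val = y.getEncryptProcess(dGroup, input_data, dTemplate)
        else:
            abort(500)

        return jsonify({'status': 'success', 'encrypt_val': ret_val})
    except Exception as e:
        x = str(e)
        return jsonify({'status': 'error', 'detail': x})

if __name__ == '__main__':
    # Assumed config keys for the bind address & port
    app.run(host=str(csf.config['HOST_IP_ADDR']), port=int(csf.config['PORT']))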
Let’s run the server application & see the debug encrypt & decrypt screen looks from the server-side –
Windows (64 bit):
And, we’re using the third-party Postman app to invoke this; please find the authentication details & the JSON payload for encryption as follows –
Let’s see the decrypt from the server-side & how it looks from Postman –
Mac (32 bit):
Let’s look from MAC’s perspective & see how the encryption debug looks from the server.
Please find the screen from postman along with the necessary authentication –
Let’s see how the decrypt looks from both the server & Postman as well –
So, from this post, we’ve achieved our goal. We’ve successfully demonstrated creating a server component using the Flask framework & incorporated our custom encryption/decryption script to create a simulated API for third-party clients or any other application.
Hope, you will like this approach.
Let me know your comment on the same.
I’ll bring some more exciting topics in the coming days from the Python verse.