Today, I’ll demonstrate one of the fascinating ways to capture real-time streaming data in a dashboard. It is a dream for any developer who wants to build an application involving streaming data, API & a dashboard.
Why don’t we look at a demo run first to make this thread more interesting?
Real-Time Dashboard using streaming data
Today, I’ll be using two key services to achieve that goal.
Ably
H2O-Wave
Let’s briefly discuss these two services.
Why did I use “Ably” here?
One of my scenarios is to consume real-time currency data. Even after checking a few paid APIs, I wasn’t getting what I was looking for. Hence, I decided to use a service that could mimic my data & publish it as streaming data through a channel. Once published, I’ll consume the posted data in my application to create this new dashboard.
Using Ably, you can leverage their cloud platform to publish & consume data with a free developer account, which is sufficient for this demonstration.
To better understand this, we need to understand the basic concept of “pubsub”. Here is the relevant page from their documentation that I would like to embed for your reference –
Source: Ably
To know more about this, please refer to the following link.
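To make the pub/sub idea concrete, here is a minimal, hedged sketch using Ably’s Python REST client. The API key is a placeholder & the event name is my assumption; the channel name matches the one used later in this post.
from ably import AblyRest

# Placeholder API key - use the key generated for your own Ably app
client = AblyRest('XXXXX.YYYYYY:<your-api-key-secret>')
channel = client.channels.get('sd_channel')

# Publisher side: push one message onto the channel
channel.publish('exchange-update', '{"Currency": "USD/INR", "CurrentExchange": 74.35, "Change": 0.12}')

# Subscriber side (REST): read the recent message history back
for msg in channel.history().items:
    print(msg.data)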
Why did I use “H2O-Wave” here?
H2O-Wave is a relatively new framework with some outstanding capabilities to visualize your data using native Python.
Pre-Steps:
We need to register with Ably. Here are some of the useful screens that we should explore –
API-Key Page
Successful creation of an App will generate the API-Key. Make sure that you note-down the channel details as well.
Quota Limit
The above page will capture the details of usage. Since this is a free subscription, you will be blocked once you consume your limit. However, for paid users, this is one of the vital pages to control their budget.
Message Published & Consumption Visuals
Like any other cloud service, you can check your published & consumed messages on this page.
This is the main landing page for H2O-Wave –
H2O Wave
They have quite a few example snippets. However, these samples use random data, so they are relatively easy to run as-is. It takes quite some effort to tailor them for real-life scenarios.
You need to install the following libraries in Python –
pip install ably
pip install h2o-wave
We have two scripts. We’re not going to discuss the publish-streaming-data script in detail here; we’ll be discussing only the consumption script, which also generates the dashboard. If you need the publisher, post a comment & I’ll provide it; a hedged sketch of it also follows below.
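For completeness, here is a rough, hedged sketch of what that publisher script could look like. The file name, event name & currency values are my assumptions; only the channel name & the JSON elements (Currency, CurrentExchange, Change) are taken from the consumption script below.
# publish_streaming_data.py (hypothetical name) - mimics currency data & publishes it to Ably
import json
import random
import time

from ably import AblyRest

client = AblyRest('XXXXX.YYYYYY:<your-api-key-secret>')   # placeholder API key
channel = client.channels.get('sd_channel')                # same channel the dashboard reads

base_rates = {'USD/INR': 74.0, 'EUR/USD': 1.22, 'GBP/USD': 1.35}

for _ in range(20):
    for currency, rate in base_rates.items():
        change = round(random.uniform(-0.5, 0.5), 2)
        payload = {
            'Currency': currency,
            'CurrentExchange': round(rate + change, 4),
            'Change': change
        }
        # Every message published here lands in the channel history consumed by dashboard_st.py
        channel.publish('exchange-update', json.dumps(payload))
    time.sleep(1)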
1. dashboard_st.py ( This native Python script will consume streaming data & create live dashboard. )
##########################################################
#### Template Written By: H2O Wave ####
#### Enhanced with Streaming Data By: Satyaki De ####
#### Base Version Enhancement On: 20-Dec-2020 ####
#### Modified On 26-Dec-2020 ####
#### ####
#### Objective: This script will consume real-time ####
#### streaming data coming out from a hosted API ####
#### sources using another popular third-party ####
#### service named Ably. Ably mimics pubsub Streaming ####
#### concept, which might be extremely useful for ####
#### any start-ups. ####
##########################################################
import time
from h2o_wave import site, data, ui
from ably import AblyRest
import pandas as p
import json
class DaSeries:
def __init__(self, inputDf):
self.Df = inputDf
self.count_row = inputDf.shape[0]
self.start_pos = 0
self.end_pos = 0
self.interval = 1
def next(self):
try:
# Getting Individual Element & convert them to Series
if ((self.start_pos + self.interval) <= self.count_row):
self.end_pos = self.start_pos + self.interval
else:
self.end_pos = self.start_pos + (self.count_row - self.start_pos)
split_df = self.Df.iloc[self.start_pos:self.end_pos]
if ((self.start_pos > self.count_row) | (self.start_pos == self.count_row)):
pass
else:
self.start_pos = self.start_pos + self.interval
x = float(split_df.iloc[0]['CurrentExchange'])
dx = float(split_df.iloc[0]['Change'])
# Emptying the existing dataframe
split_df = p.DataFrame(None)
return x, dx
except:
x = 0
dx = 0
return x, dx
class CategoricalSeries:
def __init__(self, sourceDf):
self.series = DaSeries(sourceDf)
self.i = 0
def next(self):
x, dx = self.series.next()
self.i += 1
return f'C{self.i}', x, dx
light_theme_colors = '$red $pink $purple $violet $indigo $blue $azure $cyan $teal $mint $green $amber $orange $tangerine'.split()
dark_theme_colors = '$red $pink $blue $azure $cyan $teal $mint $green $lime $yellow $amber $orange $tangerine'.split()
_color_index = -1
colors = dark_theme_colors
def next_color():
global _color_index
_color_index += 1
return colors[_color_index % len(colors)]
_curve_index = -1
curves = 'linear smooth step stepAfter stepBefore'.split()
def next_curve():
global _curve_index
_curve_index += 1
return curves[_curve_index % len(curves)]
def create_dashboard(update_freq=0.0):
page = site['/dashboard_st']
# Fetching the data
client = AblyRest('XXXXX.YYYYYY:94384jjdhdh98kiidLO')
channel = client.channels.get('sd_channel')
message_page = channel.history()
# Counter Value
cnt = 0
# Declaring Global Data-Frame
df_conv = p.DataFrame()
for i in message_page.items:
print('Last Msg: {}'.format(i.data))
json_data = json.loads(i.data)
# Converting JSON to Dataframe
df = p.json_normalize(json_data)
df.columns = df.columns.map(lambda x: x.split(".")[-1])
if cnt == 0:
df_conv = df
else:
d_frames = [df_conv, df]
df_conv = p.concat(d_frames)
cnt += 1
# Resetting the Index Value
df_conv.reset_index(drop=True, inplace=True)
print('DF:')
print(df_conv)
df_conv['default_rank'] = df_conv.groupby(['Currency']).cumcount() + 1
lkp_rank = 1
df_unique = df_conv[(df_conv['default_rank'] == lkp_rank)]
print('Rank DF Unique:')
print(df_unique)
count_row = df_unique.shape[0]
large_lines = []
start_pos = 0
end_pos = 0
interval = 1
# Converting dataframe to a desired Series
f = CategoricalSeries(df_conv)
for j in range(count_row):
# Getting the series values from above
cat, val, pc = f.next()
# Getting Individual Element & convert them to Series
if ((start_pos + interval) <= count_row):
end_pos = start_pos + interval
else:
end_pos = start_pos + (count_row - start_pos)
split_df = df_unique.iloc[start_pos:end_pos]
if ((start_pos > count_row) | (start_pos == count_row)):
pass
else:
start_pos = start_pos + interval
x_currency = str(split_df.iloc[0]['Currency'])
c = page.add(f'e{j+1}', ui.tall_series_stat_card(
box=f'{j+1} 1 1 2',
title=x_currency,
value='=${{intl qux minimum_fraction_digits=2 maximum_fraction_digits=2}}',
aux_value='={{intl quux style="percent" minimum_fraction_digits=1 maximum_fraction_digits=1}}',
data=dict(qux=val, quux=pc),
plot_type='area',
plot_category='foo',
plot_value='qux',
plot_color=next_color(),
plot_data=data('foo qux', -15),
plot_zero_value=0,
plot_curve=next_curve(),
))
large_lines.append((f, c))
page.save()
while update_freq > 0:
time.sleep(update_freq)
for f, c in large_lines:
cat, val, pc = f.next()
c.data.qux = val
c.data.quux = pc / 100
c.plot_data[-1] = [cat, val]
page.save()
create_dashboard(update_freq=0.25)
Some of the key snippets from the above code are –
The DaSeries class shown above creates a series of data out of a pandas data frame. It consumes one record at a time & passes it to the dashboard for real-time updates.
# Fetching the data
client = AblyRest('XXXXX.YYYYYY:94384jjdhdh98kiidLO')
channel = client.channels.get('sd_channel')
message_page = channel.history()
In the above code, the application will consume the real-time data out of Ably’s channel.
In the rank-based filtering above (default_rank & df_unique), the application uniquely identifies the first instance of each currency, which is passed to the initial dashboard page before consuming the stream of updates.
f = CategoricalSeries(df_conv)
In the above code, the application is creating an instance of the intended categorical series.
The page.add(…) block with ui.tall_series_stat_card in the main script is the standard way to bind the streaming data to the H2O-Wave dashboard.
while update_freq > 0:
time.sleep(update_freq)
for f, c in large_lines:
cat, val, pc = f.next()
c.data.qux = val
c.data.quux = pc / 100
c.plot_data[-1] = [cat, val]
page.save()
Here are the last few snippet lines that will capture the continuous streaming data & keep updating the numbers on your dashboard.
Since I’ve already provided the run video of my application, here are a few important screens –
Case 1:
Wave Server Start Command
Case 2:
Publishing stream data
Case 3:
Consuming Stream Data & Publishing to Dashboard
Case 4:
Dashboard Data
So, finally, we have done it.
You will get the complete codebase in the following Github link.
I’ll bring some more exciting topics from the Python verse in the coming days.
Till then, Happy Avenging! 😀
Note: All the data & scenarios posted here are representational, available over the internet & intended for educational purposes only.
Here is the post on how to call this Dnpr library & what its current limitations are.
Before we start, let’s post the calling script & then explain how we can use it –
##############################################
#### Written By: SATYAKI DE ####
#### Written On: 08-Sep-2019 ####
#### ####
#### Objective: Main calling scripts. ####
##############################################
from dnpr.clsDnpr import clsDnpr
import datetime as dt
import json
# Disabling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
# Lookup functions from
def main():
try:
srcJson = [
{"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
{"FirstName": "Satyaki", "LastName": "De", "Sal": 1000},
{"FirstName": "Archi", "LastName": "Bose", "Sal": 500},
{"FirstName": "Archi", "LastName": "Bose", "Sal": 7000},
{"FirstName": "Deb", "LastName": "Sen", "Sal": 9500}
]
print("=" * 157)
print("Checking distinct function!")
print("=" * 157)
print()
print("*" * 157)
print("Input Data: ")
srcJsonFormat = json.dumps(srcJson, indent=1)
print(str(srcJsonFormat))
print("*" * 157)
# Initializing the class
t = clsDnpr()
print("1. Checking distinct functionality!")
var1 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
print("Start Time: ", str(var1))
# Invoking the distinct function
tarJson = t.distinct(srcJson)
print("*" * 157)
print("Output Data: ")
tarJsonFormat = json.dumps(tarJson, indent=1)
print(str(tarJsonFormat))
print("*" * 157)
if not tarJson:
print()
print("No relevant output data!")
print("*" * 157)
else:
print()
print("Relevant output data comes!")
print("*" * 157)
var2 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
print("End Time: ", str(var2))
print("=" * 157)
print("End of distinct function!")
print("=" * 157)
print("2. Checking nvl functionality!")
srcJson_1 = [
{"FirstName": "Satyaki", "LastName": "", "Sal": 1000},
{"FirstName": "Archi", "LastName": "Bose", "Sal": 500},
{"FirstName": "Deb", "LastName": "", "Sal": 9500}
]
var3 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
print("Start Time: ", str(var3))
strDef = 'FNU'
print("Default Value: ", strDef)
srcColName = 'LastName'
print('Candidate Column for NVL: ', srcColName)
# Invoking the nvl function
tarJson_1 = t.nvl(srcJson_1, srcColName, strDef)
print("*" * 157)
print("Output Data: ")
tarJsonFormat_1 = json.dumps(tarJson_1, indent=1)
print(str(tarJsonFormat_1))
print("*" * 157)
if not tarJson_1:
print()
print("No relevant output data!")
print("*" * 157)
else:
print()
print("Relevant output data comes!")
print("*" * 157)
var4 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
print("End Time: ", str(var4))
print("=" * 157)
print("End of nvl function!")
print("=" * 157)
print("3. Checking partition-by functionality!")
srcJson_2 = [
{"FirstName": "Satyaki", "LastName": "", "Sal": 1000},
{"FirstName": "Satyaki", "LastName": "", "Sal": 700},
{"FirstName": "Archi", "LastName": "Bose", "Sal": 500},
{"FirstName": "Deb", "LastName": "", "Sal": 9500},
{"FirstName": "Archi", "LastName": "Bose", "Sal": 4500},
]
var5 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
print("Start Time: ", str(var5))
GrList = ['FirstName', 'LastName']
print("Partition By Columns::: ", str(GrList))
grOperation = 'Max'
print('Operation to be performed: ', grOperation)
strCandidateColumnName = 'Sal'
print('Column Name on which the aggregate function will take place: ', strCandidateColumnName)
# Invoking the partition by function - MAX
tarJson_1 = t.partitionBy(srcJson_2, GrList, grOperation, strCandidateColumnName)
print("*" * 157)
print("Output Data: ")
tarJsonFormat_1 = json.dumps(tarJson_1, indent=1)
print(str(tarJsonFormat_1))
print("*" * 157)
if not tarJson_1:
print()
print("No relevant output data!")
print("*" * 157)
else:
print()
print("Relevant output data comes!")
print("*" * 157)
var6 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
print("End Time: ", str(var6))
var7 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
print("Start Time: ", str(var7))
grOperation_1 = 'Min'
print('Operation to be performed: ', grOperation_1)
# Invoking the Partition By function - MIN
tarJson_2 = t.partitionBy(srcJson_2, GrList, grOperation_1, strCandidateColumnName)
print("*" * 157)
print("Output Data: ")
tarJsonFormat_2 = json.dumps(tarJson_2, indent=1)
print(str(tarJsonFormat_2))
print("*" * 157)
if not tarJson_2:
print()
print("No relevant output data!")
print("*" * 157)
else:
print()
print("Relevant output data comes!")
print("*" * 157)
var8 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
print("End Time: ", str(var8))
var9 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
print("Start Time: ", str(var9))
grOperation_2 = 'Avg'
print('Operation to be performed: ', grOperation_2)
# Invoking the Partition By function - Avg
tarJson_3 = t.partitionBy(srcJson_2, GrList, grOperation_2, strCandidateColumnName)
print("*" * 157)
print("Output Data: ")
tarJsonFormat_3 = json.dumps(tarJson_3, indent=1)
print(str(tarJsonFormat_3))
print("*" * 157)
if not tarJson_3:
print()
print("No relevant output data!")
print("*" * 157)
else:
print()
print("Relevant output data comes!")
print("*" * 157)
var10 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
print("End Time: ", str(var10))
var11 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
print("Start Time: ", str(var11))
grOperation_3 = 'Sum'
print('Operation to be performed: ', grOperation_3)
# Invoking the Partition By function - Sum
tarJson_4 = t.partitionBy(srcJson_2, GrList, grOperation_3, strCandidateColumnName)
print("*" * 157)
print("Output Data: ")
tarJsonFormat_4 = json.dumps(tarJson_4, indent=1)
print(str(tarJsonFormat_4))
print("*" * 157)
if not tarJson_4:
print()
print("No relevant output data!")
print("*" * 157)
else:
print()
print("Relevant output data comes!")
print("*" * 157)
var12 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
print("End Time: ", str(var12))
print("=" * 157)
print("End of partition function!")
print("=" * 157)
print("4. Checking regular expression functionality!")
print()
var13 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
print("Start Time: ", str(var13))
print('::Function Regex_Like:: ')
print()
tarColumn = 'FirstName'
print('Target Column for Rexex_Like: ', tarColumn)
inpPattern = r"\bSa"
print('Input Pattern: ', str(inpPattern))
# Invoking the regex_like function
tarJson = t.regex_like(srcJson, tarColumn, inpPattern)
print('End of Function Regex_Like!')
print()
print("*" * 157)
print("Output Data: ")
tarJsonFormat = json.dumps(tarJson, indent=1)
print(str(tarJsonFormat))
print("*" * 157)
if not tarJson:
print()
print("No relevant output data!")
print("*" * 157)
else:
print()
print("Relevant output data comes!")
print("*" * 157)
var14 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
print("End Time: ", str(var14))
var15 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
print("Start Time: ", str(var15))
print('::Function Regex_Replace:: ')
print()
tarColumn = 'FirstName'
print('Target Column for Rexex_Replace: ', tarColumn)
inpPattern = r"\bSa"
print('Input Pattern: ', str(inpPattern))
replaceString = 'Ka'
print('Replacing Character: ', replaceString)
# Invoking the regex_replace function
tarJson = t.regex_replace(srcJson, tarColumn, inpPattern, replaceString)
print('End of Function Rexex_Replace!')
print()
print("*" * 157)
print("Output Data: ")
tarJsonFormat = json.dumps(tarJson, indent=1)
print(str(tarJsonFormat))
print("*" * 157)
if not tarJson:
print()
print("No relevant output data!")
print("*" * 157)
else:
print()
print("Relevant output data comes!")
print("*" * 157)
var16 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
print("End Time: ", str(var16))
var17 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
print("Start Time: ", str(var17))
print('::Function Regex_Substr:: ')
print()
tarColumn = 'FirstName'
print('Target Column for Regex_Substr: ', tarColumn)
inpPattern = r"\bSa"
print('Input Pattern: ', str(inpPattern))
# Invoking the regex_substr function
tarJson = t.regex_substr(srcJson, tarColumn, inpPattern)
print('End of Function Regex_Substr!')
print()
print("*" * 157)
print("Output Data: ")
tarJsonFormat = json.dumps(tarJson, indent=1)
print(str(tarJsonFormat))
print("*" * 157)
if not tarJson:
print()
print("No relevant output data!")
print("*" * 157)
else:
print()
print("Relevant output data comes!")
print("*" * 157)
var18 = dt.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
print("End Time: ", str(var18))
print("=" * 157)
print("End of regular expression function!")
print("=" * 157)
except ValueError:
print("No relevant data to proceed!")
except Exception as e:
print("Top level Error: args:{0}, message{1}".format(e.args, e.message))
if __name__ == "__main__":
main()
Let’s explain the key lines –
As of now, the source payload that it will support is mostly simple JSON.
As you can see, we’ve started with a relatively simple JSON containing an array of elements.
# Initializing the class
t = clsDnpr()
In this line, you can initiate the main library.
Let’s explore the different functions, which you can use on JSON.
1. Distinct:
Let’s discuss the distinct function on JSON. This function can be extremely useful if you use a NoSQL database that doesn’t offer any distinct feature, or if you are dealing with a source that may contain duplicate JSON entries.
Let’s check our sample payload for distinct –
Here is the basic syntax & argument that it is expecting –
distinct(Input Json) return Output Json
So, all you have to ensure is that you are passing a JSON input string.
As per our example –
# Invoking the distinct function
tarJson = t.distinct(srcJson)
And, here is the output –
If you compare this with the source JSON, you would notice that the two identical entries with the name “Satyaki” are now reduced to one unique entry.
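Since the output screenshot isn’t reproduced here, the result should look roughly like this (the duplicate “Satyaki” entry from the input collapses into one):
[
 {
  "FirstName": "Satyaki",
  "LastName": "De",
  "Sal": 1000
 },
 {
  "FirstName": "Archi",
  "LastName": "Bose",
  "Sal": 500
 },
 {
  "FirstName": "Archi",
  "LastName": "Bose",
  "Sal": 7000
 },
 {
  "FirstName": "Deb",
  "LastName": "Sen",
  "Sal": 9500
 }
]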
Limitation: Currently, this will support only basic JSON. However, I’m working on supporting much more complex, hierarchical JSON in the coming days.
2. NVL:
NVL is another feature that I believe a format like JSON should have. So, I built this part of the library to specifically handle the NULL data scenario, where the developer may want to pass a default value in place of NULL.
Hence, the implementation of this function.
Here is the sample payload for this –
In this case, there may be some business logic that requires null values of LastName to be replaced with a default value, say FNU. This function will help you implement that logic.
Here is the basic syntax & argument that it is expecting –
nvl(
Input Json,
Prospective Null Column Name,
Default Value in case of Null
) return Output Json
# Invoking the nvl function
tarJson_1 = t.nvl(srcJson_1, srcColName, strDef)
So, in the above lines, this code will replace the Null value with “FNU” for the column LastName.
And, here is the output –
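The screenshot isn’t reproduced here, but given srcJson_1 above (and assuming empty strings are treated as nulls), the output should look roughly like this:
[
 {
  "FirstName": "Satyaki",
  "LastName": "FNU",
  "Sal": 1000
 },
 {
  "FirstName": "Archi",
  "LastName": "Bose",
  "Sal": 500
 },
 {
  "FirstName": "Deb",
  "LastName": "FNU",
  "Sal": 9500
 }
]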
3. Partition_By:
I personally like this function as it gives more power to manipulate JSON-level data with aggregates such as Avg, Min, Max or Sum. This might be very useful for implementing some basic aggregation on the fly.
Here is the basic syntax & argument that it is expecting –
partition_by(
Input Json,
Group By Column List,
Group By Operation,
Candidate Column Name,
Output Column Name
) return Output Json
Now, let’s explore the sample payload that we’ll use to test all these operations –
Case 1:
In this case, we’ll calculate the maximum salary against FirstName & LastName. However, I want to print the Location in my final JSON output.
So, let’s look at the sample data in tabular form for better understanding –
So, as per our business logic, our MAX aggregate would operate only on FirstName & LastName. Hence, the calculation will process accordingly.
In that case, the output will look something like –
As you can see from the above picture, two things happen. It removes any duplicate entries – in this case, Satyaki has two identical rows, so one is removed. However, as part of the partition-by clause, it keeps two entries for Archi, as the location is different. Deb appears once, as expected.
Let’s run our application & find out the output –
So, we meet our expectation.
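If you want to cross-check Case 1 outside the library, here is a hedged pandas equivalent of the Max partition, using the srcJson_2 payload from the calling script (the screenshots in this post use a slightly richer payload that also carries a Location element):
import pandas as pd

srcJson_2 = [
    {"FirstName": "Satyaki", "LastName": "", "Sal": 1000},
    {"FirstName": "Satyaki", "LastName": "", "Sal": 700},
    {"FirstName": "Archi", "LastName": "Bose", "Sal": 500},
    {"FirstName": "Deb", "LastName": "", "Sal": 9500},
    {"FirstName": "Archi", "LastName": "Bose", "Sal": 4500},
]

# Equivalent of partitionBy(srcJson_2, ['FirstName', 'LastName'], 'Max', 'Sal'):
# one row per (FirstName, LastName) group carrying the maximum Sal
df = pd.DataFrame(srcJson_2)
df_max = df.groupby(['FirstName', 'LastName'], as_index=False)['Sal'].max()
print(df_max.to_dict(orient='records'))
# [{'FirstName': 'Archi', 'LastName': 'Bose', 'Sal': 4500},
#  {'FirstName': 'Deb', 'LastName': '', 'Sal': 9500},
#  {'FirstName': 'Satyaki', 'LastName': '', 'Sal': 1000}]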
Case 2:
The same logic will be applicable for Min as well.
Hence, as per the table, we should expect the output as –
And, the output of our application run is –
So, this also comes out as expected.
Case 3:
Let’s check for average –
The only thing I wanted to point out is that we have two separate entries for Satyaki. So, the average will rightfully consider the salary from both values. Hence, the average is (1000+700)/2 = 850.
Let’s run our application –
So, we’ve achieved our target.
Case 4:
Let’s check for Sum.
Now, let’s run our application –
In the next installment, we’ll be discussing the last function from this package i.e. Regular Expression in JSON.
I hope, you’ll like this presentation.
Let me know if you find any bug – I’ll look into it.
Till then – Happy Avenging!
Note: All the data posted here are representational data & available over the internet & for educational purpose only.
Please find the link to the PyPI package of this new, enhanced JSON library for Python. It is particularly useful as I’ve accommodated the following features in it.
distinct
nvl
partition_by
regex_like
regex_replace
regex_substr
All these functions can be used over a JSON payload through Python. I’ll discuss this in detail in my next blog post.
However, I would suggest this library as it will be handy for NoSQL databases like Cosmos DB. Now, you can quickly implement many of these features, such as distinct, partitioning & regular expressions, with less effort.
Today, we’ll be discussing a preview feature from Microsoft Azure: building an Azure function using Python on a Linux/Ubuntu VM. Since this is a preview feature, we cannot implement it in production yet. However, my example has more detailed steps & a more complete code guide compared to what is generally available over the internet.
In this post, I will take one of my old posts & enhance it for this scenario. Hence, I’ll post those modified scripts. However, I won’t discuss the logic in detail as most of these scripts have only cosmetic changes to cater to this requirement.
In this post, we’ll only show the Ubuntu run; there won’t be a Windows or Mac comparison.
Initial Environment Preparation:
Set-up new virtual machine on Azure.
Set-up Azure function environments on that server.
Set-up new virtual machine on Azure:
I’m not going into the details of how to create an Ubuntu VM on Microsoft Azure. You can refer to the steps for more information here.
After successful creation, the VM will look like this –
You can get detailed information after clicking the hyperlink over the name of the VM.
You have to open port 7071 for application testing from your local machine using Postman.
You can get it from the network option under VM as follows –
Make sure that you restrict these ports to a specific network & do not open them to ALL traffic.
So, your VM is ready now.
To update Azure CLI, you need to use the following commands –
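The exact commands didn’t make it into the text, so here is a hedged version of the usual approach on Ubuntu (the first line installs the CLI via Microsoft’s official script; the second upgrades an existing copy through apt):
curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash
sudo apt-get update && sudo apt-get install --only-upgrade -y azure-cli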
Set-up Azure function environments on that server:
To set up the environment, you don’t have to install Python, as by default Ubuntu in Microsoft Azure comes with the desired Python version, i.e., Python 3.6. However, to run the Python application, you need to install the following components –
Microsoft SDK. You will get the details from this link.
Installing node-js. You will get the details from this link.
You need to install Docker. As per Microsoft’s official documentation, this is not strictly required. However, you can create a Docker container to distribute the Python function as an Azure application, so I would say install it in case you want to continue with this approach. You will get the details over here. If you want to know more about Docker & how to integrate a Python application with it, you can refer to this link.
Creating an Azure function template on Ubuntu. You’ll get the essential details from here. However, that post doesn’t show the detailed steps for the Python packages & how to add all the dependencies needed to publish the function. Still, it is an excellent post to start building your knowledge.
Let’s see the status of these components & some very brief details –
Microsoft SDK:
To check the .NET version, you need to type the following command in Ubuntu –
dotnet --info
And, the output will look like this –
Node-Js:
Following is the way to verify your node-js version & details –
node -v
npm -v
And, the output looks like this –
Docker:
Following is the way to test your docker version –
docker -v
And, the output will look like this –
Python Packages:
Following are the Python packages that we need in order to run & publish this in the Azure cloud as an Azure function –
You must be wondering why I have used the grep command here. I’ve witnessed on many occasions that Microsoft Azure’s Linux VM produces one broken package entry called resource==0.0.0, which terminates the deployment process. Hence, it is crucial to eliminate that broken package, as shown in the hedged command below.
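A hedged reconstruction of that clean-up, assuming you generate requirements.txt from the VM’s active Python environment:
pip freeze | grep -v "resource==0.0.0" > requirements.txt
cat requirements.txt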
Now, we’re ready for our python scripts. But, before that, let’s see the directory structure over here –
Creating an Azure Function Template on Ubuntu:
Before we post our Python scripts, we’ll create the following components, which are essential for our Python-based Azure function –
Creating a group:
You can create a resource group either through the Azure CLI or using Docker. The command for the Azure CLI is as follows –
az group create --name "rndWestUSGrp" --location westus
I’m sure you don’t want to face that again. And, here is the output –
Note that the double-quotes are optional here. But, to avoid any unforeseen issues, you should use them. You can refer to the docker command from the above link, which I’ve shared earlier.
Now, you need to create one storage account where the metadata information of your function will be stored. You will create that as follows –
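The screenshot with the command isn’t reproduced here; a hedged equivalent, reusing the resource group from above & the storage account name used later in this post, would be:
az storage account create --name cryptpy2019 --resource-group rndWestUSGrp \
--location westus --sku Standard_LRS
After this, running func init inside your project folder generates the project scaffolding, which is presumably where the two files mentioned next come from.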
And, the final content of these two files (excluding the requirements.txt) will look like this –
Finally, we’ll create the template function by this following command –
func new
This will be followed by a few prompts to finish it. You need to choose Python as your programming language & an HTTP trigger template. Once you have created it successfully, you’ll see the following files –
Note that, our initial function name is -> getVal.
By default, Azure will generate some default code inside the __init__.py. The details of those two files can be found here.
Since we’re ready with our environment setup, we can now discuss our Python scripts –
1. clsConfigServer.py (This script contains all the parameters of the server.)
2. clsEnDec.py (This script is a lighter version of the encryption & decryption logic from our previously discussed scenario. Hence, we won’t discuss it in detail. You can refer to my earlier post to understand the logic of this script.)
###########################################
#### Written By: SATYAKI DE ########
#### Written On: 25-Jan-2019 ########
#### Package Cryptography needs to ########
#### install in order to run this ########
#### script. ########
#### ########
#### Objective: This script will ########
#### encrypt/decrypt based on the ########
#### hidden supplied salt value. ########
###########################################
from cryptography.fernet import Fernet
import logging
from getVal.clsConfigServer import clsConfigServer as csf
class clsEnDec(object):
def __init__(self):
# Calculating Key
self.token = str(csf.config['DEF_SALT'])
def encrypt_str(self, data, token):
try:
# Capturing the Salt Information
t1 = self.token
t2 = token
if t2 == '':
salt = t1
else:
salt = t2
logging.info("Encrypting the value!")
# Checking Individual Types inside the Dataframe
cipher = Fernet(salt)
encr_val = str(cipher.encrypt(bytes(data,'utf8'))).replace("b'","").replace("'","")
strV1 = "Encrypted value:: " + str(encr_val)
logging.info(strV1)
return encr_val
except Exception as e:
x = str(e)
print(x)
encr_val = ''
return encr_val
def decrypt_str(self, data, token):
try:
# Capturing the Salt Information
t1 = self.token
t2 = token
if t2 == '':
salt = t1
else:
salt = t2
logging.info("Decrypting the value!")
# Checking Individual Types inside the Dataframe
cipher = Fernet(salt)
decr_val = str(cipher.decrypt(bytes(data,'utf8'))).replace("b'","").replace("'","")
strV2 = "Decrypted value:: " + str(decr_val)
logging.info(strV2)
return decr_val
except Exception as e:
x = str(e)
print(x)
decr_val = ''
return decr_val
3. clsFlask.py (This is the main server script that invokes the encrypt/decrypt class from our previous scenario. It captures the requested JSON posted by clients such as another Python script or third-party tools like Postman.)
###########################################
#### Written By: SATYAKI DE ####
#### Written On: 25-Jan-2019 ####
#### Package Flask package needs to ####
#### install in order to run this ####
#### script. ####
#### ####
#### Objective: This script will ####
#### encrypt/decrypt based on the ####
#### supplied salt value. Also, ####
#### this will capture the individual ####
#### element & stored them into JSON ####
#### variables using flask framework. ####
###########################################
from getVal.clsConfigServer import clsConfigServer as csf
from getVal.clsEnDec import clsEnDec
getVal = clsEnDec()
import logging
class clsFlask(object):
def __init__(self):
self.xtoken = str(csf.config['DEF_SALT'])
def getEncryptProcess(self, dGroup, input_data, dTemplate):
try:
# It is sending default salt value
xtoken = self.xtoken
# Capturing the individual element
dGroup = dGroup
input_data = input_data
dTemplate = dTemplate
# This will check the mandatory json elements
if ((dGroup != '') & (dTemplate != '')):
# Based on the Group & Element it will fetch the salt
# Based on the specific salt it will encrypt the data
if ((dGroup == 'GrDet') & (dTemplate == 'subGrAcct_Nbr')):
xtoken = str(csf.config['ACCT_NBR_SALT'])
strV1 = "xtoken: " + str(xtoken)
logging.info(strV1)
strV2 = "Flask Input Data: " + str(input_data)
logging.info(strV2)
#x = cen.clsEnDecAuth()
ret_val = getVal.encrypt_str(input_data, xtoken)
elif ((dGroup == 'GrDet') & (dTemplate == 'subGrName')):
xtoken = str(csf.config['NAME_SALT'])
strV1 = "xtoken: " + str(xtoken)
logging.info(strV1)
strV2 = "Flask Input Data: " + str(input_data)
logging.info(strV2)
#x = cen.clsEnDecAuth()
ret_val = getVal.encrypt_str(input_data, xtoken)
elif ((dGroup == 'GrDet') & (dTemplate == 'subGrPhone')):
xtoken = str(csf.config['PHONE_SALT'])
strV1 = "xtoken: " + str(xtoken)
logging.info(strV1)
strV2 = "Flask Input Data: " + str(input_data)
logging.info(strV2)
#x = cen.clsEnDecAuth()
ret_val = getVal.encrypt_str(input_data, xtoken)
elif ((dGroup == 'GrDet') & (dTemplate == 'subGrEmail')):
xtoken = str(csf.config['EMAIL_SALT'])
strV1 = "xtoken: " + str(xtoken)
logging.info(strV1)
strV2 = "Flask Input Data: " + str(input_data)
logging.info(strV2)
#x = cen.clsEnDecAuth()
ret_val = getVal.encrypt_str(input_data, xtoken)
else:
ret_val = ''
else:
ret_val = ''
# Return value
return ret_val
except Exception as e:
ret_val = ''
# Return the valid json Error Response
return ret_val
def getDecryptProcess(self, dGroup, input_data, dTemplate):
try:
xtoken = self.xtoken
# Capturing the individual element
dGroup = dGroup
input_data = input_data
dTemplate = dTemplate
# This will check the mandatory json elements
if ((dGroup != '') & (dTemplate != '')):
# Based on the Group & Element it will fetch the salt
# Based on the specific salt it will decrypt the data
if ((dGroup == 'GrDet') & (dTemplate == 'subGrAcct_Nbr')):
xtoken = str(csf.config['ACCT_NBR_SALT'])
strV1 = "xtoken: " + str(xtoken)
logging.info(strV1)
strV2 = "Flask Input Data: " + str(input_data)
logging.info(strV2)
#x = cen.clsEnDecAuth()
ret_val = getVal.decrypt_str(input_data, xtoken)
elif ((dGroup == 'GrDet') & (dTemplate == 'subGrName')):
xtoken = str(csf.config['NAME_SALT'])
strV1 = "xtoken: " + str(xtoken)
logging.info(strV1)
strV2 = "Flask Input Data: " + str(input_data)
logging.info(strV2)
#x = cen.clsEnDecAuth()
ret_val = getVal.decrypt_str(input_data, xtoken)
elif ((dGroup == 'GrDet') & (dTemplate == 'subGrPhone')):
xtoken = str(csf.config['PHONE_SALT'])
strV1 = "xtoken: " + str(xtoken)
logging.info(strV1)
strV2 = "Flask Input Data: " + str(input_data)
logging.info(strV2)
#x = cen.clsEnDecAuth()
ret_val = getVal.decrypt_str(input_data, xtoken)
elif ((dGroup == 'GrDet') & (dTemplate == 'subGrEmail')):
xtoken = str(csf.config['EMAIL_SALT'])
strV1 = "xtoken: " + str(xtoken)
logging.info(strV1)
strV2 = "Flask Input Data: " + str(input_data)
logging.info(strV2)
#x = cen.clsEnDecAuth()
ret_val = getVal.decrypt_str(input_data, xtoken)
else:
ret_val = ''
else:
ret_val = ''
# Return value
return ret_val
except Exception as e:
ret_val = ''
# Return the valid Error Response
return ret_val
4. __init__.py (This autogenerated script contains the primary calling methods of encryption & decryption based on the element header & values after enhanced as per the functionality.)
###########################################
#### Written By: SATYAKI DE ####
#### Written On: 08-Jun-2019 ####
#### Package Flask package needs to ####
#### install in order to run this ####
#### script. ####
#### ####
#### Objective: Main Calling scripts. ####
#### This is an autogenerated script. ####
#### However, to meet the functionality ####
#### we've enhanced as per our logic. ####
###########################################
__all__ = ['clsFlask']
import logging
import azure.functions as func
import json
from getVal.clsFlask import clsFlask
getVal = clsFlask()
def main(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Python Encryption function processed a request.')
str_val = 'Input Payload:: ' + str(req.get_json())
str_1 = str(req.get_json())
logging.info(str_val)
ret_val = {}
DataIn = ''
dGroup = ''
dTemplate = ''
flg = ''
if (str_1 != ''):
try:
req_body = req.get_json()
dGroup = req_body.get('dataGroup')
try:
DataIn = req_body.get('data')
strV15 = 'If Part:: ' + str(DataIn)
logging.info(strV15)
if ((DataIn == '') | (DataIn == None)):
raise ValueError
flg = 'Y'
except ValueError:
DataIn = req_body.get('edata')
strV15 = 'Else Part:: ' + str(DataIn)
logging.info(strV15)
flg = 'N'
except:
DataIn = req_body.get('edata')
strV15 = 'Else Part:: ' + str(DataIn)
logging.info(strV15)
flg = 'N'
dTemplate = req_body.get('dataTemplate')
except ValueError:
pass
strV5 = "Encrypt Decrypt Flag:: " + flg
logging.info(strV5)
if (flg == 'Y'):
if ((DataIn != '') & ((dGroup != '') & (dTemplate != ''))):
logging.info("Encryption Started!")
ret_val = getVal.getEncryptProcess(dGroup, DataIn, dTemplate)
strVal2 = 'Return Payload:: ' + str(ret_val)
logging.info(strVal2)
xval = json.dumps(ret_val)
return func.HttpResponse(xval)
else:
return func.HttpResponse(
"Please pass a data in the request body",
status_code=400
)
else:
if ((DataIn != '') & ((dGroup != '') & (dTemplate != ''))):
logging.info("Decryption Started!")
ret_val2 = getVal.getDecryptProcess(dGroup, DataIn, dTemplate)
strVal3 = 'Return Payload:: ' + str(ret_val2)
logging.info(strVal3)
xval1 = json.dumps(ret_val2)
return func.HttpResponse(xval1)
else:
return func.HttpResponse(
"Please pass a data in the request body",
status_code=400
)
In this script, based on the value of the flg variable, we’re calling our encryption or decryption methods. And, the value of the flg variable is set based on the following logic –
So, if the application receives the “data” element, it considers that the data needs to be encrypted; otherwise (when it receives “edata”), it goes for decryption. And, based on that, it sets the flag. Hedged sample payloads are shown below.
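For clarity, here are two hedged sample request bodies (the element names come from the __init__.py above; the values are representative only):
Encrypt request (flg becomes 'Y' because "data" is present):
{"dataGroup": "GrDet", "dataTemplate": "subGrAcct_Nbr", "data": "1234567890"}
Decrypt request (flg becomes 'N' because "edata" is supplied instead):
{"dataGroup": "GrDet", "dataTemplate": "subGrAcct_Nbr", "edata": "gAAAAABf...<encrypted token>..."}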
Now, we’re ready to locally run our application –
func host start
And, the output will look like this –
Let’s test it from postman –
Encrypt:
Decrypt:
Great. Now, we’re ready to publish this application to Azure cloud.
As in our earlier steps, we’ve already built our storage account for the metadata. Please scroll to top to view that again. Now, using that information, we’ll make the function app with a more meaningful name –
az functionapp create --resource-group rndWestUSGrp --os-type Linux \
--consumption-plan-location westus --runtime python \
--name getEncryptDecrypt --storage-account cryptpy2019
On many occasions, publishing without the “--build-native-deps” flag leads to failure. Hence, I’ve added it to avoid such scenarios, as in the hedged publish command below.
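So the publish step, with the flag added, would look roughly like this (hedged; getEncryptDecrypt is the function app created above):
func azure functionapp publish getEncryptDecrypt --build-native-deps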
Now, we need to test our first published complex Azure function with Python through postman –
Encrypt:
Decrypt:
Wonderful! So, it is working.
You can see the function under the Azure portal –
Let’s see some other important features of this function –
Monitor: You can monitor in two ways. One is by clicking the Monitor option, which gives you individual request-level details & also lets you see the log information here –
Clicking Application Insights will give you another level of detailed logs, which can be very useful for debugging. We’ll touch this at the end of this post with a very brief discussion.
As you can see, clicking individual lines will show the details further.
Let’s quickly check the application insights –
Application Insights will give you a SQL-like interface where you can get the log details of all your requests.
You can expand the individual details for further information.
You can change the parameter name & other details & click the run button to get all the log details for your debugging purpose.
So, finally, we’ve achieved our goal. This is a relatively long post, but I’m sure it will help you create your first Python-based function on the Azure platform.
Hope you will like this approach. Let me know your comments on the same.
I’ll bring some more exciting topics from the Python verse in the coming days.
Till then, Happy Avenging! 😀
Note: All the data posted here are representational data & available over the internet.
Here is the latest installment from the Python verse. For the first time, we’ll be dealing with Python on the Azure cloud, with help from pandas & json.
Why post on this topic?
I always try to post something based on use cases that might be useful in real-life scenarios. On top of that, I really don’t find many significant posts on Azure dealing with Python. So, I thought of sharing some initial use cases, which will encourage others to join this club & use more Python-based applications on the Azure platform.
First, let us check the complexity of today’s post & our objective.
What is the objective?
Today, our objective is to load a couple of JSON payloads, store them into multiple Cosmos containers & finally fetch the data from Cosmos DB & store the output into our log files, apart from printing the same on the terminal screen.
Before we start discussing our post, let us explain some basic terminology of Azure Cosmos DB, so that next time, whenever we refer to it, it will be easier for you to understand.
Learning basic Azure terminology.
Since this is an unstructured DB, all the data will be stored in this following fashion –
Azure Cosmos DB -> Container -> Items
Let’s simplify this in words. Each Azure Cosmos DB may have multiple containers, which you can compare with the tables of any conventional RDBMS. And, under containers, you will have multiple items, which represent the rows of an RDBMS table. The only difference is that each item may have a different number of elements, which are equivalent to the columns in traditional RDBMS tables; a traditional table always has a fixed number of columns.
Input Payload:
Let’s review three different payloads, which we’ll be loading into three separate containers.
srcEmail.json
As you can see in the items, the first sub-row has 3 elements, whereas the second one has 4. In a traditional RDBMS, a table would always have the same number of columns. A hedged reconstruction of this payload is sketched below.
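Since the screenshot isn’t reproduced here, a hedged reconstruction of one such item, based on the elements referenced by SQL_QRY_1 in the config script later in this post (subscriberId, sender, orderNo, orderDate, items.orderQty), could look like this; extra elements such as orderComment are my own assumptions to illustrate the flexible schema:
{
  "subscriberId": "M00101",
  "sender": "customer-care@xyz.com",
  "orderNo": 2001,
  "orderDate": "20-May-2019",
  "items": {
    "orderQty": 2,
    "orderComment": "Great product!"
  }
}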
srcTwitter.json
srcHR.json
So, from the above three sample payloads, our application will try to consolidate users’ feedback in a single place for better product forecasts.
Azure Portal:
Let’s look into the Azure portal & identify a couple of crucial pieces of information, which the Python scripts will need for authentication. But, before that, I’ll show how to get those details step by step –
As shown highlighted in Red, click the Azure Cosmos DB. You will find the following screen –
If you click this, you will find all the collections/containers that are part of the same DB as follows –
After that, we’ll extract the Cosmos Key & the Endpoint/URI from the portal. Without these, the Python application won’t be able to interact with Azure. This is sensitive information, so I’ll be providing some dummy details here just to show how to extract it. Never share these details with anyone outside of your project or group.
Good. Now, we’re ready for python scripts.
Python Scripts:
In this installment, we’ll be reusing the following Python script, which is already discussed in my earlier post –
clsL.py
So, I’m not going to discuss these scripts.
Before we discuss our scripts, let’s look at the directory structure –
1. clsConfig.py (This script is the configuration file that contains all the keys & paths for Azure Cosmos DB. The application will process this information & perform various CRUD operations on Cosmos DB. Hence, the name comes into the picture.)
##############################################
#### Written By: SATYAKI DE ####
#### Written On: 25-May-2019 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### azure cosmos db. Application will ####
#### process these information & perform ####
#### various CRUD operation on Cosmos DB. ####
##############################################
import os
import platform as pl
class clsConfig(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
db_name = 'rnd-de01-usw2-vfa-cdb'
db_link = 'dbs/' + db_name
CONTAINER1 = "RealtimeEmail"
CONTAINER2 = "RealtimeTwitterFeedback"
CONTAINER3 = "RealtimeHR"
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
config = {
'EMAIL_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcEmail.json',
'TWITTER_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcTwitter.json',
'HR_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcHR.json',
'COSMOSDB_ENDPOINT': 'https://rnd-de01-usw2-vfa-cdb.documents.azure.com:443/',
'COSMOS_PRIMARYKEY': "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXIsI00AxKXXXXXgg==",
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'COSMOSDB': db_name,
'COSMOS_CONTAINER1': CONTAINER1,
'COSMOS_CONTAINER2': CONTAINER2,
'COSMOS_CONTAINER3': CONTAINER3,
'CONFIG_ORIG': 'Config_orig.csv',
'ENCRYPT_CSV': 'Encrypt_Config.csv',
'DECRYPT_CSV': 'Decrypt_Config.csv',
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'REPORT_PATH': Curr_Path + sep + 'report',
'APP_DESC_1': 'Feedback Communication',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'SQL_QRY_1': "SELECT c.subscriberId, c.sender, c.orderNo, c.orderDate, c.items.orderQty FROM RealtimeEmail c",
'SQL_QRY_2': "SELECT c.twitterId, c.Twit, c.DateCreated, c.Country FROM RealtimeTwitterFeedback c WHERE c.twitterId=@CrVal",
'DB_QRY': "SELECT * FROM c",
'COLLECTION_QRY': "SELECT * FROM r",
'database_link': db_link,
'collection_link_1': db_link + '/colls/' + CONTAINER1,
'collection_link_2': db_link + '/colls/' + CONTAINER2,
'collection_link_3': db_link + '/colls/' + CONTAINER3,
'options': {
'offerThroughput': 1000,
'enableCrossPartitionQuery': True,
'maxItemCount': 2
}
}
2. clsCosmosDBDet.py (This script will test the connection with Azure Cosmos DB from the Python application. If it is successful, it will fetch the details of all the collections/containers that reside under the same DB. Hence, the name comes into the picture.)
##############################################
#### Written By: SATYAKI DE ####
#### Written On: 25-May-2019 ####
#### ####
#### Objective: This script will check & ####
#### test the connection with the Cosmos ####
#### & it will fetch all the collection ####
#### names residing under the same DB. ####
##############################################
import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.errors as errors
from clsConfig import clsConfig as cf
class IDisposable(cosmos_client.CosmosClient):
def __init__(self, obj):
self.obj = obj
def __enter__(self):
return self.obj
def __exit__(self, exception_type, exception_val, trace):
self = None
class clsCosmosDBDet:
def __init__(self):
self.endpoint = cf.config['COSMOSDB_ENDPOINT']
self.primarykey = cf.config['COSMOS_PRIMARYKEY']
self.db = cf.config['COSMOSDB']
self.cont_1 = cf.config['COSMOS_CONTAINER1']
self.cont_2 = cf.config['COSMOS_CONTAINER2']
self.cont_3 = cf.config['COSMOS_CONTAINER3']
self.database_link = cf.config['database_link']
self.collection_link_1 = cf.config['collection_link_1']
self.collection_link_2 = cf.config['collection_link_2']
self.collection_link_3 = cf.config['collection_link_3']
self.options = cf.config['options']
self.db_qry = cf.config['DB_QRY']
self.collection_qry = cf.config['COLLECTION_QRY']
def list_Containers(self, client):
try:
database_link = self.database_link
collection_qry = self.collection_qry
print("1. Query for collection!")
print()
collections = list(client.QueryContainers(database_link, {"query": collection_qry}))
if not collections:
return
for collection in collections:
print(collection['id'])
print()
except errors.HTTPFailure as e:
if e.status_code == 404:
print("*" * 157)
print('A collection with id \'{0}\' does not exist'.format(id))
print("*" * 157)
else:
raise errors.HTTPFailure(e.status_code)
def test_db_con(self):
endpoint = self.endpoint
primarykey = self.primarykey
options_1 = self.options
db_qry = self.db_qry
with IDisposable(cosmos_client.CosmosClient(url_connection=endpoint, auth={'masterKey': primarykey})) as client:
try:
try:
options = {}
query = {"query": db_qry}
options = options_1
print("-" * 157)
print('Options:: ', options)
print()
print("Database details:: ")
result_iterable = client.QueryDatabases(query, options)
for item in iter(result_iterable):
print(item)
print("-" * 157)
except errors.HTTPFailure as e:
if e.status_code == 409:
pass
else:
raise errors.HTTPFailure(e.status_code)
self.list_Containers(client)
return 0
except errors.HTTPFailure as e:
print("Application has caught an error. {0}".format(e.message))
return 1
finally:
print("Application successfully completed!")
Key lines from the above scripts are –
with IDisposable(cosmos_client.CosmosClient(url_connection=endpoint, auth={'masterKey': primarykey})) as client:
In this step, the python application is building the connection object.
# Refer to the entry in our config file
self.db_qry = cf.config['DB_QRY']
...
query = {"query": db_qry}
options = options_1
Based on the supplied value from our configuration python script, this will extract the cosmos DB information.
self.list_Containers(client)
This is a function that will identify all the collection under this DB.
def list_Containers(self, client):
    ...
    collections = list(client.QueryContainers(database_link, {"query": collection_qry}))
    if not collections:
        return
    for collection in collections:
        print(collection['id'])
In these above lines, our application will actually fetch the containers that are associated with this DB.
3. clsColMgmt.py (This script has multiple features. You can create new items in Azure Cosmos DB & also retrieve data from Cosmos for viewing, either based on specific filters or the entire dataset. Hence, the name comes into the picture.)
################################################
#### Written By: SATYAKI DE ####
#### Written On: 25-May-2019 ####
#### ####
#### Objective: This scripts has multiple ####
#### features. You can create new items ####
#### in azure cosmos db. Apart from that ####
#### you can retrieve data from Cosmos just ####
#### for viewing purpose. You can display ####
#### data based on specific filters or the ####
#### entire dataset. Hence, three different ####
#### methods provided here to support this. ####
################################################
import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.errors as errors
import pandas as p
import json
from clsConfig import clsConfig as cf
class IDisposable(cosmos_client.CosmosClient):
def __init__(self, obj):
self.obj = obj
def __enter__(self):
return self.obj
def __exit__(self, exception_type, exception_val, trace):
self = None
class clsColMgmt:
def __init__(self):
self.endpoint = cf.config['COSMOSDB_ENDPOINT']
self.primarykey = cf.config['COSMOS_PRIMARYKEY']
self.db = cf.config['COSMOSDB']
self.cont_1 = cf.config['COSMOS_CONTAINER1']
self.cont_2 = cf.config['COSMOS_CONTAINER2']
self.cont_3 = cf.config['COSMOS_CONTAINER3']
self.database_link = cf.config['database_link']
self.collection_link_1 = cf.config['collection_link_1']
self.collection_link_2 = cf.config['collection_link_2']
self.collection_link_3 = cf.config['collection_link_3']
self.options = cf.config['options']
self.db_qry = cf.config['DB_QRY']
self.collection_qry = cf.config['COLLECTION_QRY']
# Creating cosmos items in container
def CreateDocuments(self, inputJson, collection_flg = 1):
try:
# Declaring variable
endpoint = self.endpoint
primarykey = self.primarykey
print('Creating Documents')
with IDisposable(cosmos_client.CosmosClient(url_connection=endpoint, auth={'masterKey': primarykey})) as client:
try:
if collection_flg == 1:
collection_link = self.collection_link_1
elif collection_flg == 2:
collection_link = self.collection_link_2
else:
collection_link = self.collection_link_3
container = client.ReadContainer(collection_link)
# Create a SalesOrder object. This object has nested properties and various types including numbers, DateTimes and strings.
# This can be saved as JSON as is without converting into rows/columns.
print('Input Json:: ', str(inputJson))
nSon = json.dumps(inputJson)
json_rec = json.loads(nSon)
client.CreateItem(container['_self'], json_rec)
except errors.HTTPFailure as e:
print("Application has caught an error. {0}".format(e.status_code))
finally:
print("Application successfully completed!")
return 0
except Exception as e:
x = str(e)
print(x)
return 1
def CosmosDBCustomQuery_PandasCSVWithParam(self, client, collection_link, query_with_optional_parameters, message="Documents found by query: ", options_sql = {}):
try:
# Reading data by SQL & convert it ot Pandas Dataframe
results = list(client.QueryItems(collection_link, query_with_optional_parameters, options_sql))
cnt = 0
dfSrc = p.DataFrame()
dfRes = p.DataFrame()
dfSrc2 = p.DataFrame()
json_data = ''
for doc in results:
cnt += 1
dfSrc = p.io.json.json_normalize(results)
dfSrc.columns = dfSrc.columns.map(lambda x: x.split(".")[-1])
dfRes = dfSrc
print("Total records fetched: ", cnt)
print("*" * 157)
return dfRes
except errors.HTTPFailure as e:
Df_Fin = p.DataFrame()
if e.status_code == 404:
print("*" *157)
print("Document doesn't exists")
print("*" *157)
return Df_Fin
elif e.status_code == 400:
print("*" * 157)
print("Bad request exception occuered: ", e)
print("*" *157)
return Df_Fin
else:
return Df_Fin
finally:
print()
def CosmosDBCustomQuery_PandasCSV(self, client, collection_link, query_with_optional_parameters, message="Documents found by query: ", options_sql = {}):
try:
# Reading data by SQL & convert it ot Pandas Dataframe
results = list(client.QueryItems(collection_link, query_with_optional_parameters, options_sql))
cnt = 0
dfSrc = p.DataFrame()
dfRes = p.DataFrame()
dfSrc2 = p.DataFrame()
json_data = ''
for doc in results:
cnt += 1
dfSrc = p.io.json.json_normalize(results)
dfSrc.columns = dfSrc.columns.map(lambda x: x.split(".")[-1])
dfRes = dfSrc
print("Total records fetched: ", cnt)
print("*" * 157)
return dfRes
except errors.HTTPFailure as e:
Df_Fin = p.DataFrame()
if e.status_code == 404:
print("*" *157)
print("Document doesn't exists")
print("*" *157)
return Df_Fin
elif e.status_code == 400:
print("*" * 157)
print("Bad request exception occuered: ", e)
print("*" *157)
return Df_Fin
else:
return Df_Fin
finally:
print()
def fetch_data(self, sql_qry, msg="", collection_flg = 1, additional_params = 1, param_det=[]):
endpoint = self.endpoint
primarykey = self.primarykey
options_1 = self.options
with IDisposable(cosmos_client.CosmosClient(url_connection=endpoint, auth={'masterKey': primarykey})) as client:
try:
if collection_flg == 1:
collection_link = self.collection_link_1
elif collection_flg == 2:
collection_link = self.collection_link_2
else:
collection_link = self.collection_link_3
print("Additional parameters: ", additional_params)
message = msg
options = options_1
if additional_params == 1:
query = {"query": sql_qry}
df_Fin = self.CosmosDBCustomQuery_PandasCSV(client, collection_link, query, message, options)
else:
query = {"query": sql_qry, "parameters": param_det}
df_Fin = self.CosmosDBCustomQuery_PandasCSVWithParam(client, collection_link, query, message, options)
return df_Fin
except errors.HTTPFailure as e:
print("Application has caught an error. {0}".format(e.message))
finally:
print("Application successfully completed!")
Key lines from the above script –
def CosmosDBCustomQuery_PandasCSV(self, client, collection_link, query_with_optional_parameters, message="Documents found by query: ", options_sql = {}):
This method is generic. It will fetch all the records of a cosmos container.
In this step, the application fetches the data in the form of JSON, then serializes & flattens it & finally stores the result into a pandas dataframe as the return output. The function –
CosmosDBCustomQuery_PandasCSVWithParam
– is the same as the previous function, except that it can also process parameters to filter out the data.
Once the application receives the input payload, it will convert it to a valid JSON payload & then send it to the CreateItem method to insert records.
4. callCosmosAPI.py (This script is the main calling function. Hence, the name comes into the picture.)
##############################################
#### Written By: SATYAKI DE ####
#### Written On: 25-May-2019 ####
#### ####
#### Objective: Main calling scripts. ####
##############################################
import clsColMgmt as cm
import clsCosmosDBDet as cmdb
from clsConfig import clsConfig as cf
import pandas as p
import clsL as cl
import logging
import datetime
import json
# Disabling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
# Lookup functions from
# Azure cloud SQL DB
def main():
try:
df_ret = p.DataFrame()
df_ret_2 = p.DataFrame()
df_ret_2_Mod = p.DataFrame()
debug_ind = 'Y'
# Timestamp suffix for the log file names (assumed fix: 'var' is referenced later but was never defined in the original)
var = datetime.datetime.now().strftime('%d%m%Y_%H%M%S')
# Initiating Log Class
l = cl.clsL()
general_log_path = str(cf.config['LOG_PATH'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'consolidated.log', level=logging.INFO)
# Moving previous day log files to archive directory
arch_dir = cf.config['ARCH_DIR']
log_dir = cf.config['LOG_PATH']
print("Archive Directory:: ", arch_dir)
print("Log Directory::", log_dir)
print("*" * 157)
print("Testing COSMOS DB Connection!")
print("*" * 157)
# Checking Cosmos DB Azure
y = cmdb.clsCosmosDBDet()
ret_val = y.test_db_con()
if ret_val == 0:
print()
print("Cosmos DB Connection Successful!")
print("*" * 157)
else:
print()
print("Cosmos DB Connection Failure!")
print("*" * 157)
raise Exception
print("*" * 157)
# Creating Data in Cosmos DB
print()
print('Fetching data from Json!')
print('Creating data for Email..')
print("-" * 157)
emailFile = cf.config['EMAIL_SRC_JSON_FILE']
flg = 1
with open(emailFile) as json_file:
dataEmail = json.load(json_file)
# Creating documents
a1 = cm.clsColMgmt()
ret_cr_val1 = a1.CreateDocuments(dataEmail, flg)
if ret_cr_val1 == 0:
print('Successful data creation!')
else:
print('Failed to create data!')
print("-" * 157)
print()
print('Creating data for Twitter..')
print("-" * 157)
twitFile = cf.config['TWITTER_SRC_JSON_FILE']
flg = 2
with open(twitFile) as json_file:
dataTwitter = json.load(json_file)
# Creating documents
a2 = cm.clsColMgmt()
ret_cr_val2 = a2.CreateDocuments(dataTwitter, flg)
if ret_cr_val2 == 0:
print('Successful data creation!')
else:
print('Failed create data!')
print("-" * 157)
print()
print('Creating data for HR..')
print("-" * 157)
hrFile = cf.config['HR_SRC_JSON_FILE']
flg = 3
with open(hrFile) as json_file:
hrTwitter = json.load(json_file)
# Creating documents
a3 = cm.clsColMgmt()
ret_cr_val3 = a3.CreateDocuments(hrTwitter, flg)
if ret_cr_val3 == 0:
print('Successful data creation!')
else:
print('Failed create data!')
print("-" * 157)
# Calling the function 1
print("RealtimeEmail::")
# Fetching First collection data to dataframe
print("Fethcing Comos Collection Data!")
sql_qry_1 = cf.config['SQL_QRY_1']
msg = "Documents generatd based on unique key"
collection_flg = 1
x = cm.clsColMgmt()
df_ret = x.fetch_data(sql_qry_1, msg, collection_flg)
l.logr('1.EmailFeedback_' + var + '.csv', debug_ind, df_ret, 'log')
print('RealtimeEmail Data::')
print(df_ret)
print()
# Checking execution status
ret_val = int(df_ret.shape[0])
if ret_val == 0:
print("Cosmos DB Hans't returned any rows. Please check your queries!")
print("*" * 157)
else:
print("Successfully fetched!")
print("*" * 157)
# Calling the 2nd Collection
print("RealtimeTwitterFeedback::")
# Fetching First collection data to dataframe
print("Fethcing Cosmos Collection Data!")
# Query using parameters
sql_qry_2 = cf.config['SQL_QRY_2']
msg_2 = "Documents generated based on RealtimeTwitterFeedback feed!"
collection_flg = 2
val = 'crazyGo'
param_det = [{"name": "@CrVal", "value": val}]
add_param = 2
x1 = cm.clsColMgmt()
df_ret_2 = x1.fetch_data(sql_qry_2, msg_2, collection_flg, add_param, param_det)
l.logr('2.TwitterFeedback_' + var + '.csv', debug_ind, df_ret, 'log')
print('Realtime Twitter Data:: ')
print(df_ret_2)
print()
# Checking execution status
ret_val_2 = int(df_ret_2.shape[0])
if ret_val_2 == 0:
print("Cosmos DB hasn't returned any rows. Please check your queries!")
print("*" * 157)
else:
print("Successfuly row feteched!")
print("*" * 157)
except ValueError:
print("No relevant data to proceed!")
except Exception as e:
print("Top level Error: args:{0}, message{1}".format(e.args, e.message))
if __name__ == "__main__":
main()
Key lines from the above script –
with open(twitFile) as json_file:
    dataTwitter = json.load(json_file)
Reading a json file.
val = 'crazyGo'
param_det = [{"name": "@CrVal", "value": val}]
add_param = 2
Passing a specific parameter value to filter the records while fetching them from Cosmos DB.
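For reference, the parameterized query that the application builds internally follows this shape (the SQL text below is illustrative only – the real query text lives in clsConfig) –
sql_qry_2 = "SELECT * FROM c WHERE c.subscriberId = @CrVal"   # illustrative query text
val = 'crazyGo'
param_det = [{"name": "@CrVal", "value": val}]
query = {"query": sql_qry_2, "parameters": param_det}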
Now, let’s look at the runtime stats.
Windows:
MAC:
Let’s compare the output log directory –
Windows:
MAC:
Let’s verify the data from Cosmos DB.
Here, a subscriberId starting with ‘M‘ denotes data inserted from the MAC environment; the other one was inserted through Windows.
Let’s see one more example from Cosmos –
So, we’ve achieved our final goal here – successfully inserting data into Azure Cosmos DB from the Python application & retrieving it back.
The following Python packages are required to run this application –
pip install azure
pip install azure-cosmos
pip install pandas
pip install requests
This application was tested on Python 3.7.1 & Python 3.7.2 as well. As per Microsoft, the officially supported version is Python 3.5.
I hope you’ll like this effort.
Wait for the next installment. Till then, Happy Avenging. 😀
[Note: All the sample data are available/prepared in the public domain for research & study.]
Today, we’ll discuss how to improve your pandas data processing power using multi-threading. Note that we are not going to use any third-party Python package. Also, we’ll be reusing a couple of Python scripts that we’ve already discussed in our previous posts. Hence, this time, I won’t post them here.
Please refer the following scripts –
a. callClient.py
b. callRunServer.py
c. clsConfigServer.py
d. clsEnDec.py
e. clsFlask.py
f. clsL.py
g. clsParam.py
h. clsSerial.py
i. clsWeb.py
Please find the above scripts described here with details.
So, today, we’ll be looking into how multi-threading really helps the application gain performance over the serial approach.
Let’s go through our existing old sample files –
And, we’ve four columns that are applicable for encryption. This file contains 10K records. That means the application will make 40K calls to the server – one encryption call per applicable column per row.
Now, the serial approach, which I’ve already discussed here, will take significant time for data processing. However, we can club a few rows together as one block & in this way create multiple blocks out of our data csv like this –
As you can see, the blocks are marked with different colors. So, now, if you send each block of data for encryption in parallel, ideally, you will be able to process the data much faster than the usual serial process. And, this is what we will achieve with the help of Python’s multi-threading & queue. Without the queue, this program won’t be possible, as the queue maintains the data & process integrity.
One more thing we would like to explain here. Whenever this application sends a block of data, it packs that block into a (key, value) dictionary, where the key is the thread name. The reason is that the processed data may arrive back in random order, also wrapped in a dictionary. Once the application receives all the dictionaries containing dataframes with encrypted/decrypted data, the data is rearranged based on the key & then joined back with the rest of the data, as shown in the sketch below.
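Here is a minimal sketch of that split-and-reassemble idea (illustrative only – the helper names are assumptions & this is not the exact code from the scripts) –
import pandas as p

def split_into_chunks(df, interval):
    # Pack each slice of the dataframe into a dict keyed by a thread-style name
    chunks = {}
    for i, start in enumerate(range(0, df.shape[0], interval)):
        chunks['Thread-' + str(i)] = df.iloc[start:start + interval]
    return chunks

def reassemble(fin_dict):
    # Results may come back in any order; sorting on the numeric part of the key
    # restores the original row sequence before concatenation
    ordered = sorted(fin_dict.items(), key=lambda kv: int(kv[0].replace('Thread-', '')))
    return p.concat([v for _, v in ordered])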
Let’s see one sample way of sending & receiving random thread –
On the left-hand side, the application is splitting the recordset into small groups of chunks. Once those groups are created, the application uses Python multi-threading to push them into the queue, where the workers produce the encrypted/decrypted values. Similarly, after processing, the application pushes the final product into the queue so that the consumer can collect the final output.
This is the pictorial representation of the dictionary ordering based on the key, after which the application extracts the entire data to form the target csv file.
Let’s explore the script –
1. clsParallel.py (This script will consume the split csv files & send the data blocks in the form of the dictionary using multi-threading to the API for encryption in parallel. Hence, the name comes into the picture.)
We’ve already described the logic of most of these methods in our previous post, so here we’ll focus only on the new multi-threading parts.
# Checking total count of rows
count_row = df_input.shape[0]
print('Total number of records to process:: ', count_row)
interval = int(count_row / num_worker_threads) + 1
actual_worker_task = int(count_row / interval) + 1
Fetching the total number of rows from the dataframe. Based on the row count, the application will derive the actual number of threads that will be used for parallelism.
for i in range(actual_worker_task):
    t = threading.Thread(target=self.getEncryptWQ)
    t.start()
    threads.append(t)
    name = str(t.getName())
    if ((start_pos + interval) < count_row):
        end_pos = start_pos + interval
    else:
        end_pos = start_pos + (count_row - start_pos)
    split_df = df_input.iloc[start_pos:end_pos]
    l_dict[name] = split_df
    if ((start_pos > count_row) | (start_pos == count_row)):
        break
    else:
        start_pos = start_pos + interval
    q.put(l_dict)
    cnt += 1
Here, the application is splitting the data into multiple groups of smaller data packs & then combining them into (key, value) dictionary & finally placed them into the individual queue.
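For example, with our 10K-record file & 20 worker threads (the NUM_OF_THREAD value assumed here), interval works out to int(10000 / 20) + 1 = 501, & actual_worker_task to int(10000 / 501) + 1 = 20 – i.e., twenty chunks of roughly 500 rows each get placed on the queue.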
# block until all tasks are done
q.join()
This joins the queue. It ensures that the main flow blocks until every item placed on the queue has been consumed & marked as done.
# stop workers
for i in range(actual_worker_task):
    c_dict['TEND'] = p.DataFrame()
    q.put(c_dict)
for t in threads:
    t.join()
The above lines are essential, as they help the workers identify that no more data is left on the queue. The ‘TEND’ entry acts as a sentinel, & the main thread then waits until all the worker threads are done.
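A stripped-down sketch of that sentinel pattern (hypothetical names; it only mirrors the idea, not the full getEncryptWQ logic) –
import threading, queue

q = queue.Queue()

def worker():
    while True:
        item_dict = q.get()
        if 'TEND' in item_dict:       # sentinel: nothing more to process
            q.task_done()
            break
        # ... process the real payload here ...
        q.task_done()

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
# ... q.put(...) real work items here ...
for _ in threads:
    q.put({'TEND': None})             # one sentinel per worker
for t in threads:
    t.join()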
for k, v in fin_dict.items():
    min_val_list[int(k.replace('Thread-',''))] = v
min_val = min(min_val_list, key=int)
Once all the jobs are done, the application finds the minimum thread value & based on that, it sequences all the data chunks as explained in our previous image & finally clubs them together to form the complete csv.
for k, v in sorted(fin_dict.items(), key=lambda k: int(k[0].replace('Thread-',''))):
    if int(k.replace('Thread-','')) == min_val:
        df_ret = fin_dict[k]
    else:
        d_frames = [df_ret, fin_dict[k]]
        df_ret = p.concat(d_frames)
As already explained, using the starting point of our data dictionary element, the application is clubbing the data back to the main csv.
Next method, which we’ll be explaining is –
getEncryptWQ
Please find the key lines –
while True:
    try:
        # item_dict = q.get()
        item_dict = q.get_nowait()
        for k, v in item_dict.items():
            # Assigning Target File Basic Name
            item = str(k)
            if ((item == 'TEND') | (item == '')):
                break
            if ((item != 'TEND') | (item != '')):
                self.getEncrypt(item_dict)
            q.task_done()
    except Exception:
        break
This method consumes the data & processes it for encryption or decryption. It will continue to do the work until it receives the key value TEND or the queue is empty.
Let’s compare the statistics between Windows & MAC.
Let’s see the file structure first –
Windows (16 GB – Core 2) Vs Mac (10 GB – Core 2):
Windows (16 GB – Core 2):
Mac (10 GB – Core 2):
Find the complete directory from both the machine.
Windows (16 GB – Core 2):
Mac (10 GB – Core 2):
Here is the final output –
So, we’ve achieved our target goal.
Let me know – how do you like this post. Please share your suggestion & comments.
I’ll be back with another installment from the Python verse.
Today, we’ll be discussing a new cross-over between API, JSON, Encryption along with data distribution through Queue.
The primary objective here is to distribute one csv file through an API service & access our previously deployed Encryption/Decryption methods via parallel calls through a Queue. In this case, the focus is on allowing asynchronous calls to the Queue for data distribution; at this point, we’re not really looking for performance improvement. Instead, our goal is simply to achieve the target.
My upcoming posts will discuss the improvement of performance using Parallel calls.
Let’s discuss it now.
Please find the structure of our Windows & MAC directory are as follows –
We’re not going to discuss any scripts that we’ve already covered in my previous posts. Please refer to the relevant earlier posts on my blog.
1. clsL.py (This script will create the split csv files or final merge file after the corresponding process. However, this can be used as usual verbose debug logging as well. Hence, the name comes into the picture.)
###############################################
#### Written By: SATYAKI DE                ####
#### Written On: 25-Jan-2019               ####
####                                       ####
#### Objective: Log File                   ####
###############################################
import pandas as p
import platform as pl
from clsParam import clsParam as cf

class clsL(object):
    def __init__(self):
        self.path = cf.config['PATH']

    def logr(self, Filename, Ind, df, subdir=None):
        try:
            x = p.DataFrame()
            x = df
            sd = subdir
            os_det = pl.system()
            if sd == None:
                if os_det == "Windows":
                    fullFileName = self.path + '\\' + Filename
                else:
                    fullFileName = self.path + '/' + Filename
            else:
                if os_det == "Windows":
                    fullFileName = self.path + '\\' + sd + "\\" + Filename
                else:
                    fullFileName = self.path + '/' + sd + "/" + Filename
            if Ind == 'Y':
                x.to_csv(fullFileName, index=False)
            return 0
        except Exception as e:
            y = str(e)
            print(y)
            return 3
2. callRunServer.py (This script will create an instance of a server. Once, it is running – it will emulate the Server API functionalities. Hence, the name comes into the picture.)
################################################
#### Written By: SATYAKI DE                 ####
#### Written On: 10-Feb-2019                ####
#### Package Flask package needs to         ####
#### install in order to run this           ####
#### script.                                ####
####                                        ####
#### Objective: This script will            ####
#### initiate the encrypt/decrypt class     ####
#### based on client supplied data.         ####
#### Also, this will create an instance     ####
#### of the server & create an endpoint     ####
#### or API using flask framework.          ####
################################################
from flask import Flask
from flask import jsonify
from flask import request
from flask import abort
from clsConfigServer import clsConfigServer as csf
import clsFlask as clf

app = Flask(__name__)

@app.route('/process/getEncrypt', methods=['POST'])
def getEncrypt():
    try:
        # If the server application doesn't have
        # valid json, it will throw 400 error
        if not request.get_json:
            abort(400)
        # Capturing the individual element
        content = request.get_json()
        dGroup = content['dataGroup']
        input_data = content['data']
        dTemplate = content['dataTemplate']
        # For debug purpose only
        print("-" * 157)
        print("Group: ", dGroup)
        print("Data: ", input_data)
        print("Template: ", dTemplate)
        print("-" * 157)
        ret_val = ''
        if ((dGroup != '') & (dTemplate != '')):
            y = clf.clsFlask()
            ret_val = y.getEncryptProcess(dGroup, input_data, dTemplate)
        else:
            abort(500)
        return jsonify({'status': 'success', 'encrypt_val': ret_val})
    except Exception as e:
        x = str(e)
        return jsonify({'status': 'error', 'detail': x})

@app.route('/process/getDecrypt', methods=['POST'])
def getDecrypt():
    try:
        # If the server application doesn't have
        # valid json, it will throw 400 error
        if not request.get_json:
            abort(400)
        # Capturing the individual element
        content = request.get_json()
        dGroup = content['dataGroup']
        input_data = content['data']
        dTemplate = content['dataTemplate']
        # For debug purpose only
        print("-" * 157)
        print("Group: ", dGroup)
        print("Data: ", input_data)
        print("Template: ", dTemplate)
        print("-" * 157)
        ret_val = ''
        if ((dGroup != '') & (dTemplate != '')):
            y = clf.clsFlask()
            ret_val = y.getDecryptProcess(dGroup, input_data, dTemplate)
        else:
            abort(500)
        return jsonify({'status': 'success', 'decrypt_val': ret_val})
    except Exception as e:
        x = str(e)
        return jsonify({'status': 'error', 'detail': x})

def main():
    try:
        print('Starting Encrypt/Decrypt Application!')
        # Calling Server Start-Up Script
        app.run(debug=True, host=str(csf.config['HOST_IP_ADDR']))
        ret_val = 0
        if ret_val == 0:
            print("Finished Returning Message!")
        else:
            raise IOError
    except Exception as e:
        print("Server Failed To Start!")

if __name__ == '__main__':
    main()
3. clsFlask.py (This script is part of the server process, which will categorize the encryption logic based on different groups. Hence, the name comes into the picture.)
###############################################
#### Written By: SATYAKI DE                ####
#### Written On: 25-Jan-2019               ####
#### Package Flask package needs to        ####
#### install in order to run this          ####
#### script.                               ####
####                                       ####
#### Objective: This script will           ####
#### encrypt/decrypt based on the          ####
#### supplied salt value. Also,            ####
#### this will capture the individual      ####
#### element & stored them into JSON       ####
#### variables using flask framework.      ####
###############################################
from clsConfigServer import clsConfigServer as csf
import clsEnDecAuth as cen

class clsFlask(object):
    def __init__(self):
        self.xtoken = str(csf.config['DEF_SALT'])

    def getEncryptProcess(self, dGroup, input_data, dTemplate):
        try:
            # It is sending default salt value
            xtoken = self.xtoken
            # Capturing the individual element
            dGroup = dGroup
            input_data = input_data
            dTemplate = dTemplate
            # This will check the mandatory json elements
            if ((dGroup != '') & (dTemplate != '')):
                # Based on the Group & Element it will fetch the salt
                # Based on the specific salt it will encrypt the data
                if ((dGroup == 'GrDet') & (dTemplate == 'subGrAcct_Nbr')):
                    xtoken = str(csf.config['ACCT_NBR_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.encrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrName')):
                    xtoken = str(csf.config['NAME_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.encrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrPhone')):
                    xtoken = str(csf.config['PHONE_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.encrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrEmail')):
                    xtoken = str(csf.config['EMAIL_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.encrypt_str(input_data)
                else:
                    ret_val = ''
            else:
                ret_val = ''
            # Return value
            return ret_val
        except Exception as e:
            ret_val = ''
            # Return the valid json Error Response
            return ret_val

    def getDecryptProcess(self, dGroup, input_data, dTemplate):
        try:
            xtoken = self.xtoken
            # Capturing the individual element
            dGroup = dGroup
            input_data = input_data
            dTemplate = dTemplate
            # This will check the mandatory json elements
            if ((dGroup != '') & (dTemplate != '')):
                # Based on the Group & Element it will fetch the salt
                # Based on the specific salt it will decrypt the data
                if ((dGroup == 'GrDet') & (dTemplate == 'subGrAcct_Nbr')):
                    xtoken = str(csf.config['ACCT_NBR_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.decrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrName')):
                    xtoken = str(csf.config['NAME_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.decrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrPhone')):
                    xtoken = str(csf.config['PHONE_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.decrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrEmail')):
                    xtoken = str(csf.config['EMAIL_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.decrypt_str(input_data)
                else:
                    ret_val = ''
            else:
                ret_val = ''
            # Return value
            return ret_val
        except Exception as e:
            ret_val = ''
            # Return the valid Error Response
            return ret_val
4. clsEnDec.py (This script will convert the string to encryption or decryption from its previous states based on the supplied group. Hence, the name comes into the picture.)
###############################################
#### Written By: SATYAKI DE                ####
#### Written On: 25-Jan-2019               ####
#### Package Cryptography needs to         ####
#### install in order to run this          ####
#### script.                               ####
####                                       ####
#### Objective: This script will           ####
#### encrypt/decrypt based on the          ####
#### hidden supplied salt value.           ####
###############################################
from cryptography.fernet import Fernet

class clsEnDec(object):
    def __init__(self, token):
        # Calculating Key
        self.token = token

    def encrypt_str(self, data):
        try:
            # Capturing the Salt Information
            salt = self.token
            # Checking Individual Types inside the Dataframe
            cipher = Fernet(salt)
            encr_val = str(cipher.encrypt(bytes(data, 'utf8'))).replace("b'", "").replace("'", "")
            return encr_val
        except Exception as e:
            x = str(e)
            print(x)
            encr_val = ''
            return encr_val

    def decrypt_str(self, data):
        try:
            # Capturing the Salt Information
            salt = self.token
            # Checking Individual Types inside the Dataframe
            cipher = Fernet(salt)
            decr_val = str(cipher.decrypt(bytes(data, 'utf8'))).replace("b'", "").replace("'", "")
            return decr_val
        except Exception as e:
            x = str(e)
            print(x)
            decr_val = ''
            return decr_val
5. clsConfigServer.py (This script contains all the main parameter details of your emulated API server. Hence, the name comes into the picture.)
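The listing isn’t reproduced here, but a minimal sketch of what such a configuration class might hold is as follows (every value below is a placeholder assumption, not the real configuration) –
# Hypothetical shape of clsConfigServer.py - values are placeholders only.
# Each *_SALT entry is a Fernet key; generate real ones with Fernet.generate_key().
class clsConfigServer(object):
    config = {
        'HOST_IP_ADDR': '0.0.0.0',
        'DEF_SALT': '<Fernet-key-0>',
        'ACCT_NBR_SALT': '<Fernet-key-1>',
        'NAME_SALT': '<Fernet-key-2>',
        'PHONE_SALT': '<Fernet-key-3>',
        'EMAIL_SALT': '<Fernet-key-4>'
    }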
6. clsWeb.py (This script will receive the input Pandas dataframe & then convert it to JSON & then send it back to our Flask API Server for encryption/decryption. Hence, the name comes into the picture.)
################################################
#### Written By: SATYAKI DE                 ####
#### Written On: 09-Mar-2019                ####
#### Package Flask package needs to         ####
#### install in order to run this           ####
#### script.                                ####
####                                        ####
#### Objective: This script will            ####
#### initiate API based JSON requests       ####
#### at the server & receive the            ####
#### response from it & transform it        ####
#### back to the data-frame.                ####
################################################
import json
import requests
import datetime
import time
import ssl
import os
from clsParam import clsParam as cf

class clsWeb(object):
    def __init__(self, payload):
        self.payload = payload
        self.path = str(cf.config['PATH'])
        self.max_retries = int(cf.config['MAX_RETRY'])
        self.encrypt_ulr = str(cf.config['ENCRYPT_URL'])
        self.decrypt_ulr = str(cf.config['DECRYPT_URL'])

    def getResponse(self, mode):
        # Assigning Logging Info
        max_retries = self.max_retries
        encrypt_ulr = self.encrypt_ulr
        decrypt_ulr = self.decrypt_ulr
        En_Dec_Mode = mode
        try:
            # Bypassing SSL Authentication
            try:
                _create_unverified_https_context = ssl._create_unverified_context
            except AttributeError:
                # Legacy python that doesn't verify HTTPS certificates by default
                pass
            else:
                # Handle target environment that doesn't support HTTPS verification
                ssl._create_default_https_context = _create_unverified_https_context
            # Providing the url
            if En_Dec_Mode == 'En':
                url = encrypt_ulr
            else:
                url = decrypt_ulr
            print("URL::", url)
            # Capturing the payload
            data = self.payload
            # Converting String to Json
            json_data = json.loads(data)
            print("JSON:::::::", str(json_data))
            headers = {"Content-type": "application/json"}
            param = headers
            var1 = datetime.datetime.now().strftime("%H:%M:%S")
            print('Json Fetch Start Time:', var1)
            retries = 1
            success = False
            while not success:
                # Getting response from web service
                # response = requests.post(url, params=param, json=data, auth=(login, password), verify=False)
                response = requests.post(url, params=param, json=json_data, verify=False)
                print("Complete Return Code:: ", str(response.status_code))
                print("Return Code Initial::", str(response.status_code)[:1])
                if str(response.status_code)[:1] == '2':
                    # response = s.post(url, params=param, json=json_data, verify=False)
                    success = True
                else:
                    wait = retries * 2
                    print("Retry fails! Waiting " + str(wait) + " seconds and retrying.")
                    time.sleep(wait)
                    retries += 1
                # print('Return Service::')
                # Checking Maximum Retries
                if retries == max_retries:
                    success = True
                    raise ValueError
            print("JSON RESPONSE:::", response.text)
            var2 = datetime.datetime.now().strftime("%H:%M:%S")
            print('Json Fetch End Time:', var2)
            # Capturing the response json from Web Service
            response_json = response.text
            load_val = json.loads(response_json)
            # Based on the mode application will send the return value
            if En_Dec_Mode == 'En':
                encrypt_ele = str(load_val['encrypt_val'])
                return_ele = encrypt_ele
            else:
                decrypt_ele = str(load_val['decrypt_val'])
                return_ele = decrypt_ele
            return return_ele
        except ValueError as v:
            raise ValueError
        except Exception as e:
            x = str(e)
            print(x)
            return 'Error'
Let’s discuss the key lines –
try:
    _create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
    # Legacy python that doesn't verify HTTPS certificates by default
    pass
else:
    # Handle target environment that doesn't support HTTPS verification
    ssl._create_default_https_context = _create_unverified_https_context
If you are running in a secure environment, sometimes your proxy or firewall blocks you from accessing the API server – especially if they are on different networks. Hence, we need to bypass SSL verification here. However, it is advisable not to use this in a Prod environment for obvious reasons.
# Capturing the payload
data = self.payload
# Converting String to Json
json_data = json.loads(data)
This snippet converts the JSON string payload (built from your data frame columns) into a JSON object.
In the first few lines, the application builds the JSON request, which is sent to the API Server, & then captures the response from the server.
The next 8 lines check the status code. Based on the status code, it will continue or retry the request in case there is any failure or bad response from the server.
The last 3 lines say that if the application crosses the maximum allowable error limit, it will terminate the process by raising an error.
# Capturing the response json from Web Service
response_json = response.text
load_val = json.loads(response_json)
Once it receives a valid response, the application parses the response JSON, extracts the encrypted/decrypted value & sends it back to the calling method.
7. clsParam.py (This script contains the fundamental parameter values to run your client application. Hence, the name comes into the picture.)
###############################################
#### Written By: SATYAKI DE                ####
#### Written On: 20-Jan-2019               ####
###############################################
import os

class clsParam(object):
    config = {
        'MAX_RETRY': 5,
        'ENCRYPT_MODE': 'En',
        'DECRYPT_MODE': 'De',
        'PATH': os.path.dirname(os.path.realpath(__file__)),
        'SRC_DIR': os.path.dirname(os.path.realpath(__file__)) + '/' + 'src_files/',
        'FIN_DIR': os.path.dirname(os.path.realpath(__file__)) + '/' + 'finished/',
        'ENCRYPT_URL': "http://192.168.0.13:5000/process/getEncrypt",
        'DECRYPT_URL': "http://192.168.0.13:5000/process/getDecrypt",
        'NUM_OF_THREAD': 20
    }
8. clsSerial.py (This script will show the usual or serial way to encrypt your data & then decrypt it, storing the results into two separate csv files. Hence, the name comes into the picture.)
################################################ Written By: SATYAKI DE ######## Written On: 10-Feb-2019 ######## Package Flask package needs to ######## install in order to run this ######## script. ######## ######## Objective: This script will ######## initiate the encrypt/decrypt class ######## based on client supplied data ######## using serial mode operation. ################################################importpandasaspimportclsWebascwimportdatetimefromclsParamimport clsParam as cf
# Disbling Warningsdefwarn(*args, **kwargs):
passimportwarnings
warnings.warn = warn
classclsSerial(object):
def__init__(self):
self.path = cf.config['PATH']
self.EncryptMode =str(cf.config['ENCRYPT_MODE'])
self.DecryptMode =str(cf.config['DECRYPT_MODE'])
# Lookup Methods for Encryptiondefencrypt_acctNbr(self, row):
# Declaring Local Variable
en_AcctNbr =''
json_source_str =''# Capturing essential values
EncryptMode =self.EncryptMode
lkp_acctNbr = row['Acct_Nbr']
str_acct_nbr =str(lkp_acctNbr)
fil_acct_nbr = str_acct_nbr.strip()
# Forming JSON String for this field
json_source_str ='{"dataGroup":"GrDet","data":"'+ fil_acct_nbr +'","dataTemplate":"subGrAcct_Nbr"}'# Identifying Length of the field
len_acct_nbr =len(fil_acct_nbr)
# This will trigger the service if it has valid dataif len_acct_nbr >0:
x = cw.clsWeb(json_source_str)
en_AcctNbr = x.getResponse(EncryptMode)
else:
en_AcctNbr =''
fil_acct_nbr =''
fil_acct_nbr =''return en_AcctNbr
defencrypt_Name(self, row):
# Declaring Local Variable
en_AcctName =''# Capturing essential values
EncryptMode =self.EncryptMode
lkp_acctName = row['Name']
str_acct_name =str(lkp_acctName)
fil_acct_name = str_acct_name.strip()
# Forming JSON String for this field
json_source_str ='{"dataGroup":"GrDet","data":"'+ fil_acct_name +'","dataTemplate":"subGrName"}'# Identifying Length of the field
len_acct_nbr =len(fil_acct_name)
# This will trigger the service if it has valid dataif len_acct_nbr >0:
x = cw.clsWeb(json_source_str)
en_AcctName = x.getResponse(EncryptMode)
else:
en_AcctName =''return en_AcctName
defencrypt_Phone(self, row):
# Declaring Local Variable
en_Phone =''# Capturing essential values
EncryptMode =self.EncryptMode
lkp_phone = row['Phone']
str_phone =str(lkp_phone)
fil_phone = str_phone.strip()
# Forming JSON String for this field
json_source_str ='{"dataGroup":"GrDet","data":"'+ fil_phone +'","dataTemplate":"subGrPhone"}'# Identifying Length of the field
len_acct_nbr =len(fil_phone)
# This will trigger the service if it has valid dataif len_acct_nbr >0:
x = cw.clsWeb(json_source_str)
en_Phone = x.getResponse(EncryptMode)
else:
en_Phone =''return en_Phone
defencrypt_Email(self, row):
# Declaring Local Variable
en_Email =''# Capturing essential values
EncryptMode =self.EncryptMode
lkp_email = row['Email']
str_email =str(lkp_email)
fil_email = str_email.strip()
# Forming JSON String for this field
json_source_str ='{"dataGroup":"GrDet","data":"'+ fil_email +'","dataTemplate":"subGrEmail"}'# Identifying Length of the field
len_acct_nbr =len(fil_email)
# This will trigger the service if it has valid dataif len_acct_nbr >0:
x = cw.clsWeb(json_source_str)
en_Email = x.getResponse(EncryptMode)
else:
en_Email =''return en_Email
# Lookup Methods for Decryptiondefdecrypt_acctNbr(self, row):
# Declaring Local Variable
de_AcctNbr =''
json_source_str =''# Capturing essential values
EncryptMode =self.DecryptMode
lkp_acctNbr = row['Acct_Nbr']
str_acct_nbr =str(lkp_acctNbr)
fil_acct_nbr = str_acct_nbr.strip()
# Forming JSON String for this field
json_source_str ='{"dataGroup":"GrDet","data":"'+ fil_acct_nbr +'","dataTemplate":"subGrAcct_Nbr"}'# Identifying Length of the field
len_acct_nbr =len(fil_acct_nbr)
# This will trigger the service if it has valid dataif len_acct_nbr >0:
x = cw.clsWeb(json_source_str)
de_AcctNbr = x.getResponse(EncryptMode)
else:
de_AcctNbr =''return de_AcctNbr
defdecrypt_Name(self, row):
# Declaring Local Variable
de_AcctName =''# Capturing essential values
EncryptMode =self.DecryptMode
lkp_acctName = row['Name']
str_acct_name =str(lkp_acctName)
fil_acct_name = str_acct_name.strip()
# Forming JSON String for this field
json_source_str ='{"dataGroup":"GrDet","data":"'+ fil_acct_name +'","dataTemplate":"subGrName"}'# Identifying Length of the field
len_acct_nbr =len(fil_acct_name)
# This will trigger the service if it has valid dataif len_acct_nbr >0:
x = cw.clsWeb(json_source_str)
de_AcctName = x.getResponse(EncryptMode)
else:
de_AcctName =''return de_AcctName
defdecrypt_Phone(self, row):
# Declaring Local Variable
de_Phone =''# Capturing essential values
EncryptMode =self.DecryptMode
lkp_phone = row['Phone']
str_phone =str(lkp_phone)
fil_phone = str_phone.strip()
# Forming JSON String for this field
json_source_str ='{"dataGroup":"GrDet","data":"'+ fil_phone +'","dataTemplate":"subGrPhone"}'# Identifying Length of the field
len_acct_nbr =len(fil_phone)
# This will trigger the service if it has valid dataif len_acct_nbr >0:
x = cw.clsWeb(json_source_str)
de_Phone = x.getResponse(EncryptMode)
else:
de_Phone =''return de_Phone
defdecrypt_Email(self, row):
# Declaring Local Variable
de_Email =''# Capturing essential values
EncryptMode =self.DecryptMode
lkp_email = row['Email']
str_email =str(lkp_email)
fil_email = str_email.strip()
# Forming JSON String for this field
json_source_str ='{"dataGroup":"GrDet","data":"'+ fil_email +'","dataTemplate":"subGrEmail"}'# Identifying Length of the field
len_acct_nbr =len(fil_email)
# This will trigger the service if it has valid dataif len_acct_nbr >0:
x = cw.clsWeb(json_source_str)
de_Email = x.getResponse(EncryptMode)
else:
de_Email =''return de_Email
defgetEncrypt(self, df_payload):
try:
df_input = p.DataFrame()
df_fin = p.DataFrame()
# Assigning Target File Basic Name
df_input = df_payload
# Checking total count of rows
count_row = df_input.shape[0]
print('Total number of records to process:: ', count_row)
# Deriving rows
df_input['Encrypt_Acct_Nbr'] = df_input.apply(lambda row: self.encrypt_acctNbr(row), axis=1)
df_input['Encrypt_Name'] = df_input.apply(lambda row: self.encrypt_Name(row), axis=1)
df_input['Encrypt_Phone'] = df_input.apply(lambda row: self.encrypt_Phone(row), axis=1)
df_input['Encrypt_Email'] = df_input.apply(lambda row: self.encrypt_Email(row), axis=1)
# Dropping original columns
df_input.drop(['Acct_Nbr', 'Name', 'Phone', 'Email'], axis=1, inplace=True)
# Renaming new columns with the old column names
df_input.rename(columns={'Encrypt_Acct_Nbr':'Acct_Nbr'}, inplace=True)
df_input.rename(columns={'Encrypt_Name': 'Name'}, inplace=True)
df_input.rename(columns={'Encrypt_Phone': 'Phone'}, inplace=True)
df_input.rename(columns={'Encrypt_Email': 'Email'}, inplace=True)
# New Column List Orders
column_order = ['Acct_Nbr', 'Name', 'Acct_Addr_1', 'Acct_Addr_2', 'Phone', 'Email', 'Serial_No']
df_fin = df_input.reindex(column_order, axis=1)
return df_fin
exceptExceptionas e:
df_error = p.DataFrame({'Acct_Nbr':str(e), 'Name':'', 'Acct_Addr_1':'', 'Acct_Addr_2':'', 'Phone':'', 'Email':'', 'Serial_No':''})
return df_error
defgetDecrypt(self, df_encrypted_payload):
try:
df_input = p.DataFrame()
df_fin = p.DataFrame()
# Assigning Target File Basic Name
df_input = df_encrypted_payload
# Checking total count of rows
count_row = df_input.shape[0]
print('Total number of records to process:: ', count_row)
# Deriving rows
df_input['Decrypt_Acct_Nbr'] = df_input.apply(lambda row: self.decrypt_acctNbr(row), axis=1)
df_input['Decrypt_Name'] = df_input.apply(lambda row: self.decrypt_Name(row), axis=1)
df_input['Decrypt_Phone'] = df_input.apply(lambda row: self.decrypt_Phone(row), axis=1)
df_input['Decrypt_Email'] = df_input.apply(lambda row: self.decrypt_Email(row), axis=1)
# Dropping original columns
df_input.drop(['Acct_Nbr', 'Name', 'Phone', 'Email'], axis=1, inplace=True)
# Renaming new columns with the old column names
df_input.rename(columns={'Decrypt_Acct_Nbr':'Acct_Nbr'}, inplace=True)
df_input.rename(columns={'Decrypt_Name': 'Name'}, inplace=True)
df_input.rename(columns={'Decrypt_Phone': 'Phone'}, inplace=True)
df_input.rename(columns={'Decrypt_Email': 'Email'}, inplace=True)
# New Column List Orders
column_order = ['Acct_Nbr', 'Name', 'Acct_Addr_1', 'Acct_Addr_2', 'Phone', 'Email']
df_fin = df_input.reindex(column_order, axis=1)
return df_fin
exceptExceptionas e:
df_error = p.DataFrame({'Acct_Nbr':str(e), 'Name':'', 'Acct_Addr_1':'', 'Acct_Addr_2':'', 'Phone':'', 'Email':''})
return df_error
Key lines to discuss –
The main two methods we’ll be looking into are –
a. getEncrypt
b. getDecrypt
These two functions are identical in construction; one is for encryption & the other one is for decryption.
As you can see, the application is processing row-by-row & column-by-column data transformations using look-up functions.
# Dropping original columns
df_input.drop(['Acct_Nbr', 'Name', 'Phone', 'Email'], axis=1, inplace=True)
As the comment suggested, the application is dropping all the unencrypted source columns.
# Renaming new columns with the old column names
df_input.rename(columns={'Encrypt_Acct_Nbr': 'Acct_Nbr'}, inplace=True)
df_input.rename(columns={'Encrypt_Name': 'Name'}, inplace=True)
df_input.rename(columns={'Encrypt_Phone': 'Phone'}, inplace=True)
df_input.rename(columns={'Encrypt_Email': 'Email'}, inplace=True)
Once the application drops all the source columns, it renames the new columns back to the old column names & based on this, the data will be merged with the rest of the data from the source csv.
# New Column List Orders
column_order = ['Acct_Nbr', 'Name', 'Acct_Addr_1', 'Acct_Addr_2', 'Phone', 'Email', 'Serial_No']
df_fin = df_input.reindex(column_order, axis=1)
Once the application finishes all these transformations, it re-sequences the order of the columns, which recreates the same column order as the source csv files.
Similar logic is applicable for the decryption as well.
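Put together on a toy frame, the drop-rename-reindex round trip looks like this (the values are made up purely for illustration) –
import pandas as p

df = p.DataFrame({'Acct_Nbr': ['1234'], 'Name': ['John'], 'Serial_No': [1]})
df['Encrypt_Acct_Nbr'] = 'gAAAA...'          # stand-in for the encrypted value
df['Encrypt_Name'] = 'gAAAB...'
df.drop(['Acct_Nbr', 'Name'], axis=1, inplace=True)
df.rename(columns={'Encrypt_Acct_Nbr': 'Acct_Nbr', 'Encrypt_Name': 'Name'}, inplace=True)
df = df.reindex(['Acct_Nbr', 'Name', 'Serial_No'], axis=1)
print(df)   # same column order as the source, but with encrypted values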
As we know, many look-up methods take part in this drive.
From the row, our application extracts the relevant column – in this case, Acct_Nbr – then converts it to a string & removes any unnecessary white space from it.
# Forming JSON String for this field
json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_nbr + '","dataTemplate":"subGrAcct_Nbr"}'
Once extracted, the application will build the target JSON string as per the column data.
# Identifying Length of the field
len_acct_nbr = len(fil_acct_nbr)
# This will trigger the service if it has valid data
if len_acct_nbr > 0:
    x = cw.clsWeb(json_source_str)
    en_AcctNbr = x.getResponse(EncryptMode)
else:
    en_AcctNbr = ''
Based on the length of the extracted value, our application triggers the individual JSON request & receives the encrypted value in response.
9. clsParallel.py (This script will use the queue to make asynchronous calls & perform the same encryption & decryption. Hence, the name comes into the picture.)
################################################ Written By: SATYAKI DE ######## Written On: 10-Feb-2019 ######## Package Flask package needs to ######## install in order to run this ######## script. ######## ######## Objective: This script will ######## initiate the encrypt/decrypt class ######## based on client supplied data. ######## This script will use the advance ######## queue & asynchronus calls to the ######## API Server to process Encryption & ######## Decryption on our csv files. ################################################importpandasaspimportclsWebServiceascwimportdatetimefromclsParamimport clsParam as cf
frommultiprocessingimport Lock, Process, Queue, freeze_support, JoinableQueue
importgcimportsignalimporttimeimportosimportqueueimportasyncio# Declaring Global Variable
q = Queue()
lock = Lock()
finished_task = JoinableQueue()
pending_task = JoinableQueue()
sp_fin_dict = {}
dp_fin_dict = {}
# Disbling Warningsdefwarn(*args, **kwargs):
passimportwarnings
warnings.warn = warn
classclsParallel(object):
def__init__(self):
self.path = cf.config['PATH']
self.EncryptMode =str(cf.config['ENCRYPT_MODE'])
self.DecryptMode =str(cf.config['DECRYPT_MODE'])
self.num_worker_process =int(cf.config['NUM_OF_THREAD'])
self.lock = Lock()
# Lookup Methods for Encryptiondefencrypt_acctNbr(self, row):
# Declaring Local Variable
en_AcctNbr =''
json_source_str =''# Capturing essential values
EncryptMode =self.EncryptMode
lkp_acctNbr = row['Acct_Nbr']
str_acct_nbr =str(lkp_acctNbr)
fil_acct_nbr = str_acct_nbr.strip()
# Forming JSON String for this field
json_source_str ='{"dataGroup":"GrDet","data":"'+ fil_acct_nbr +'","dataTemplate":"subGrAcct_Nbr"}'# Identifying Length of the field
len_acct_nbr =len(fil_acct_nbr)
# This will trigger the service if it has valid dataif len_acct_nbr >0:
x = cw.clsWeb(json_source_str)
en_AcctNbr = x.getResponse(EncryptMode)
else:
en_AcctNbr =''
fil_acct_nbr =''return en_AcctNbr
defencrypt_Name(self, row):
# Declaring Local Variable
en_AcctName =''# Capturing essential values
EncryptMode =self.EncryptMode
lkp_acctName = row['Name']
str_acct_name =str(lkp_acctName)
fil_acct_name = str_acct_name.strip()
# Forming JSON String for this field
json_source_str ='{"dataGroup":"GrDet","data":"'+ fil_acct_name +'","dataTemplate":"subGrName"}'# Identifying Length of the field
len_acct_nbr =len(fil_acct_name)
# This will trigger the service if it has valid dataif len_acct_nbr >0:
x = cw.clsWeb(json_source_str)
en_AcctName = x.getResponse(EncryptMode)
else:
en_AcctName =''return en_AcctName
defencrypt_Phone(self, row):
# Declaring Local Variable
en_Phone =''# Capturing essential values
EncryptMode =self.EncryptMode
lkp_phone = row['Phone']
str_phone =str(lkp_phone)
fil_phone = str_phone.strip()
# Forming JSON String for this field
json_source_str ='{"dataGroup":"GrDet","data":"'+ fil_phone +'","dataTemplate":"subGrPhone"}'# Identifying Length of the field
len_acct_nbr =len(fil_phone)
# This will trigger the service if it has valid dataif len_acct_nbr >0:
x = cw.clsWeb(json_source_str)
en_Phone = x.getResponse(EncryptMode)
else:
en_Phone =''return en_Phone
defencrypt_Email(self, row):
# Declaring Local Variable
en_Email =''# Capturing essential values
EncryptMode =self.EncryptMode
lkp_email = row['Email']
str_email =str(lkp_email)
fil_email = str_email.strip()
# Forming JSON String for this field
json_source_str ='{"dataGroup":"GrDet","data":"'+ fil_email +'","dataTemplate":"subGrEmail"}'# Identifying Length of the field
len_acct_nbr =len(fil_email)
# This will trigger the service if it has valid dataif len_acct_nbr >0:
x = cw.clsWeb(json_source_str)
en_Email = x.getResponse(EncryptMode)
else:
en_Email =''return en_Email
# Lookup Methods for Decryptiondefdecrypt_acctNbr(self, row):
# Declaring Local Variable
de_AcctNbr =''
json_source_str =''# Capturing essential values
EncryptMode =self.DecryptMode
lkp_acctNbr = row['Acct_Nbr']
str_acct_nbr =str(lkp_acctNbr)
fil_acct_nbr = str_acct_nbr.strip()
# Forming JSON String for this field
json_source_str ='{"dataGroup":"GrDet","data":"'+ fil_acct_nbr +'","dataTemplate":"subGrAcct_Nbr"}'# Identifying Length of the field
len_acct_nbr =len(fil_acct_nbr)
# This will trigger the service if it has valid dataif len_acct_nbr >0:
x = cw.clsWeb(json_source_str)
de_AcctNbr = x.getResponse(EncryptMode)
else:
de_AcctNbr =''return de_AcctNbr
defdecrypt_Name(self, row):
# Declaring Local Variable
de_AcctName =''# Capturing essential values
EncryptMode =self.DecryptMode
lkp_acctName = row['Name']
str_acct_name =str(lkp_acctName)
fil_acct_name = str_acct_name.strip()
# Forming JSON String for this field
json_source_str ='{"dataGroup":"GrDet","data":"'+ fil_acct_name +'","dataTemplate":"subGrName"}'# Identifying Length of the field
len_acct_nbr =len(fil_acct_name)
# This will trigger the service if it has valid dataif len_acct_nbr >0:
x = cw.clsWeb(json_source_str)
de_AcctName = x.getResponse(EncryptMode)
else:
de_AcctName =''return de_AcctName
defdecrypt_Phone(self, row):
# Declaring Local Variable
de_Phone =''# Capturing essential values
EncryptMode =self.DecryptMode
lkp_phone = row['Phone']
str_phone =str(lkp_phone)
fil_phone = str_phone.strip()
# Forming JSON String for this field
json_source_str ='{"dataGroup":"GrDet","data":"'+ fil_phone +'","dataTemplate":"subGrPhone"}'# Identifying Length of the field
len_acct_nbr =len(fil_phone)
# This will trigger the service if it has valid dataif len_acct_nbr >0:
x = cw.clsWeb(json_source_str)
de_Phone = x.getResponse(EncryptMode)
else:
de_Phone =''return de_Phone
defdecrypt_Email(self, row):
# Declaring Local Variable
de_Email =''# Capturing essential values
EncryptMode =self.DecryptMode
lkp_email = row['Email']
str_email =str(lkp_email)
fil_email = str_email.strip()
# Forming JSON String for this field
json_source_str ='{"dataGroup":"GrDet","data":"'+ fil_email +'","dataTemplate":"subGrEmail"}'# Identifying Length of the field
len_acct_nbr =len(fil_email)
# This will trigger the service if it has valid dataif len_acct_nbr >0:
x = cw.clsWeb(json_source_str)
de_Email = x.getResponse(EncryptMode)
else:
de_Email =''return de_Email
defgetEncrypt(self, df_dict):
try:
en_fin_dict = {}
df_input = p.DataFrame()
df_fin = p.DataFrame()
# Assigning Target File Basic Namefor k, v in df_dict.items():
Process_Name = k
df_input = v
# Checking total count of rows
count_row =int(df_input.shape[0])
print('Part number of records to process:: ', count_row)
if count_row >0:
# Deriving rows
df_input['Encrypt_Acct_Nbr'] = df_input.apply(lambda row: self.encrypt_acctNbr(row), axis=1)
df_input['Encrypt_Name'] = df_input.apply(lambda row: self.encrypt_Name(row), axis=1)
df_input['Encrypt_Phone'] = df_input.apply(lambda row: self.encrypt_Phone(row), axis=1)
df_input['Encrypt_Email'] = df_input.apply(lambda row: self.encrypt_Email(row), axis=1)
# Dropping original columns
df_input.drop(['Acct_Nbr', 'Name', 'Phone', 'Email'], axis=1, inplace=True)
# Renaming new columns with the old column names
df_input.rename(columns={'Encrypt_Acct_Nbr':'Acct_Nbr'}, inplace=True)
df_input.rename(columns={'Encrypt_Name': 'Name'}, inplace=True)
df_input.rename(columns={'Encrypt_Phone': 'Phone'}, inplace=True)
df_input.rename(columns={'Encrypt_Email': 'Email'}, inplace=True)
# New Column List Orders
column_order = ['Acct_Nbr', 'Name', 'Acct_Addr_1', 'Acct_Addr_2', 'Phone', 'Email', 'Serial_No']
df_fin = df_input.reindex(column_order, axis=1)
sp_fin_dict[Process_Name] = df_fin
return sp_fin_dict
exceptExceptionas e:
df_error = p.DataFrame({'Acct_Nbr':str(e), 'Name':'', 'Acct_Addr_1':'', 'Acct_Addr_2':'', 'Phone':'', 'Email':'', 'Serial_No':''})
sp_fin_dict[Process_Name] = df_error
return sp_fin_dict
async defproduceEncr(self, queue, l_dict):
m_dict = {}
m_dict =self.getEncrypt(l_dict)
for k, v in m_dict.items():
item = k
print('producing {}...'.format(item))
await queue.put(m_dict)
async defconsumeEncr(self, queue):
result_dict = {}
whileTrue:
# wait for an item from the producer
sp_fin_dict.update(await queue.get())
# process the itemfor k, v in sp_fin_dict.items():
item = k
print('consuming {}...'.format(item))
# Notify the queue that the item has been processed
queue.task_done()
async defrunEncrypt(self, n, df_input):
l_dict = {}
queue = asyncio.Queue()
# schedule the consumer
consumer = asyncio.ensure_future(self.consumeEncr(queue))
start_pos =0
end_pos =0
num_worker_process = n
count_row = df_input.shape[0]
print('Total number of records to process:: ', count_row)
interval =int(count_row / num_worker_process) +1
actual_worker_task =int(count_row / interval) +1for i inrange(actual_worker_task):
name ='Task-'+str(i)
if ((start_pos + interval) < count_row):
end_pos = start_pos + interval
else:
end_pos = start_pos + (count_row - start_pos)
print("start_pos: ", start_pos)
print("end_pos: ", end_pos)
split_df = df_input.iloc[start_pos:end_pos]
l_dict[name] = split_df
if ((start_pos > count_row) | (start_pos == count_row)):
breakelse:
start_pos = start_pos + interval
# run the producer and wait for completion
await self.produceEncr(queue, l_dict)
# wait until the consumer has processed all items
await queue.join()
# the consumer is still awaiting for an item, cancel it
consumer.cancel()
return sp_fin_dict
defgetEncryptParallel(self, df_payload):
l_dict = {}
data_dict = {}
min_val_list = {}
cnt =1
num_worker_process =self.num_worker_process
actual_worker_task =0
number_of_processes =4
processes = []
split_df = p.DataFrame()
df_ret = p.DataFrame()
dummy_df = p.DataFrame()
# Assigning Target File Basic Name
df_input = df_payload
# Checking total count of rows
count_row = df_input.shape[0]
print('Total number of records to process:: ', count_row)
interval =int(count_row / num_worker_process) +1
actual_worker_task =int(count_row/interval) +1
loop = asyncio.get_event_loop()
loop.run_until_complete(self.runEncrypt(actual_worker_task, df_input))
loop.close()
for k, v in sp_fin_dict.items():
min_val_list[int(k.replace('Task-', ''))] = v
min_val =min(min_val_list, key=int)
print("Minimum Index Value: ", min_val)
for k, v insorted(sp_fin_dict.items(), key=lambda k: int(k[0].replace('Task-', ''))):
ifint(k.replace('Task-', '')) == min_val:
df_ret = sp_fin_dict[k]
else:
d_frames = [df_ret, sp_fin_dict[k]]
df_ret = p.concat(d_frames)
return df_ret
defgetDecrypt(self, df_encrypted_dict):
try:
de_fin_dict = {}
df_input = p.DataFrame()
df_fin = p.DataFrame()
# Assigning Target File Basic Namefor k, v in df_encrypted_dict.items():
Process_Name = k
df_input = v
# Checking total count of rows
count_row =int(df_input.shape[0])
print('Part number of records to process:: ', count_row)
if count_row >0:
# Deriving rows
df_input['Decrypt_Acct_Nbr'] = df_input.apply(lambda row: self.decrypt_acctNbr(row), axis=1)
df_input['Decrypt_Name'] = df_input.apply(lambda row: self.decrypt_Name(row), axis=1)
df_input['Decrypt_Phone'] = df_input.apply(lambda row: self.decrypt_Phone(row), axis=1)
df_input['Decrypt_Email'] = df_input.apply(lambda row: self.decrypt_Email(row), axis=1)
# Dropping original columns
df_input.drop(['Acct_Nbr', 'Name', 'Phone', 'Email'], axis=1, inplace=True)
# Renaming new columns with the old column names
df_input.rename(columns={'Decrypt_Acct_Nbr':'Acct_Nbr'}, inplace=True)
df_input.rename(columns={'Decrypt_Name': 'Name'}, inplace=True)
df_input.rename(columns={'Decrypt_Phone': 'Phone'}, inplace=True)
df_input.rename(columns={'Decrypt_Email': 'Email'}, inplace=True)
# New Column List Orders
column_order = ['Acct_Nbr', 'Name', 'Acct_Addr_1', 'Acct_Addr_2', 'Phone', 'Email', 'Serial_No']
df_fin = df_input.reindex(column_order, axis=1)
de_fin_dict[Process_Name] = df_fin
return de_fin_dict
exceptExceptionas e:
df_error = p.DataFrame({'Acct_Nbr': str(e), 'Name': '', 'Acct_Addr_1': '', 'Acct_Addr_2': '', 'Phone': '', 'Email': '', 'Serial_No': ''})
de_fin_dict[Process_Name] = df_error
return de_fin_dict
async defproduceDecr(self, queue, l_dict):
m_dict = {}
m_dict =self.getDecrypt(l_dict)
for k, v in m_dict.items():
item = k
print('producing {}...'.format(item))
await queue.put(m_dict)
async defconsumeDecr(self, queue):
result_dict = {}
whileTrue:
# wait for an item from the producer
dp_fin_dict.update(await queue.get())
# process the itemfor k, v in dp_fin_dict.items():
item = k
print('consuming {}...'.format(item))
# Notify the queue that the item has been processed
queue.task_done()
async defrunDecrypt(self, n, df_input):
l_dict = {}
queue = asyncio.Queue()
# schedule the consumer
consumerDe = asyncio.ensure_future(self.consumeDecr(queue))
start_pos =0
end_pos =0
num_worker_process = n
count_row = df_input.shape[0]
print('Total number of records to process:: ', count_row)
interval =int(count_row / num_worker_process) +1
actual_worker_task =int(count_row / interval) +1for i inrange(actual_worker_task):
name ='Task-'+str(i)
if ((start_pos + interval) < count_row):
end_pos = start_pos + interval
else:
end_pos = start_pos + (count_row - start_pos)
print("start_pos: ", start_pos)
print("end_pos: ", end_pos)
split_df = df_input.iloc[start_pos:end_pos]
l_dict[name] = split_df
if ((start_pos > count_row) | (start_pos == count_row)):
breakelse:
start_pos = start_pos + interval
# run the producer and wait for completion
await self.produceDecr(queue, l_dict)
# wait until the consumer has processed all items
await queue.join()
# the consumer is still awaiting for an item, cancel it
consumerDe.cancel()
return dp_fin_dict
defgetDecryptParallel(self, df_payload):
l_dict = {}
data_dict = {}
min_val_list = {}
cnt =1
num_worker_process =self.num_worker_process
actual_worker_task =0
number_of_processes =4
processes = []
split_df = p.DataFrame()
df_ret_1 = p.DataFrame()
dummy_df = p.DataFrame()
# Assigning Target File Basic Name
df_input = df_payload
# Checking total count of rows
count_row = df_input.shape[0]
print('Total number of records to process:: ', count_row)
interval =int(count_row / num_worker_process) +1
actual_worker_task =int(count_row/interval) +1
loop_1 = asyncio.new_event_loop()
asyncio.set_event_loop(asyncio.new_event_loop())
loop_2 = asyncio.get_event_loop()
loop_2.run_until_complete(self.runDecrypt(actual_worker_task, df_input))
loop_2.close()
for k, v in dp_fin_dict.items():
min_val_list[int(k.replace('Task-', ''))] = v
min_val =min(min_val_list, key=int)
print("Minimum Index Value: ", min_val)
for k, v insorted(dp_fin_dict.items(), key=lambda k: int(k[0].replace('Task-', ''))):
ifint(k.replace('Task-', '')) == min_val:
df_ret_1 = dp_fin_dict[k]
else:
d_frames = [df_ret_1, dp_fin_dict[k]]
df_ret_1 = p.concat(d_frames)
return df_ret_1
I don’t want to discuss any more look-up methods, as the post is already pretty big. We’ll only address a few critical lines.
Under getEncryptParallel, the following lines are essential –
# Checking total count of rows
count_row = df_input.shape[0]
print('Total number of records to process:: ', count_row)
interval = int(count_row / num_worker_process) + 1
actual_worker_task = int(count_row / interval) + 1
Based on the total number of records in the dataframe, our application splits the main dataframe into multiple sub-dataframes & then passes them through asynchronous queue calls.
Initiating our queue methods & passing our dataframe to it.
for k, v in sorted(sp_fin_dict.items(), key=lambda k: int(k[0].replace('Task-', ''))):
    if int(k.replace('Task-', '')) == min_val:
        df_ret = sp_fin_dict[k]
    else:
        d_frames = [df_ret, sp_fin_dict[k]]
        df_ret = p.concat(d_frames)
Our application is sending & receiving data using a dictionary. The reason is – we’re not expecting the data coming back from our server to arrive in sequence; instead, it may arrive in random order. Hence, using the keys, we maintain the final sequence, which ensures the application joins back correctly to the sets of source data that are not candidates for any encryption/decryption.
Let’s discuss runEncrypt method.
for i in range(actual_worker_task):
    name = 'Task-' + str(i)
    if ((start_pos + interval) < count_row):
        end_pos = start_pos + interval
    else:
        end_pos = start_pos + (count_row - start_pos)
    print("start_pos: ", start_pos)
    print("end_pos: ", end_pos)
    split_df = df_input.iloc[start_pos:end_pos]
    l_dict[name] = split_df
    if ((start_pos > count_row) | (start_pos == count_row)):
        break
    else:
        start_pos = start_pos + interval
Here, our application splits the source data frame into multiple sub-dataframes so that they can be processed in parallel using queues.
# run the producer and wait for completion
await self.produceEncr(queue, l_dict)
# wait until the consumer has processed all items
await queue.join()
Invoking the encryption-decryption process using queues. The last line is significant: the queue will not be released until every item produced/placed into the queue has been consumed. Hence, your main program will wait until it processes all the records of your dataframe.
Two methods named produceEncr & consumeEncr are mainly used for placing an item inside the queue & then, after encryption/decryption, retrieving it from the queue.
A few important lines from both methods are –
# produceEncr
await queue.put(m_dict)

# consumeEncr
# wait for an item from the producer
sp_fin_dict.update(await queue.get())
# Notify the queue that the item has been processed
queue.task_done()
From the first line, one can see that the producer places its item into the queue. The rest of the lines are from the consumer method: the application pours the data into the dictionary, which will be returned to the calling method. The last line is essential – without the task_done call, queue.join will keep waiting for items to be marked as processed, which can trigger an infinite wait or sometimes a deadlock.
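For clarity, here is a minimal, self-contained sketch of that asyncio producer/consumer pattern (it only demonstrates put/get/task_done/join with dummy items, not the actual encryption work) –
import asyncio

async def producer(queue, items):
    for item in items:
        await queue.put(item)            # place work on the queue

async def consumer(queue, results):
    while True:
        item = await queue.get()         # wait for an item from the producer
        results.append(item)             # stand-in for the encrypt/decrypt work
        queue.task_done()                # mark the item as processed

async def run():
    queue = asyncio.Queue()
    results = []
    cons = asyncio.ensure_future(consumer(queue, results))
    await producer(queue, ['Task-0', 'Task-1', 'Task-2'])
    await queue.join()                   # blocks until every item is task_done
    cons.cancel()                        # consumer is still awaiting; cancel it
    await asyncio.sleep(0)               # let the cancellation propagate
    return results

print(asyncio.get_event_loop().run_until_complete(run()))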
10. callClient.py (This script will trigger both the serial & parallel process of encryption one by one & finally capture some statistics. Hence, the name comes into the picture.)
Today, we’ll be looking into another exciting installment of cross-over between Reality Stone & Timestone from the python verse.
We’ll be exploring Encryption/Decryption implemented using the Flask framework’s server component. We would like to demonstrate these Encrypt/Decrypt features as a Server API & then call it from clients like Postman to view the response.
So, here our primary focus will be on implementing this on the server-side rather than the client-side.
However, there is a catch. We would like to implement different kinds of encryption or decryption based on our source data.
Let’s look into the sample data first –
As you can see, we intend to encrypt the Account Number with a different salt compared to Name, Phone or Email. Hence, we would be using different salts to encrypt our sample data & get the desired encrypt/decrypt output.
From the above data, we can create the following types of JSON payload –
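One such payload (the field values are illustrative only) might look like this –
{
    "dataGroup": "GrDet",
    "data": "1234567890",
    "dataTemplate": "subGrAcct_Nbr"
}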
Let’s explore –
Before we start, we would like to show you the directory structures of Windows & MAC, as we did in my earlier post.
Following are the scripts that we’re using to develop this server application –
1. clsConfigServer.py (This script contains all the parameters of the server.)
As mentioned, different salt keys are defined for different kinds of data.
2. clsEnDec.py (This script is a lighter version of the encryption & decryption of our previously discussed script. Hence, we won’t discuss it in detail. You can refer to my earlier post to understand the logic of this script.)
###############################################
#### Written By: SATYAKI DE                ####
#### Written On: 25-Jan-2019               ####
#### Package Cryptography needs to         ####
#### install in order to run this          ####
#### script.                               ####
####                                       ####
#### Objective: This script will           ####
#### encrypt/decrypt based on the          ####
#### hidden supplied salt value.           ####
###############################################
from cryptography.fernet import Fernet

class clsEnDec(object):
    def __init__(self, token):
        # Calculating Key
        self.token = token

    def encrypt_str(self, data):
        try:
            # Capturing the Salt Information
            salt = self.token
            # Checking Individual Types inside the Dataframe
            cipher = Fernet(salt)
            encr_val = str(cipher.encrypt(bytes(data, 'utf8'))).replace("b'", "").replace("'", "")
            return encr_val
        except Exception as e:
            x = str(e)
            print(x)
            encr_val = ''
            return encr_val

    def decrypt_str(self, data):
        try:
            # Capturing the Salt Information
            salt = self.token
            # Checking Individual Types inside the Dataframe
            cipher = Fernet(salt)
            decr_val = str(cipher.decrypt(bytes(data, 'utf8'))).replace("b'", "").replace("'", "")
            return decr_val
        except Exception as e:
            x = str(e)
            print(x)
            decr_val = ''
            return decr_val
3. clsFlask.py (This is the main server script that will invoke the encrypt/decrypt class from our previous script. This script will capture the requested JSON from the client, which may be posted from clients like another Python script or third-party tools like Postman.)
##########################################################
#### Written By: SATYAKI DE                           ####
#### Written On: 25-Jan-2019                          ####
#### Package Flask needs to install in                ####
#### order to run this script.                        ####
####                                                  ####
#### Objective: This script will                      ####
#### encrypt/decrypt based on the                     ####
#### supplied salt value. Also, this                  ####
#### will capture the individual element              ####
#### & stored them into JSON variables                ####
#### using flask framework.                           ####
##########################################################

from clsConfigServer import clsConfigServer as csf
import clsEnDec as cen

class clsFlask(object):
    def __init__(self):
        self.xtoken = str(csf.config['DEF_SALT'])

    def getEncryptProcess(self, dGroup, input_data, dTemplate):
        try:
            # It is sending default salt value
            xtoken = self.xtoken

            # Capturing the individual element
            dGroup = dGroup
            input_data = input_data
            dTemplate = dTemplate

            # This will check the mandatory json elements
            if ((dGroup != '') & (dTemplate != '')):

                # Based on the Group & Element it will fetch the salt
                # Based on the specific salt it will encrypt the data
                if ((dGroup == 'GrDet') & (dTemplate == 'subGrAcct_Nbr')):
                    xtoken = str(csf.config['ACCT_NBR_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.encrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrName')):
                    xtoken = str(csf.config['NAME_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.encrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrPhone')):
                    xtoken = str(csf.config['PHONE_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.encrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrEmail')):
                    xtoken = str(csf.config['EMAIL_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.encrypt_str(input_data)
                else:
                    ret_val = ''
            else:
                ret_val = ''

            # Return value
            return ret_val
        except Exception as e:
            ret_val = ''

            # Return the valid json Error Response
            return ret_val

    def getDecryptProcess(self, dGroup, input_data, dTemplate):
        try:
            xtoken = self.xtoken

            # Capturing the individual element
            dGroup = dGroup
            input_data = input_data
            dTemplate = dTemplate

            # This will check the mandatory json elements
            if ((dGroup != '') & (dTemplate != '')):

                # Based on the Group & Element it will fetch the salt
                # Based on the specific salt it will decrypt the data
                if ((dGroup == 'GrDet') & (dTemplate == 'subGrAcct_Nbr')):
                    xtoken = str(csf.config['ACCT_NBR_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.decrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrName')):
                    xtoken = str(csf.config['NAME_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.decrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrPhone')):
                    xtoken = str(csf.config['PHONE_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.decrypt_str(input_data)
                elif ((dGroup == 'GrDet') & (dTemplate == 'subGrEmail')):
                    xtoken = str(csf.config['EMAIL_SALT'])
                    print("xtoken: ", xtoken)
                    print("Flask Input Data: ", input_data)
                    x = cen.clsEnDec(xtoken)
                    ret_val = x.decrypt_str(input_data)
                else:
                    ret_val = ''
            else:
                ret_val = ''

            # Return value
            return ret_val
        except Exception as e:
            ret_val = ''

            # Return the valid Error Response
            return ret_val
Key lines to check –
# This will check the mandatory json elements
if ((dGroup != '') & (dTemplate != '')):
Encrypt & Decrypt will only work on the data when the key element contains valid values. In this case, we are looking for values stored in dGroup & dTemplate, which will denote the specific encryption type.
# Based on the Group & Element it will fetch the salt
# Based on the specific salt it will encrypt the data
if ((dGroup == 'GrDet') & (dTemplate == 'subGrAcct_Nbr')):
    xtoken = str(csf.config['ACCT_NBR_SALT'])
    print("xtoken: ", xtoken)
    print("Flask Input Data: ", input_data)
    x = cen.clsEnDec(xtoken)
    ret_val = x.encrypt_str(input_data)
elif ((dGroup == 'GrDet') & (dTemplate == 'subGrName')):
    xtoken = str(csf.config['NAME_SALT'])
    print("xtoken: ", xtoken)
    print("Flask Input Data: ", input_data)
    x = cen.clsEnDec(xtoken)
    ret_val = x.encrypt_str(input_data)
elif ((dGroup == 'GrDet') & (dTemplate == 'subGrPhone')):
    xtoken = str(csf.config['PHONE_SALT'])
    print("xtoken: ", xtoken)
    print("Flask Input Data: ", input_data)
    x = cen.clsEnDec(xtoken)
    ret_val = x.encrypt_str(input_data)
elif ((dGroup == 'GrDet') & (dTemplate == 'subGrEmail')):
    xtoken = str(csf.config['EMAIL_SALT'])
    print("xtoken: ", xtoken)
    print("Flask Input Data: ", input_data)
    x = cen.clsEnDec(xtoken)
    ret_val = x.encrypt_str(input_data)
Here, as you can see, based on dGroup & dTemplate the application uses a specific salt to encrypt or decrypt the corresponding data. Each branch above maps one dGroup/dTemplate combination to its own salt.
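As a side note, the same selection could be written as a small lookup table instead of the if/elif chain. This is just an illustrative sketch, not the author’s script –

# Hypothetical alternative: map (group, template) pairs to config key names
SALT_MAP = {
    ('GrDet', 'subGrAcct_Nbr'): 'ACCT_NBR_SALT',
    ('GrDet', 'subGrName'): 'NAME_SALT',
    ('GrDet', 'subGrPhone'): 'PHONE_SALT',
    ('GrDet', 'subGrEmail'): 'EMAIL_SALT'
}

salt_key = SALT_MAP.get((dGroup, dTemplate))
ret_val = ''
if salt_key:
    xtoken = str(csf.config[salt_key])
    ret_val = cen.clsEnDec(xtoken).encrypt_str(input_data)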
4. callRunServer.py (This script will create an instance of the Flask server, expose the encrypt/decrypt facilities as an API endpoint & return the responses to clients such as another python script or any third-party application.)
Based on the path & the HTTP method, this will trigger either the encrypt or the decrypt method.
# If the server application doesn't have
# valid json, it will throw 400 error
if not request.get_json(silent=True):
    abort(400)
As the comments suggest, this checks whether the data sent to the server application is valid JSON or not. Based on that, it will either proceed or abort the request & send the response back to the client.
# Capturing the individual element
content = request.get_json()
dGroup = content['dataGroup']
input_data = content['data']
dTemplate = content['dataTemplate']
Here, the application is capturing the json into individual elements.
if ((dGroup != '') & (dTemplate != '')):
    y = clf.clsFlask()
    ret_val = y.getEncryptProcess(dGroup, input_data, dTemplate)
else:
    abort(500)
The server will process the request only when both dGroup & dTemplate contain non-empty values. The same logic applies to both the encrypt & the decrypt process.
    return jsonify({'status': 'success', 'encrypt_val': ret_val})
except Exception as e:
    x = str(e)
    return jsonify({'status': 'error', 'detail': x})
If the process is successful, it will send a JSON response; otherwise, it will return JSON with the error details. Similar logic is applicable for decrypt as well.
Based on the IP address supplied in our configuration file, this server will create an instance on that specific IP address when triggered. Please refer to clsConfigServer.py for the particular parameter values.
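In case the full callRunServer.py listing doesn’t render here, the following is a minimal sketch that stitches the snippets above together. The route path, port & import aliases are assumptions for illustration; the real script reads the host details from clsConfigServer.py –

from flask import Flask, request, jsonify, abort
import clsFlask as clf

app = Flask(__name__)

@app.route('/api/v1/encrypt', methods=['POST'])   # route path is illustrative
def encrypt():
    try:
        # If the server application doesn't have valid json, it will throw 400 error
        if not request.get_json(silent=True):
            abort(400)

        # Capturing the individual element
        content = request.get_json()
        dGroup = content['dataGroup']
        input_data = content['data']
        dTemplate = content['dataTemplate']

        if ((dGroup != '') & (dTemplate != '')):
            y = clf.clsFlask()
            ret_val = y.getEncryptProcess(dGroup, input_data, dTemplate)
        else:
            abort(500)

        return jsonify({'status': 'success', 'encrypt_val': ret_val})
    except Exception as e:
        return jsonify({'status': 'error', 'detail': str(e)})

if __name__ == '__main__':
    # Host & port would normally come from clsConfigServer.py - hard-coded for the sketch
    app.run(host='0.0.0.0', port=5000)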
Let’s run the server application & see how the debug screens for encrypt & decrypt look from the server-side –
Windows (64 bit):
And we’re using the Postman third-party app to invoke it. Please find the authentication details & the JSON payload for encryption below –
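If you prefer a python client over Postman, a request like the following would exercise the same endpoint. The URL, port, route & the absence of authentication are assumptions here; match them to however your server & security are actually configured –

import json
import requests

# Illustrative endpoint - adjust host, port & route to your running server
url = 'http://127.0.0.1:5000/api/v1/encrypt'
headers = {"Content-Type": "application/json"}

payload = {
    "dataGroup": "GrDet",
    "data": "1234567890",
    "dataTemplate": "subGrAcct_Nbr"
}

resp = requests.post(url, data=json.dumps(payload), headers=headers)
print(resp.status_code, resp.json())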
Let’s see the decrypt from the server-side & how it looks from Postman –
Mac (32 bit):
Let’s look at it from the MAC’s perspective & see how the encryption debug looks from the server.
Please find the screen from Postman along with the necessary authentication –
Let’s see how the decrypt looks from both the server & Postman –
So, with this post, we’ve achieved our goal. We’ve successfully demonstrated creating a server component using the Flask framework & we’ve incorporated our custom encryption/decryption script to expose a simulated API for third-party clients or any other application.
Hope you will like this approach.
Let me know your comments on it.
I’ll bring some more exciting topics from the Python verse in the coming days.
In our last installment, we’ve shown pandas & numpy applied to a specific situation. If that was our Space Stone installment of the Python Verse, then this would be an interesting crossover of the Space Stone & the Reality Stone of the Python verse. Yes, you are right. We’ll be discussing one requirement where we need many of these in a single task.
Let’s dive into it!
Let’s assume that we have a source csv file which has the following data –
Now, the requirement is this – we need to prepare a JSON payload from this data, send it to a third-party web service (API) to get the City & State, and based on that find the total number of items sold against each State & City.
As per the agreement with this website, any developer can make 10 test calls per day for free. After that, it will return the response with masked values, e.g. “Classified”. But we don’t need more than 10 calls to test it.
Here, we’ll be dealing with 4 python scripts. Among them, one I’ve already described in my previous post, so I’ll just mention the file & post the script.
Please find the directory structure for both operating systems –
1. clsL.py (This script will create the split csv files or the final merged file after the corresponding process. However, it can also be used for the usual verbose debug logging. Hence, the name.)
##########################################################
#### Written By: SATYAKI DE                           ####
#### Written On: 20-Jan-2019                          ####
##########################################################

import pandas as p
import os
import platform as pl
from clsParam import clsParam as cf

class clsL(object):
    def __init__(self):
        self.path = cf.config['PATH']

    def logr(self, Filename, Ind, df, subdir=None):
        try:
            x = p.DataFrame()
            x = df
            sd = subdir
            os_det = pl.system()

            if sd == None:
                if os_det == "Windows":
                    fullFileName = self.path + '\\' + Filename
                else:
                    fullFileName = self.path + '/' + Filename
            else:
                if os_det == "Windows":
                    fullFileName = self.path + '\\' + sd + "\\" + Filename
                else:
                    fullFileName = self.path + '/' + sd + "/" + Filename

            if Ind == 'Y':
                x.to_csv(fullFileName, index=False)

            return 0
        except Exception as e:
            y = str(e)
            print(y)
            return 3
2. clsParam.py (This script contains the parameter entries in the form of a dictionary, which can later be referred to by all the relevant python scripts as configuration parameters.)
##########################################################
#### Written By: SATYAKI DE                           ####
#### Written On: 20-Jan-2019                          ####
##########################################################

import os

class clsParam(object):

    config = {
        'MAX_RETRY': 5,
        'API_KEY': 'HtWTVS86n8xoGXahyg1tPYH0HwngPqH2YFICzRCtlLbCtfNdya8L1UwRvH90PeMF',
        'PATH': os.path.dirname(os.path.realpath(__file__)),
        'SUBDIR': 'data'
    }
As you can see from this script, we’ve declared all the necessary parameters here as a dictionary object, and later we’ll refer to these parameters from the corresponding python scripts.
One crucial entry to look at is API_KEY, which will be used while calling the third-party web service. We get this API_KEY from the page highlighted (in yellow) in the picture posted above.
3. clsWeb.py (This is the main script, which will first convert the pandas data frame into requests as per the third-party site & send them to the API. It will capture the responses, normalize the JSON & pour it back into a data frame for further processing.)
##########################################################
#### Written By: SATYAKI DE                           ####
#### Written On: 20-Jan-2019                          ####
##########################################################

import json
import requests
import datetime
import time
import ssl
from urllib.request import urlopen

import pandas as p
import numpy as np
import os
import gc

from clsParam import clsParam as cp

class clsWeb(object):
    def __init__(self, payload, format, unit):
        self.payload = payload
        self.path = cp.config['PATH']

        # To disable logging info
        self.max_retries = cp.config['MAX_RETRY']
        self.api_key = cp.config['API_KEY']
        self.unit = unit
        self.format = format

    def get_response(self):
        # Assigning Logging Info
        max_retries = self.max_retries
        api_key = self.api_key
        unit = self.unit
        format = self.format
        df_conv = p.DataFrame()
        cnt = 0

        try:
            # Bypassing SSL Authentication
            try:
                _create_unverified_https_context = ssl._create_unverified_context
            except AttributeError:
                # Legacy python that doesn't verify HTTPS certificates by default
                pass
            else:
                # Handle target environment that doesn't support HTTPS verification
                ssl._create_default_https_context = _create_unverified_https_context

            # Capturing the payload
            data_df = self.payload
            temp_df = data_df[['zipcode']]

            list_of_rec = temp_df['zipcode'].values.tolist()
            print(list_of_rec)

            for i in list_of_rec:
                zip = i

                # Providing the url
                url_part = 'http://www.zipcodeapi.com/rest/'
                url = url_part + api_key + '/' + 'info.' + format + '/' + str(zip) + '/' + unit

                headers = {"Content-type": "application/json"}
                param = headers

                var1 = datetime.datetime.now().strftime("%H:%M:%S")
                print('Json Fetch Start Time:', var1)

                retries = 1
                success = False

                while not success:
                    # Getting response from web service
                    response = requests.get(url, params=param, verify=False)

                    # print("Complete Error:: ", str(response.status_code))
                    # print("Error First::", str(response.status_code)[:1])
                    if str(response.status_code)[:1] == '2':
                        # response = s.post(url, params=param, json=json_data, verify=False)
                        success = True
                    else:
                        wait = retries * 2
                        print("Retry fails! Waiting " + str(wait) + " seconds and retrying.")
                        time.sleep(wait)
                        retries += 1

                        # Checking Maximum Retries
                        if retries == max_retries:
                            success = True
                            raise ValueError

                # print(response.text)
                var2 = datetime.datetime.now().strftime("%H:%M:%S")
                print('Json Fetch End Time:', var2)
                print("-" * 90)

                # Capturing the response json from Web Service
                df_response_json = response.text
                string_to_json = json.loads(df_response_json)

                # Converting the response json to Dataframe
                # df_Int_Rec = p.read_json(string_to_json, orient='records')
                df_Int_Rec = p.io.json.json_normalize(string_to_json)
                df_Int_Rec.columns = df_Int_Rec.columns.map(lambda x: x.split(".")[-1])

                if cnt == 0:
                    df_conv = df_Int_Rec
                else:
                    d_frames = [df_conv, df_Int_Rec]
                    df_conv = p.concat(d_frames)

                cnt += 1

                # Deleting temporary dataframes & Releasing memories
                del [[df_Int_Rec]]
                gc.collect()

            # Resetting the Index Value
            df_conv.reset_index(drop=True, inplace=True)

            # Merging two data side ways maintaining the orders
            df_add = p.concat([data_df, df_conv], axis=1)

            del [[df_conv]]
            gc.collect()

            # Dropping unwanted column
            df_add.drop(['acceptable_city_names'], axis=1, inplace=True)

            return df_add

        except ValueError as v:
            print(response.text)
            x = str(v)
            print(x)

            # Return Empty Dataframe
            df = p.DataFrame()
            return df

        except Exception as e:
            print(response.text)
            x = str(e)
            print(x)

            # Return Empty Dataframe
            df = p.DataFrame()
            return df
Let’s look at the key lines to discuss –
def __init__(self, payload, format, unit):
    self.payload = payload
    self.path = cp.config['PATH']

    # To disable logging info
    self.max_retries = cp.config['MAX_RETRY']
    self.api_key = cp.config['API_KEY']
    self.unit = unit
    self.format = format
The first block is executed as soon as you instantiate the class. Note that we’ve used our parameter class here as cp & then we refer to the corresponding elements as & when required. The other parameters are captured from the invoking script, which we’ll discuss later in this post.
# Bypassing SSL Authentication
try:
    _create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
    # Legacy python that doesn't verify HTTPS certificates by default
    pass
else:
    # Handle target environment that doesn't support HTTPS verification
    ssl._create_default_https_context = _create_unverified_https_context
Sometimes, your firewall or proxy might block your web service request due to a specific certificate error. This snippet bypasses that verification. However, it is always advised to use proper SSL certification in the Production environment.
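For completeness, here is a hedged sketch of the production-friendly alternative: keep verification on & point requests at a trusted CA bundle instead of switching it off –

import requests
import certifi   # CA bundle that ships alongside requests

# Verification stays enabled; if a corporate proxy re-signs traffic,
# point verify at that proxy's CA bundle path instead of certifi's.
response = requests.get(url, params=param, verify=certifi.where())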
# Capturing the payload
data_df = self.payload
temp_df = data_df[['zipcode']]

list_of_rec = temp_df['zipcode'].values.tolist()
In this snippet, we’re capturing the zip codes from our source data frame & converting them into a list. These are the values we’ll pass to the API as part of each request.
for i in list_of_rec:
    zip = i

    # Providing the url
    url_part = 'http://www.zipcodeapi.com/rest/'
    url = url_part + api_key + '/' + 'info.' + format + '/' + str(zip) + '/' + unit

    headers = {"Content-type": "application/json"}
    param = headers
Once we’ve extracted our zip codes, we pass them one by one & form the request URL & header for each of them.
retries = 1
success = False

while not success:
    # Getting response from web service
    response = requests.get(url, params=param, verify=False)

    # print("Complete Error:: ", str(response.status_code))
    # print("Error First::", str(response.status_code)[:1])
    if str(response.status_code)[:1] == '2':
        # response = s.post(url, params=param, json=json_data, verify=False)
        success = True
    else:
        wait = retries * 2
        print("Retry fails! Waiting " + str(wait) + " seconds and retrying.")
        time.sleep(wait)
        retries += 1

        # Checking Maximum Retries
        if retries == max_retries:
            success = True
            raise ValueError
In this section, we’re sending our request & waiting for the response from the third-party API. If we receive a success response (2xx), we proceed with the next zip code. If not, we retry the call, waiting a little longer each time, until we reach the maximum limit. If the application still hasn’t received a valid answer after the maximum number of retries, it exits the loop & raises an error to the main application.
# Capturing the response json from Web Service
df_response_json = response.text
string_to_json = json.loads(df_response_json)

# Converting the response json to Dataframe
# df_Int_Rec = p.read_json(string_to_json, orient='records')
df_Int_Rec = p.io.json.json_normalize(string_to_json)
df_Int_Rec.columns = df_Int_Rec.columns.map(lambda x: x.split(".")[-1])
This snippet extracts the desired response from the API & converts it back into a Pandas data frame. The last two lines normalize the nested JSON received from the API for further processing. These are critical steps, as they let us extract City & State from the API response.
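To make the normalization step concrete, here is a tiny, self-contained illustration with made-up fields (the real API response will have different names) –

import pandas as p

sample = {"zip_code": "08550", "city": "Princeton Jct", "state": "NJ",
          "timezone": {"timezone_identifier": "America/New_York"}}

# Older pandas exposes this as p.io.json.json_normalize; newer releases use p.json_normalize
df = p.json_normalize(sample)
print(df.columns.tolist())
# ['zip_code', 'city', 'state', 'timezone.timezone_identifier']

# Keeping only the last part of the dotted column names, as the script above does
df.columns = df.columns.map(lambda x: x.split(".")[-1])
print(df.columns.tolist())
# ['zip_code', 'city', 'state', 'timezone_identifier']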
# Merging two data side ways maintaining the orders
df_add = p.concat([data_df, df_conv], axis=1)
Once we have the structured data, we can merge it back into our source data frame for the next step.
4. callWebservice.py (This script will call the API script & also process the data to create an aggregate report for our task.)
##########################################################
#### Objective: Purpose of this Library is to call    ####
#### the Web Service method to capture the city       ####
#### & the state as a json response & update them     ####
#### in the dataframe & finally produce the summary   ####
#### of Total Sales & Item Counts based on the City   ####
#### & the State.                                     ####
####                                                  ####
#### Arguments are as follows:                        ####
#### Mentioned the Exception Part. First time dry     ####
#### run the program without providing any args.      ####
#### It will show all the mandatory params.           ####
####                                                  ####
##########################################################
#### Written By: SATYAKI DE                           ####
#### Written On: 20-Jan-2019                          ####
##########################################################

import clsWeb as cw
import sys
import pandas as p
import os
import platform as pl
import clsLog as log
import datetime
import numpy as np
from clsParam import clsParam as cp

# Disabling Warnings
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn
def main():
    print("Calling the custom Package..")

    try:
        if len(sys.argv) == 4:
            inputFile = str(sys.argv[1])
            format = str(sys.argv[2])
            unit = str(sys.argv[3])
        else:
            raise Exception

        # Checking whether the format contains
        # allowable choices or not
        if (format == 'JSON'):
            format = 'json'
        elif (format == 'CSV'):
            format = 'csv'
        elif (format == 'XML'):
            format = 'xml'
        else:
            raise Exception

        # Checking whether the unit contains
        # allowable choices or not
        if (unit == 'DEGREE'):
            unit = 'degree'
        elif (unit == 'RADIANS'):
            unit = 'radians'
        else:
            raise Exception

        print("*" * 170)
        print("Reading from " + str(inputFile))
        print("*" * 170)

        # Initiating Logging Instances
        clog = log.clsLog()

        path = cp.config['PATH']
        subdir = cp.config['SUBDIR']

        os_det = pl.system()

        if os_det == "Windows":
            src_path = path + '\\' + 'data\\'
        else:
            src_path = path + '/' + 'data/'

        # Reading source data csv file
        df_Payload = p.read_csv(src_path + inputFile, index_col=False, skipinitialspace=True)

        x = cw.clsWeb(df_Payload, format, unit)
        retDf = x.get_response()

        # Total Number of rows fetched
        count_row = retDf.shape[0]

        if count_row == 0:
            print("Data Processing Issue!")
        else:
            print("Writing to file -> (" + str(inputFile) + "_modified.csv) Status: Success")

        FileName, FileExtn = inputFile.split(".")

        # Writing to the file
        clog.logr(FileName + '_modified.' + FileExtn, 'Y', retDf, subdir)
        print("*" * 170)

        # Performing group by operation to get the desired result
        # State & City-wise total Sales & Item Sales
        df_src = p.DataFrame()
        df_src = retDf[['city', 'state', 'total', 'item_count']]

        # Converting values to Integer
        df_src['city_1'] = retDf['city'].astype(str)
        df_src['state_1'] = retDf['state'].astype(str)
        df_src['total_1'] = retDf['total'].astype(int)
        df_src['item_count_1'] = retDf['item_count'].astype(int)

        # Dropping the old Dtype Columns
        df_src.drop(['city'], axis=1, inplace=True)
        df_src.drop(['state'], axis=1, inplace=True)
        df_src.drop(['total'], axis=1, inplace=True)
        df_src.drop(['item_count'], axis=1, inplace=True)

        # Renaming the new columns to as per Old Column Name
        df_src.rename(columns={'city_1': 'city'}, inplace=True)
        df_src.rename(columns={'state_1': 'state'}, inplace=True)
        df_src.rename(columns={'total_1': 'total'}, inplace=True)
        df_src.rename(columns={'item_count_1': 'item_count'}, inplace=True)

        # Performing Group By Operation
        grouped = df_src.groupby(['state', 'city'])
        res_1 = grouped.aggregate(np.sum)

        print("DF:")
        print(res_1)

        FileName1 = 'StateCityWiseReport'

        # Handling Multiple source files
        var = datetime.datetime.now().strftime(".%H.%M.%S")
        print('Target File Extension will contain the following:: ', var)

        # Writing to the file
        clog.logr(FileName1 + var + '.' + FileExtn, 'Y', df_src, subdir)

        print("*" * 170)
        print("Operation done for " + str(inputFile) + "!")
        print("*" * 170)

    except Exception as e:
        x = str(e)
        print(x)

        print("*" * 170)
        print('Current order would be - <' + str(sys.argv[0]) + '> <Csv File Name> <JSON/CSV/XML> <DEGREE/RADIANS>')
        print('Make sure last two params should be in CAPS only!')
        print("*" * 170)

if __name__ == "__main__":
    main()
Let’s look at some vital code snippets in this main script –
# Reading source data csv file
df_Payload = p.read_csv(src_path + inputFile, index_col=False, skipinitialspace=True)

x = cw.clsWeb(df_Payload, format, unit)
retDf = x.get_response()
In this snippet, we’re reading our data from the source csv & then calling our Web API class to get the State & City information.
Next, we convert each column to an appropriate data type. In Pandas, it is always advisable to set the data types of your frames explicitly to avoid unforeseen scenarios.
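The corresponding lines from the script are –

# Converting values to Integer
df_src['city_1'] = retDf['city'].astype(str)
df_src['state_1'] = retDf['state'].astype(str)
df_src['total_1'] = retDf['total'].astype(int)
df_src['item_count_1'] = retDf['item_count'].astype(int)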
# Dropping the old Dtype Columns
df_src.drop(['city'], axis=1, inplace=True)
df_src.drop(['state'], axis=1, inplace=True)
df_src.drop(['total'], axis=1, inplace=True)
df_src.drop(['item_count'], axis=1, inplace=True)

# Renaming the new columns to as per Old Column Name
df_src.rename(columns={'city_1': 'city'}, inplace=True)
df_src.rename(columns={'state_1': 'state'}, inplace=True)
df_src.rename(columns={'total_1': 'total'}, inplace=True)
df_src.rename(columns={'item_count_1': 'item_count'}, inplace=True)
Now, we drop the old columns & rename the new ones back to the original column names, so we end up with the same columns but with the correct data types. I personally like this approach as it is a clean way to do the task & you can debug it easily.
# Performing Group By Operation
grouped = df_src.groupby(['state', 'city'])
res_1 = grouped.aggregate(np.sum)
And, finally, using the Pandas group-by method, we group the data by state & city & then use numpy’s sum to aggregate the totals for each group.
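A quick, made-up illustration of what that group-by produces (the figures are not from the actual sample file) –

import pandas as p
import numpy as np

toy = p.DataFrame({
    'state': ['NJ', 'NJ', 'NY'],
    'city': ['Edison', 'Edison', 'Albany'],
    'total': [100, 50, 70],
    'item_count': [2, 1, 3]
})

grouped = toy.groupby(['state', 'city'])
print(grouped.aggregate(np.sum))   # on newer pandas, grouped.sum() is the preferred spelling
#               total  item_count
# state city
# NJ    Edison    150           3
# NY    Albany     70           3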
Please check the first consolidated output –
From this screenshot, you can see that we now have the desired intermediate data of City & State to proceed to the next level.
Let’s see how it runs –
Windows (64 bit):
Mac (32 bit):
So, from the screenshot, we can see our desired output & you can calculate the aggregated value based on our sample provided in the previous screenshot.
Let’s check how the data directory looks after the run –
Windows:
MAC:
So, finally, we’ve achieved our target.
I hope this gives you some more insight into the Python verse. Let me know what you think about this post.
Over the last couple of years, I’ve been working with various technologies, and one of the most interesting languages I came across is Python. It is extremely flexible, letting developers learn quickly & build rapidly with very few lines of code compared to other languages. I’ve worked with the major versions of Python; among them, Python 2.7 & the current Python 3.7.1 are very popular with developers & are my personal favourites.
There are many useful packages available that reduce the burden on developers. Among them, packages like “pandas”, “numpy”, “json”, “AES” & “threading” are extremely useful, & one can do a lot of work with them.
I personally prefer the Ubuntu or Mac versions of Python. However, I’ve worked with the Windows version as well & have developed Python-based frameworks & applications that work across all the major operating systems. If you take care of a few things from the beginning, you won’t have to make many changes to your Python application for it to work on all the major operating systems. 🙂
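One such habit, as a small hedged example: build paths with os.path.join instead of hard-coding separators, so the same line works on Windows, Mac & Ubuntu (the file name below is just illustrative) –

import os

base_dir = os.path.dirname(os.path.realpath(__file__))

# os.path.join picks the right separator for the running OS,
# so no separate '\\' vs '/' branches are needed
data_file = os.path.join(base_dir, 'data', 'sales.csv')
print(data_file)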
To me, the Python Universe is nothing short of Marvel’s Universe of Avengers. In order to beat the supreme villain Thanos (that challenging & complex product with an extremely tight timeline), you’ve got to have the 6 infinity stones to defeat him.
Space Stone ( Pandas & Numpy )
Reality Stone ( Json, SSL & Encryption/Decryption )
Power Stone ( Multi-Threading/Multi-Processing )
Mind Stone ( OS, Database, Directories & Files )
Soul Stone ( Logging & Exception )
Time Stone ( Cloud Interaction & Framework )
I’ll release a series of Python-based posts in the coming days, which might be useful for many peers or information seekers. Hopefully, this installment is just the beginning, so please follow my posts. I hope, very soon, you will get many such useful posts.
You can get the latest version of Python from the official site given below –