This site mainly deals with various use cases demonstrated using Python, Data Science, Cloud basics, SQL Server, Oracle & Teradata, along with SQL & their implementation. Your active participation & time are appreciated. This blog can also be accessed from your TP, tablet & mobile. Please provide your feedback.
In an engaging session at the Global PowerBI Summit, we and our co-host delved into the evolving landscape of data technologies. Our discussion aimed to illuminate the distinctions and applications of several pivotal technologies in the data sphere, ranging from Lakehouse vs. Storage Account to the nuanced differences between Fabric Pipeline and Data Pipeline and the critical comparisons of Notebooks vs. Databricks, including their performance metrics. Furthermore, we explored the realm of model experimentation and Azure ML, shedding light on their performance benchmarks.
Lakehouse vs. Storage Account: Unveiling the Distinctions
Advantages of Lakehouse:
Enhanced File Previews and Transformations: The Lakehouse paradigm revolutionizes how we preview and transform files into SQL tables, offering a seamless data manipulation experience.
Robust Data Governance: It introduces native indexing for data lineage, PII scans, and discovery, thus laying a solid foundation for data governance.
Optimized Performance for Reporting: With direct lake mode, Lakehouse significantly improves performance for Power BI Reporting, catering to the needs of data analysts and business intelligence professionals.
Disadvantages:
Functional Restrictions: Despite its strengths, Lakehouse falls short in providing a native file download feature, demands manual refresh for new file visibility, and has limited support for file formats outside of Delta and Parquet.
Key Insights:
Lakehouse distinguishes itself by being user-friendly and efficient in data uploading, albeit with slower previews. Its distinction from a Storage Account lies in these unique functionalities and user experience.
Fabric Pipeline vs. Data Pipeline: Navigating the Differences
Advantages of Fabric Pipeline:
Ease of Data Transformation: It introduces a low-code, no-code approach with the Power Query Editor, enriching the data transformation process.
Advanced Monitoring Capabilities: The ability to monitor pipelines and trace lineage enhances the management and integration of fabric artifacts.
Disadvantages:
Artifact and Trigger Limitations: A notable drawback is the isolated nature of each pipeline artifact and the limitation to a single scheduled trigger type per pipeline.
Expert Commentary:
Our analysis reveals that while both platforms share a user-friendly interface reminiscent of Azure’s, navigating between pipelines in Fabric requires additional steps. However, both platforms demonstrate rapid execution capabilities, with Azure slightly leading due to its unified pipeline management.
Notebooks vs. Databricks: An In-depth Comparison
Advantages of Notebooks:
Comprehensive Support and Integration: Notably, Notebooks excel in providing native support for various programming and visualization packages, coupled with a direct connection to Lakehouse.
Collaborative Features and Efficiency: The platform encourages collaboration through real-time co-editing and optimizes resource usage by stopping clusters when not in use.
Disadvantages:
Cluster and Resource Management: External management of clusters and the absence of a shared folder or user notebooks present challenges in collaborative environments.
Expert Insights:
Our discussion highlighted that Notebooks offers a superior user interface and connectivity options despite Databricks’ having certain advantages in data processing speeds.
Performance Showdown: Notebooks vs. Databricks
Our performance analysis underscored Fabric Notebooks’ superiority in handling large datasets and running machine learning models more efficiently than Databricks, especially highlighting Lakehouse’s faster cluster initiation times and data storage efficiencies.
Exploring Model and Experimentation: Fabric vs. Azure ML
Advantages and Insights:
Seamless Integration and Configuration: Fabric’s integration with Lakehouse and direct pipeline connections streamline the data science workflow.
Graphical Interface and Focus: Fabric’s lack of a graphical interface contrasts with Azure ML’s user-friendly studio, indicating Fabric’s analytics and BI focus against Azure ML’s comprehensive experiment capabilities.
Performance Analysis:
Our comparative performance review revealed that Fabric excels in dataset loading and model execution speeds, offering significant advantages over Azure ML.
Closing Thoughts from the Summit
Our Global PowerBI Summit session aimed to demystify the complexities of modern data technologies, providing attendees with clear, actionable insights. Our collaborative presentation underscored the importance of understanding each technology’s strengths and limitations, empowering data professionals to make informed decisions in their projects. The dynamic interplay between these technologies illustrates the vibrant and evolving nature of the data landscape, promising exciting possibilities for innovation and efficiency in data management and analysis.
These stats were taken during an early release of the product. Since the product is continuously improving, we will need to revisit these comparisons after some time.
Today, we want to make our use case a little bit harder & more realistic. We want to consume real-time live trade data through the FinnHub API & display it on our dashboard using another brilliant API, H2O-Wave, with the help of native Python.
The use case mentioned above is extremely useful, & to achieve it we'll be using the following third-party APIs:
FinnHub: For more information, please click the following link.
Ably: For more information, please click the following link.
H2O-Wave: For more information, please click the following link.
I'm not going to discuss these topics further, as I've already covered them in separate earlier posts. Please refer to the following threads for detailed information:
In this post, we will address a more advanced concept than in the previous posts mentioned above. Let us first look at how the run looks before we start exploring the details:
Real-time trade dashboard
Let us explore the architecture of this implementation –
Architecture Diagram
This application will talk to the FinnHub websocket & consume real-time trade data from it. The data will be temporarily stored in our Ably channels. The dashboard will pick up each message & display it as soon as there is new data for that trading company.
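To make the flow above concrete, here is a minimal sketch of the producer/consumer idea. It is not the actual implementation: an in-process `queue.Queue` stands in for the Ably channel, the helper names `publish_trades` and `drain_latest_by_symbol` are hypothetical, and the trade values are made up. Only the short keys `s`, `p`, `v` & `t` are taken from the scripts discussed in this post.

```python
import json
import queue

# Assumption: a simple in-process queue stands in for an Ably channel.
channel = queue.Queue()

def publish_trades(trades):
    """Producer side: push each trade message onto the channel as JSON."""
    for trade in trades:
        channel.put(json.dumps(trade))

def drain_latest_by_symbol():
    """Consumer side: read everything queued and keep the newest trade per symbol."""
    latest = {}
    while not channel.empty():
        trade = json.loads(channel.get())
        latest[trade["s"]] = trade
    return latest

# Made-up sample trades: s = symbol, p = price, v = volume, t = timestamp.
publish_trades([
    {"s": "AAPL", "p": 150.0, "v": 10, "t": 1},
    {"s": "AMZN", "p": 3300.0, "v": 2, "t": 2},
    {"s": "AAPL", "p": 151.5, "v": 5, "t": 3},
])

latest = drain_latest_by_symbol()
print(latest["AAPL"]["p"])
```

The dashboard side only ever needs the newest value per company, which is why the consumer keeps a dictionary keyed by symbol rather than the full message history.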
For this use case, you need to install the following packages –
STEP – 1:
Main Packages
STEP – 2:
Main Packages – Continue
STEP – 3:
Main Packages – Continue
STEP – 4:
Main Packages – End
You can copy the following commands to install the above-mentioned packages –
Let’s explore the important data-point that you need to capture from the FinnHub portal to consume the real-time trade data –
FinnHub Portal
We have two main scripts. The first script will consume the streaming data into a message queue. The other one will extract the data from the queue, transform it & publish it to the real-time dashboard.
1. dashboard_finnhub.py ( This native Python script will consume streaming data & create the live trade dashboard. )
Let’s explore the key snippets from the above script –
def process_DF(inputDF, inputDFUnq):
    try:
        # Core Business logic
        # The application will show default value to any
        # trade-in stock in case that data doesn't consume
        # from the source.

        # Getting block count
        #df_conv['block_count'] = df_conv.groupby(['default_rank']).cumcount()
        #l.logr('3. block_df.csv', 'Y', df_conv, subdir)

        # Getting block count
        #df_conv['max_count'] = df_conv.groupby(['default_rank']).size()
        #df_conv_fin = df_conv.groupby(['default_rank']).agg(['count'])
        #df_conv_fin = df_conv.value_counts(['default_rank']).reset_index(name='max_count')
        #df_conv_fin = df_conv.value_counts(['default_rank'])

        df_conv = inputDF
        df_unique_fin = inputDFUnq

        df_conv['max_count'] = df_conv.groupby('default_rank')['default_rank'].transform('count')
        l.logr('3. max_df.csv', 'Y', df_conv, subdir)

        # Sorting the output
        sorted_df = df_conv.sort_values(by=['default_rank','s'], ascending=True)

        # New Column List Orders
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        df_fin = sorted_df.reindex(column_order, axis=1)
        l.logr('4. sorted_df.csv', 'Y', df_fin, subdir)

        # Now splitting the sorted df into two sets
        lkp_max_count = 4
        df_fin_na = df_fin[(df_fin['max_count'] == lkp_max_count)]
        l.logr('5. df_fin_na.csv', 'Y', df_fin_na, subdir)

        df_fin_req = df_fin[(df_fin['max_count'] != lkp_max_count)]
        l.logr('6. df_fin_req.csv', 'Y', df_fin_req, subdir)

        # Now to perform cross join, we will create
        # a key column in both the DataFrames to
        # merge on that key.
        df_unique_fin['key'] = 1
        df_fin_req['key'] = 1

        # Dropping unwanted columns
        df_unique_fin.drop(columns=['t'], axis=1, inplace=True)
        l.logr('7. df_unique_slim.csv', 'Y', df_unique_fin, subdir)

        # Padding with dummy key values
        #merge_df = p.merge(df_unique_fin,df_fin_req,on=['s'],how='left')
        # Note: drop("key", 1) with a positional axis no longer works
        # in recent pandas versions, hence the columns= keyword.
        merge_df = p.merge(df_unique_fin,df_fin_req,on=['key']).drop(columns=['key'])
        l.logr('8. merge_df.csv', 'Y', merge_df, subdir)

        # Sorting the output
        sorted_merge_df = merge_df.sort_values(by=['default_rank_y','s_x'], ascending=True)
        l.logr('9. sorted_merge_df.csv', 'Y', sorted_merge_df, subdir)

        # Calling new derived logic
        sorted_merge_df['derived_p'] = sorted_merge_df.apply(lambda row: calc_p(row), axis=1)
        sorted_merge_df['derived_v'] = sorted_merge_df.apply(lambda row: calc_v(row), axis=1)
        l.logr('10. sorted_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Dropping unwanted columns
        sorted_merge_df.drop(columns=['default_rank_x', 'p_x', 'v_x', 's_y', 'p_y', 'v_y'], axis=1, inplace=True)

        # Renaming the columns
        sorted_merge_df.rename(columns={'s_x':'s'}, inplace=True)
        sorted_merge_df.rename(columns={'default_rank_y':'default_rank'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_p':'p'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_v':'v'}, inplace=True)
        l.logr('11. org_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Aligning columns
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        merge_fin_df = sorted_merge_df.reindex(column_order, axis=1)
        l.logr('12. merge_fin_df.csv', 'Y', merge_fin_df, subdir)

        # Finally, appending these two DataFrame (df_fin_na & merge_fin_df)
        frames = [df_fin_na, merge_fin_df]
        fin_df = p.concat(frames, keys=["s", "default_rank", "max_count"])
        l.logr('13. fin_df.csv', 'Y', fin_df, subdir)

        # Final clearance & organization
        fin_df.drop(columns=['default_rank', 'max_count'], axis=1, inplace=True)
        l.logr('14. Final.csv', 'Y', fin_df, subdir)

        # Adjusting key columns
        fin_df.rename(columns={'s':'Company'}, inplace=True)
        fin_df.rename(columns={'p':'CurrentExchange'}, inplace=True)
        fin_df.rename(columns={'v':'Change'}, inplace=True)
        l.logr('15. TransormedFinal.csv', 'Y', fin_df, subdir)

        return fin_df
    except Exception as e:
        print('$' * 120)
        x = str(e)
        print(x)
        print('$' * 120)

        df = p.DataFrame()

        return df
The above function checks whether the queue is sending the key trade-in data for all the companies. In our use case, we're testing with four companies, which are as follows:
a. AAPL
b. AMZN
c. BINANCE:BTCUSDT
d. IC MARKETS:1
Every message contains data from all four of these companies together. If any company's data is missing, this transformation adds a dummy record for the missing company, so that each message bouquet has a uniform number of entries. Dummy trade-in values are filled in for all the missing information.
In the above snippet, the application will consume the streaming data from the Ably queue.
for i in message_page.items:
    print('Last Msg: {}'.format(i.data))
    json_data = json.loads(i.data)

    # Converting JSON to Dataframe
    df = p.json_normalize(json_data)
    df.columns = df.columns.map(lambda x: x.split(".")[-1])

    if cnt == 0:
        df_conv = df
    else:
        d_frames = [df_conv, df]
        df_conv = p.concat(d_frames)

    cnt += 1
The above snippet converts the streaming messages into a pandas DataFrame, which we can use for a wide variety of analytics. Nested JSON keys are flattened into dotted column names, & only the last segment of each name is kept.
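To illustrate what this flattening step does to each message, here is a small dependency-free sketch. It only mimics the idea and does not replace `json_normalize`. The helper names `flatten` and `messages_to_rows` are hypothetical, and the sample message shape is an assumption.

```python
import json

def flatten(record, parent=""):
    """Flatten nested dicts into dotted keys, e.g. {'data': {'p': 1}} -> {'data.p': 1}."""
    flat = {}
    for key, value in record.items():
        name = f"{parent}.{key}" if parent else key
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat

def messages_to_rows(raw_messages):
    """Parse JSON messages and keep only the last segment of each dotted key."""
    rows = []
    for raw in raw_messages:
        payload = json.loads(raw)
        records = payload if isinstance(payload, list) else [payload]
        for record in records:
            flat = flatten(record)
            rows.append({key.split(".")[-1]: value for key, value in flat.items()})
    return rows

# Assumed message shape, for illustration only.
sample = ['{"data": {"s": "AAPL", "p": 150.5}}', '[{"s": "AMZN", "p": 3300.0}]']
rows = messages_to_rows(sample)
print(rows)
```

Collecting the rows in a list and building one table at the end also avoids repeatedly concatenating inside the loop.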
# Converting dataframe to a desired Series
f = CategoricalSeries(fin_df)

for j in range(count_row):
    # Getting the series values from above
    cat, val, pc = f.next()

    # Getting Individual Element & convert them to Series
    if ((start_pos + interval) <= count_row):
        end_pos = start_pos + interval
    else:
        end_pos = start_pos + (count_row - start_pos)

    split_df = df_unq_finale.iloc[start_pos:end_pos]

    if ((start_pos > count_row) | (start_pos == count_row)):
        pass
    else:
        start_pos = start_pos + interval

    x_currency = str(split_df.iloc[0]['Company'])

    ####################################################
    ##### Debug Purpose #########
    ####################################################
    print('Company: ', x_currency)
    print('J: ', str(j))
    print('Cat: ', cat)
    ####################################################
    ##### End Of Debug #######
    ####################################################

    c = page.add(f'e{j+1}', ui.tall_series_stat_card(
        box=f'{j+1} 1 1 2',
        title=x_currency,
        value='=${{intl qux minimum_fraction_digits=2 maximum_fraction_digits=2}}',
        aux_value='={{intl quux style="percent" minimum_fraction_digits=1 maximum_fraction_digits=1}}',
        data=dict(qux=val, quux=pc),
        plot_type='area',
        plot_category='foo',
        plot_value='qux',
        plot_color=next_color(),
        plot_data=data('foo qux', -15),
        plot_zero_value=0,
        plot_curve=next_curve(),
    ))
    large_lines.append((f, c))

page.save()

while update_freq > 0:
    time.sleep(update_freq)

    for f, c in large_lines:
        cat, val, pc = f.next()

        print('Update Cat: ', cat)
        print('Update Val: ', val)
        print('Update pc: ', pc)
        print('*' * 160)

        c.data.qux = val
        c.data.quux = pc / 100
        c.plot_data[-1] = [cat, val]

    page.save()
The above snippet feeds the data into the H2O-Wave framework, which exposes it through an interactive, GUI-based dashboard: one stat card is created per company, & the cards are then refreshed in a loop at the configured update frequency.
2. publish_ably_mod.py ( This native Python script will consume streaming data into Ably message Queue )
As we already discussed, we’ll pass a default set of data for all the candidate companies.
# Publish a message to the sd_channel channel
channel.publish('event', jdata)
# Publish rest of the messages to the sd_channel channel
channel.publish('event', jdata_dyn)
Send the company-specific trade queries through websocket apps to submit that to FinnHub.
3. clsConfig.py ( This file contains the configuration details. )
In this post, our objective is to combine a traditional RDBMS from the cloud with Azure's NoSQL offering, which in this case is Cosmos DB, & then try to produce some kind of blended information, which can be aggregated further.
Examining Source Data.
No SQL Data from Cosmos:
Let's check the NoSQL data created in our last post one more time.
In total, we created 6 records in our last post.
As the areas marked in red show, the Items view lets you check the total number of records created. You can also filter for specific records by using the Edit Filter button (highlighted with a blue box), where you need to provide the WHERE clause.
Azure SQL DB:
Let’s create some data in Azure SQL DB.
But, before that, you need to create SQL DB in the Azure cloud. Here is the official Microsoft link to create DB in Azure. You can refer to it here.
I won’t discuss the detailed steps of creating DB here.
From Azure portal, it looks like –
Let's see what the data looks like in Azure DB. For our case, we'll be using the hrMaster DB.
Let’s create the table & some sample data aligned as per our cosmos data.
We will join both the data based on subscriberId & then extract our required columns in our final output.
Good. Now, we’re ready for python scripts.
Python Scripts:
In this installment, we'll be reusing the following Python scripts, which were already discussed in my earlier post:
clsL.py
clsColMgmt.py
clsCosmosDBDet.py
So, I’m not going to discuss these scripts.
Before we discuss our scripts, let's look at the directory structures:
Here is the detailed directory structure for both Windows & macOS.
1. clsConfig.py (This script holds the configuration details used by the application.)
After creating a successful connection, our application will read the SQL & fetch the data & store that into a pandas dataframe and return the output to the primary calling function.
3. callCosmosAPI.py (This is the main script, which will call all the methods to blend the data. Hence, the name comes into the picture.)
##############################################
#### Written By: SATYAKI DE ####
#### Written On: 25-May-2019 ####
#### Modified On 02-Jun-2019 ####
#### ####
#### Objective: Main calling scripts. ####
##############################################

import clsColMgmt as cm
import clsCosmosDBDet as cmdb
from clsConfig import clsConfig as cf
import pandas as p
import clsLog as cl
import logging
import datetime
import json
import clsDBLookup as dbcon

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

def getDate(row):
    try:
        d1 = row['orderDate']
        d1_str = str(d1)
        d1_dt_part, sec = d1_str.split('.')
        dt_part1 = d1_dt_part.replace('T', ' ')

        return dt_part1
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = ''

        return dt_part1

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        df_ret = p.DataFrame()
        df_ret_2 = p.DataFrame()
        df_ret_2_Mod = p.DataFrame()

        debug_ind = 'Y'

        # Initiating Log Class
        l = cl.clsLog()

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidated.log', level=logging.INFO)

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']

        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)

        print("*" * 157)
        print("Testing COSMOS DB Connection!")
        print("*" * 157)

        # Checking Cosmos DB Azure
        y = cmdb.clsCosmosDBDet()
        ret_val = y.test_db_con()

        if ret_val == 0:
            print()
            print("Cosmos DB Connection Successful!")
            print("*" * 157)
        else:
            print()
            print("Cosmos DB Connection Failure!")
            print("*" * 157)
            raise Exception

        print("*" * 157)

        # Accessing from Azure SQL DB
        x1 = dbcon.clsDBLookup()
        act_df = x1.azure_sqldb_read(cf.config['AZURE_SQL_1'])

        print("Azure SQL DB::")
        print(act_df)
        print()

        print("-" * 157)

        # Calling the function 1
        print("RealtimeEmail::")

        # Fetching First collection data to dataframe
        print("Fetching Cosmos Collection Data!")

        sql_qry_1 = cf.config['SQL_QRY_1']
        msg = "Documents generated based on unique key"
        collection_flg = 1

        x = cm.clsColMgmt()
        df_ret = x.fetch_data(sql_qry_1, msg, collection_flg)

        l.logr('1.EmailFeedback_' + var + '.csv', debug_ind, df_ret, 'log')
        print('RealtimeEmail Data::')
        print(df_ret)
        print()

        # Checking execution status
        ret_val = int(df_ret.shape[0])

        if ret_val == 0:
            print("Cosmos DB hasn't returned any rows. Please check your queries!")
            print("*" * 157)
        else:
            print("Successfully fetched!")
            print("*" * 157)

        # Calling the 2nd Collection
        print("RealtimeTwitterFeedback::")

        # Fetching second collection data to dataframe
        print("Fetching Cosmos Collection Data!")

        # Query using parameters
        sql_qry_2 = cf.config['SQL_QRY_2']
        msg_2 = "Documents generated based on RealtimeTwitterFeedback feed!"
        collection_flg = 2

        val = 'crazyGo'
        param_det = [{"name": "@CrVal", "value": val}]
        add_param = 2

        x1 = cm.clsColMgmt()
        df_ret_2 = x1.fetch_data(sql_qry_2, msg_2, collection_flg, add_param, param_det)

        # Logging the second collection (df_ret_2), not the first one
        l.logr('2.TwitterFeedback_' + var + '.csv', debug_ind, df_ret_2, 'log')
        print('Realtime Twitter Data:: ')
        print(df_ret_2)
        print()

        # Checking execution status
        ret_val_2 = int(df_ret_2.shape[0])

        if ret_val_2 == 0:
            print("Cosmos DB hasn't returned any rows. Please check your queries!")
            print("*" * 157)
        else:
            print("Successfully fetched rows!")
            print("*" * 157)

        # Merging NoSQL Data (Cosmos DB) with Relational DB (Azure SQL DB)
        df_Fin_temp = p.merge(df_ret, act_df, on='subscriberId', how='inner')

        # .copy() so that the later column changes act on an independent frame
        df_fin = df_Fin_temp[['orderDate', 'orderNo', 'sender', 'state', 'country', 'customerType']].copy()

        print("Initial Combined Data (From Cosmos & Azure SQL DB) :: ")
        print(df_fin)

        l.logr('3.InitCombine_' + var + '.csv', debug_ind, df_fin, 'log')

        # Transforming the orderDate as per standard format
        df_fin['orderDateM'] = df_fin.apply(lambda row: getDate(row), axis=1)

        # Dropping the old column & renaming the new column to old column
        df_fin.drop(columns=['orderDate'], inplace=True)
        df_fin.rename(columns={'orderDateM': 'orderDate'}, inplace=True)

        print("*" * 157)
        print()
        print("Final Combined & Transformed result:: ")
        print(df_fin)

        l.logr('4.Final_Combine_' + var + '.csv', debug_ind, df_fin, 'log')
        print("*" * 157)

    except ValueError:
        print("No relevant data to proceed!")

    except Exception as e:
        # Python 3 exceptions have no .message attribute; use str(e) instead
        print("Top level Error: args:{0}, message: {1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()
Above lines are calling the Azure SQL DB method to retrieve the RDBMS data into our dataframe.
# Merging NoSQL Data (Cosmos DB) with Relational DB (Azure SQL DB)
df_Fin_temp = p.merge(df_ret, act_df, on='subscriberId', how='inner')
df_fin = df_Fin_temp[['orderDate', 'orderNo', 'sender', 'state', 'country', 'customerType']]
In these above lines, we’re joining the data retrieved from two different kinds of the database to prepare our initial combined dataframe. Also, we’ve picked only the desired column, which will be useful for us.
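To show what an inner join on subscriberId followed by a column selection computes, here is a small sketch in plain Python. It is only an illustration of the idea and not the pandas implementation. The helper `inner_join` is hypothetical and the sample rows are made up. Only the column names come from the script above.

```python
def inner_join(left_rows, right_rows, key, columns):
    """Join two lists of dicts on `key`, keeping only `columns` in the result."""
    # Index the right-hand rows by key so each lookup is cheap.
    right_index = {}
    for row in right_rows:
        right_index.setdefault(row[key], []).append(row)

    joined = []
    for left in left_rows:
        # Rows without a match on the other side are dropped (inner join).
        for right in right_index.get(left[key], []):
            merged = {**left, **right}
            joined.append({col: merged[col] for col in columns})
    return joined

# Made-up sample rows standing in for the Cosmos DB & Azure SQL DB results.
cosmos_rows = [
    {"subscriberId": 1, "orderNo": "A-100", "sender": "x@example.com"},
    {"subscriberId": 9, "orderNo": "A-101", "sender": "y@example.com"},
]
sql_rows = [{"subscriberId": 1, "state": "CA", "country": "US"}]

result = inner_join(cosmos_rows, sql_rows, "subscriberId",
                    ["orderNo", "sender", "state", "country"])
print(result)
```

Subscriber 9 has no counterpart on the relational side, so it does not appear in the combined output.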
# Transforming the orderDate as per standard format
df_fin['orderDateM'] = df_fin.apply(lambda row: getDate(row), axis=1)

# Dropping the old column & renaming the new column to old column
df_fin.drop(columns=['orderDate'], inplace=True)
df_fin.rename(columns={'orderDateM': 'orderDate'}, inplace=True)
In the above lines, we’re transforming our date field, as shown above in one of our previous images by calling the getDate method.
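As a standalone illustration of this date clean-up, here is a hedged sketch. The input format (an ISO-like timestamp with a `T` separator and fractional seconds) is an assumption based on how getDate splits the value. The function name `normalize_order_date` is hypothetical. Unlike getDate, this variant also accepts values without a fractional part instead of returning an empty string.

```python
def normalize_order_date(value):
    """Turn '2019-05-25T10:15:30.123Z' into '2019-05-25 10:15:30'."""
    text = str(value)
    # partition() never raises, so values without a fractional part still work.
    date_time_part, _, _ = text.partition('.')
    return date_time_part.replace('T', ' ')

print(normalize_order_date("2019-05-25T10:15:30.123Z"))
```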
Let’s see the directory structure of our program –
Let’s see how it looks when it runs –
Windows:
MAC:
So, finally, we've successfully blended the data & produced a more meaningful data projection.
Following python packages are required to run this application –
pip install azure
pip install azure-cosmos
pip install pandas
pip install requests
pip install pyodbc
This application was tested on Python 3.7.1 & Python 3.7.2. As per Microsoft, the officially supported version is Python 3.5.
I hope you’ll like this effort.
Wait for the next installment. Till then, Happy Avenging. 😀
[Note: All the sample data are available/prepared in the public domain for research & study.]
Here is the latest installment from the Python verse. For the first time, we’ll be dealing with Python with Azure cloud along with the help from Pandas & json.
Why post on this topic?
I always try to post something based on use cases that might be useful in real-life scenarios. On top of that, I couldn't find many significant posts on Azure dealing with Python. So, I thought of sharing some first use cases, which will hopefully encourage others to join this club & use more Python-based applications on the Azure platform.
First, let us check the complexity of today’s post & our objective.
What is the objective?
Today, our objective is to load a couple of JSON payloads, store them in multiple Cosmos containers & finally fetch the data from Cosmos DB & store the output in our log files, apart from printing the same on the terminal screen.
Before we start, let us explain some basic terminology of Azure Cosmos DB, so that whenever we refer to these terms later, they will be easier to understand.
Learning basic azure terminology.
Since this is an unstructured DB, all the data will be stored in this following fashion –
Azure Cosmos DB -> Container -> Items
Let’s simplify this in words. So, each azure DB may have multiple containers, which you can compare with the table of any conventional RDBMS. And, under containers, you will have multiple items, which represents rows of an RDBMS table. The only difference is in each item you might have a different number of elements, which is equivalent to the columns in traditional RDBMS tables. The traditional table always has a fixed number of columns.
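The difference between items & fixed-column rows can be sketched with plain Python dictionaries. The container content below is made up for illustration and is not the actual payload. The variable names are hypothetical.

```python
# Hypothetical container content: two items with different sets of elements.
realtime_email = [
    {"id": "1", "subscriberId": 101, "sender": "a@example.com"},
    {"id": "2", "subscriberId": 102, "sender": "b@example.com", "orderNo": "A-100"},
]

# A relational table needs one fixed column list, so we derive the union of keys.
all_columns = sorted({key for item in realtime_email for key in item})

# Each item becomes a row; elements missing from an item show up as None.
rows = [[item.get(col) for col in all_columns] for item in realtime_email]

print(all_columns)
print(rows)
```

The first item simply has no orderNo element, whereas the table-like view has to carry an empty value for it.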
Input Payload:
Let’s review three different payloads, which we’ll be loading into three separate containers.
srcEmail.json
As you can see in the items, the first sub-row has 3 elements, whereas the second one has 4. In a traditional RDBMS, a table always has the same number of columns.
srcTwitter.json
srcHR.json
So, from the above three sample payloads, our application will gather the users' feedback & consolidate it in a single place for better product forecasts.
Azure Portal:
Let's look into the Azure portal & identify a couple of crucial pieces of information, which the Python scripts will require for authentication. But, before that, I'll show how to get those details, step by step:
As shown highlighted in Red, click the Azure Cosmos DB. You will find the following screen –
If you click this, you will find all the collections/containers that are part of the same DB as follows –
After that, we'll extract the Cosmos key & the Endpoint/URI from the portal. Without these, the Python application won't be able to interact with Azure Cosmos DB. This is sensitive information. So, I'll be providing some dummy details here just to show how to extract it. Never share these details with anyone outside of your project or group.
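One common way to keep such sensitive values out of the source code is to read them from environment variables. The sketch below is an optional alternative and not what the scripts in this post do. The function name `load_cosmos_credentials` is hypothetical, and using the config key names as environment variable names is an assumption.

```python
import os

def load_cosmos_credentials(env=os.environ):
    """Read the endpoint & key from a mapping (by default the process environment)."""
    endpoint = env.get("COSMOSDB_ENDPOINT")
    key = env.get("COSMOS_PRIMARYKEY")
    if not endpoint or not key:
        raise RuntimeError("Set COSMOSDB_ENDPOINT and COSMOS_PRIMARYKEY first")
    return {"COSMOSDB_ENDPOINT": endpoint, "COSMOS_PRIMARYKEY": key}

# Usage with dummy values passed in explicitly, so the sketch runs anywhere.
dummy_env = {
    "COSMOSDB_ENDPOINT": "https://example.invalid:443/",
    "COSMOS_PRIMARYKEY": "dummy-key",
}
creds = load_cosmos_credentials(dummy_env)
print(creds["COSMOSDB_ENDPOINT"])
```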
Good. Now, we’re ready for python scripts.
Python Scripts:
In this installment, we'll be reusing the following Python script, which was already discussed in my earlier post:
clsL.py
So, I’m not going to discuss these scripts.
Before we discuss our scripts, let's look at the directory structures:
1. clsConfig.py (This script is the config file. It contains all the keys for Azure Cosmos DB, which the application uses to perform various CRUD operations on Cosmos DB.)
##############################################
#### Written By: SATYAKI DE ####
#### Written On: 25-May-2019 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### azure cosmos db. Application will ####
#### process these information & perform ####
#### various CRUD operation on Cosmos DB. ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))
    db_name = 'rnd-de01-usw2-vfa-cdb'
    db_link = 'dbs/' + db_name
    CONTAINER1 = "RealtimeEmail"
    CONTAINER2 = "RealtimeTwitterFeedback"
    CONTAINER3 = "RealtimeHR"

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'EMAIL_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcEmail.json',
        'TWITTER_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcTwitter.json',
        'HR_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcHR.json',
        'COSMOSDB_ENDPOINT': 'https://rnd-de01-usw2-vfa-cdb.documents.azure.com:443/',
        'COSMOS_PRIMARYKEY': "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXIsI00AxKXXXXXgg==",
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'COSMOSDB': db_name,
        'COSMOS_CONTAINER1': CONTAINER1,
        'COSMOS_CONTAINER2': CONTAINER2,
        'COSMOS_CONTAINER3': CONTAINER3,
        'CONFIG_ORIG': 'Config_orig.csv',
        'ENCRYPT_CSV': 'Encrypt_Config.csv',
        'DECRYPT_CSV': 'Decrypt_Config.csv',
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'APP_DESC_1': 'Feedback Communication',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SQL_QRY_1': "SELECT c.subscriberId, c.sender, c.orderNo, c.orderDate, c.items.orderQty FROM RealtimeEmail c",
        'SQL_QRY_2': "SELECT c.twitterId, c.Twit, c.DateCreated, c.Country FROM RealtimeTwitterFeedback c WHERE c.twitterId=@CrVal",
        'DB_QRY': "SELECT * FROM c",
        'COLLECTION_QRY': "SELECT * FROM r",
        'database_link': db_link,
        'collection_link_1': db_link + '/colls/' + CONTAINER1,
        'collection_link_2': db_link + '/colls/' + CONTAINER2,
        'collection_link_3': db_link + '/colls/' + CONTAINER3,
        'options': {
            'offerThroughput': 1000,
            'enableCrossPartitionQuery': True,
            'maxItemCount': 2
        }
    }
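The config above picks the path separator by checking the operating system. As an optional alternative, `os.path.join` from the standard library handles the separator automatically. The sketch below is only a suggestion under that assumption. The helper `build_paths` is hypothetical and covers just three of the config entries.

```python
import os

def build_paths(base_dir):
    """Build a few of the config paths without choosing the separator by hand."""
    return {
        'EMAIL_SRC_JSON_FILE': os.path.join(base_dir, 'src_file', 'srcEmail.json'),
        # A trailing empty component keeps the trailing separator on directories.
        'LOG_PATH': os.path.join(base_dir, 'log', ''),
        'REPORT_PATH': os.path.join(base_dir, 'report'),
    }

paths = build_paths(os.path.join('tmp', 'app'))
print(paths['LOG_PATH'])
```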
2. clsCosmosDBDet.py (This script tests the connection between the Python application & Azure Cosmos DB. If it is successful, it then fetches the details of all the collections/containers that reside under the same DB.)
##############################################
#### Written By: SATYAKI DE ####
#### Written On: 25-May-2019 ####
#### ####
#### Objective: This script will check & ####
#### test the connection with the Cosmos ####
#### & it will fetch all the collection ####
#### name resied under the same DB. ####
##############################################

import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.errors as errors

from clsConfig import clsConfig as cf

class IDisposable(cosmos_client.CosmosClient):
    def __init__(self, obj):
        self.obj = obj

    def __enter__(self):
        return self.obj

    def __exit__(self, exception_type, exception_val, trace):
        self = None

class clsCosmosDBDet:
    def __init__(self):
        self.endpoint = cf.config['COSMOSDB_ENDPOINT']
        self.primarykey = cf.config['COSMOS_PRIMARYKEY']
        self.db = cf.config['COSMOSDB']
        self.cont_1 = cf.config['COSMOS_CONTAINER1']
        self.cont_2 = cf.config['COSMOS_CONTAINER2']
        self.cont_3 = cf.config['COSMOS_CONTAINER3']
        self.database_link = cf.config['database_link']
        self.collection_link_1 = cf.config['collection_link_1']
        self.collection_link_2 = cf.config['collection_link_2']
        self.collection_link_3 = cf.config['collection_link_3']
        self.options = cf.config['options']
        self.db_qry = cf.config['DB_QRY']
        self.collection_qry = cf.config['COLLECTION_QRY']

    def list_Containers(self, client):
        try:
            database_link = self.database_link
            collection_qry = self.collection_qry

            print("1. Query for collection!")
            print()

            collections = list(client.QueryContainers(database_link, {"query": collection_qry}))

            if not collections:
                return

            for collection in collections:
                print(collection['id'])

            print()

        except errors.HTTPFailure as e:
            if e.status_code == 404:
                print("*" * 157)
                print('A collection with id \'{0}\' does not exist'.format(id))
                print("*" * 157)
            else:
                raise errors.HTTPFailure(e.status_code)

    def test_db_con(self):
        endpoint = self.endpoint
        primarykey = self.primarykey
        options_1 = self.options
        db_qry = self.db_qry

        with IDisposable(cosmos_client.CosmosClient(url_connection=endpoint, auth={'masterKey': primarykey})) as client:
            try:
                try:
                    options = {}
                    query = {"query": db_qry}
                    options = options_1

                    print("-" * 157)
                    print('Options:: ', options)
                    print()

                    print("Database details:: ")

                    result_iterable = client.QueryDatabases(query, options)

                    for item in iter(result_iterable):
                        print(item)

                    print("-" * 157)

                except errors.HTTPFailure as e:
                    if e.status_code == 409:
                        pass
                    else:
                        raise errors.HTTPFailure(e.status_code)

                self.list_Containers(client)

                return 0

            except errors.HTTPFailure as e:
                print("Application has caught an error. {0}".format(e.message))
                return 1

            finally:
                print("Application successfully completed!")
Key lines from the above scripts are –
with IDisposable(cosmos_client.CosmosClient(url_connection=endpoint, auth={'masterKey': primarykey})) as client:
In this step, the Python application builds the connection object.
# Refer to the entry in our config file
self.db_qry = cf.config['DB_QRY']
..
query = {"query": db_qry}
options = options_1
..
Based on the value supplied from our Python configuration script, this extracts the Cosmos DB database details.
self.list_Containers(client)
This function identifies all the collections under this database.
def list_Containers(self, client):
..
collections = list(client.QueryContainers(database_link, {"query": collection_qry}))
if not collections:
    return
for collection in collections:
    print(collection['id'])
In the lines above, our application fetches the containers that are associated with this database.
3. clsColMgmt.py (This script creates new items in Azure Cosmos DB & retrieves data from it, either the entire dataset or filtered by parameters. Hence, the name comes into the picture.)
################################################
#### Written By: SATYAKI DE ####
#### Written On: 25-May-2019 ####
#### ####
#### Objective: This scripts has multiple ####
#### features. You can create new items ####
#### in azure cosmos db. Apart from that ####
#### you can retrieve data from Cosmos just ####
#### for viewing purpose. You can display ####
#### data based on specific filters or the ####
#### entire dataset. Hence, three different ####
#### methods provided here to support this. ####
################################################
import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.errors as errors
import pandas as p
import json
from clsConfig import clsConfig as cf

class IDisposable(cosmos_client.CosmosClient):
    def __init__(self, obj):
        self.obj = obj

    def __enter__(self):
        return self.obj

    def __exit__(self, exception_type, exception_val, trace):
        self = None

class clsColMgmt:
    def __init__(self):
        self.endpoint = cf.config['COSMOSDB_ENDPOINT']
        self.primarykey = cf.config['COSMOS_PRIMARYKEY']
        self.db = cf.config['COSMOSDB']
        self.cont_1 = cf.config['COSMOS_CONTAINER1']
        self.cont_2 = cf.config['COSMOS_CONTAINER2']
        self.cont_3 = cf.config['COSMOS_CONTAINER3']
        self.database_link = cf.config['database_link']
        self.collection_link_1 = cf.config['collection_link_1']
        self.collection_link_2 = cf.config['collection_link_2']
        self.collection_link_3 = cf.config['collection_link_3']
        self.options = cf.config['options']
        self.db_qry = cf.config['DB_QRY']
        self.collection_qry = cf.config['COLLECTION_QRY']

    # Creating cosmos items in container
    def CreateDocuments(self, inputJson, collection_flg = 1):
        try:
            # Declaring variable
            endpoint = self.endpoint
            primarykey = self.primarykey
            print('Creating Documents')
            with IDisposable(cosmos_client.CosmosClient(url_connection=endpoint, auth={'masterKey': primarykey})) as client:
                try:
                    if collection_flg == 1:
                        collection_link = self.collection_link_1
                    elif collection_flg == 2:
                        collection_link = self.collection_link_2
                    else:
                        collection_link = self.collection_link_3
                    container = client.ReadContainer(collection_link)
                    # Create a SalesOrder object. This object has nested properties and various types including numbers, DateTimes and strings.
                    # This can be saved as JSON as is without converting into rows/columns.
                    print('Input Json:: ', str(inputJson))
                    nSon = json.dumps(inputJson)
                    json_rec = json.loads(nSon)
                    client.CreateItem(container['_self'], json_rec)
                except errors.HTTPFailure as e:
                    print("Application has caught an error. {0}".format(e.status_code))
                finally:
                    print("Application successfully completed!")
            return 0
        except Exception as e:
            x = str(e)
            print(x)
            return 1

    def CosmosDBCustomQuery_PandasCSVWithParam(self, client, collection_link, query_with_optional_parameters, message="Documents found by query: ", options_sql = {}):
        try:
            # Reading data by SQL & converting it to a Pandas Dataframe
            results = list(client.QueryItems(collection_link, query_with_optional_parameters, options_sql))
            cnt = 0
            dfSrc = p.DataFrame()
            dfRes = p.DataFrame()
            dfSrc2 = p.DataFrame()
            json_data = ''
            for doc in results:
                cnt += 1
            dfSrc = p.io.json.json_normalize(results)
            dfSrc.columns = dfSrc.columns.map(lambda x: x.split(".")[-1])
            dfRes = dfSrc
            print("Total records fetched: ", cnt)
            print("*" * 157)
            return dfRes
        except errors.HTTPFailure as e:
            Df_Fin = p.DataFrame()
            if e.status_code == 404:
                print("*" * 157)
                print("Document doesn't exist")
                print("*" * 157)
                return Df_Fin
            elif e.status_code == 400:
                print("*" * 157)
                print("Bad request exception occurred: ", e)
                print("*" * 157)
                return Df_Fin
            else:
                return Df_Fin
        finally:
            print()

    def CosmosDBCustomQuery_PandasCSV(self, client, collection_link, query_with_optional_parameters, message="Documents found by query: ", options_sql = {}):
        try:
            # Reading data by SQL & converting it to a Pandas Dataframe
            results = list(client.QueryItems(collection_link, query_with_optional_parameters, options_sql))
            cnt = 0
            dfSrc = p.DataFrame()
            dfRes = p.DataFrame()
            dfSrc2 = p.DataFrame()
            json_data = ''
            for doc in results:
                cnt += 1
            dfSrc = p.io.json.json_normalize(results)
            dfSrc.columns = dfSrc.columns.map(lambda x: x.split(".")[-1])
            dfRes = dfSrc
            print("Total records fetched: ", cnt)
            print("*" * 157)
            return dfRes
        except errors.HTTPFailure as e:
            Df_Fin = p.DataFrame()
            if e.status_code == 404:
                print("*" * 157)
                print("Document doesn't exist")
                print("*" * 157)
                return Df_Fin
            elif e.status_code == 400:
                print("*" * 157)
                print("Bad request exception occurred: ", e)
                print("*" * 157)
                return Df_Fin
            else:
                return Df_Fin
        finally:
            print()

    def fetch_data(self, sql_qry, msg="", collection_flg = 1, additional_params = 1, param_det=[]):
        endpoint = self.endpoint
        primarykey = self.primarykey
        options_1 = self.options
        with IDisposable(cosmos_client.CosmosClient(url_connection=endpoint, auth={'masterKey': primarykey})) as client:
            try:
                if collection_flg == 1:
                    collection_link = self.collection_link_1
                elif collection_flg == 2:
                    collection_link = self.collection_link_2
                else:
                    collection_link = self.collection_link_3
                print("Additional parameters: ", additional_params)
                message = msg
                options = options_1
                if additional_params == 1:
                    query = {"query": sql_qry}
                    df_Fin = self.CosmosDBCustomQuery_PandasCSV(client, collection_link, query, message, options)
                else:
                    query = {"query": sql_qry, "parameters": param_det}
                    df_Fin = self.CosmosDBCustomQuery_PandasCSVWithParam(client, collection_link, query, message, options)
                return df_Fin
            except errors.HTTPFailure as e:
                print("Application has caught an error. {0}".format(e.message))
            finally:
                print("Application successfully completed!")
Key lines from the above script –
def CosmosDBCustomQuery_PandasCSV(self, client, collection_link, query_with_optional_parameters, message="Documents found by query: ", options_sql = {}):
This method is generic. It fetches all the records of a Cosmos container.
In this step, the application fetches the data as JSON, flattens it & finally stores the result in a pandas dataframe, which it returns.
The function CosmosDBCustomQuery_PandasCSVWithParam is the same as the previous one. The only difference is that it can process parameters to filter the data.
Once the CreateDocuments method receives the input payload, the application converts it to a valid JSON payload & then sends it to the CreateItem method to insert the records.
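To make the flattening step a bit more concrete, here is a minimal sketch that uses only the Python standard library instead of pandas. It mimics what json_normalize followed by the column-renaming lambda does in the script above: nested keys are joined with dots & then only the last segment is kept. The helper names (flatten, short_names) & the sample document are assumptions made up for this illustration; they are not part of the original scripts.

```python
def flatten(doc, parent_key=""):
    """Flatten nested dicts into one level, joining keys with dots."""
    flat = {}
    for key, value in doc.items():
        full_key = f"{parent_key}.{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, full_key))
        else:
            flat[full_key] = value
    return flat


def short_names(flat_doc):
    """Keep only the last dotted segment of each key, like the script's lambda."""
    return {key.split(".")[-1]: value for key, value in flat_doc.items()}


# Hypothetical document shaped like a nested Cosmos DB item.
doc = {"subscriberId": "M001", "feedback": {"channel": "email", "score": 4}}

flat = flatten(doc)
row = short_names(flat)
print(flat)
print(row)
```

One thing to watch with this renaming: if two nested objects share the same last key segment, the later one silently overwrites the earlier one.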
4. callCosmosAPI.py (This script is the main calling function. Hence, the name comes into the picture.)
##############################################
#### Written By: SATYAKI DE ####
#### Written On: 25-May-2019 ####
#### ####
#### Objective: Main calling scripts. ####
##############################################
import clsColMgmt as cm
import clsCosmosDBDet as cmdb
from clsConfig import clsConfig as cf
import pandas as p
import clsL as cl
import logging
import datetime
import json

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB
def main():
    try:
        df_ret = p.DataFrame()
        df_ret_2 = p.DataFrame()
        df_ret_2_Mod = p.DataFrame()
        debug_ind = 'Y'

        # Assumption: 'var' is a timestamp suffix for the log file names.
        # Its definition is not shown in the original listing.
        var = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

        # Initiating Log Class
        l = cl.clsL()
        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidated.log', level=logging.INFO)

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']
        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)
        print("*" * 157)
        print("Testing COSMOS DB Connection!")
        print("*" * 157)

        # Checking Cosmos DB Azure
        y = cmdb.clsCosmosDBDet()
        ret_val = y.test_db_con()
        if ret_val == 0:
            print()
            print("Cosmos DB Connection Successful!")
            print("*" * 157)
        else:
            print()
            print("Cosmos DB Connection Failure!")
            print("*" * 157)
            raise Exception
        print("*" * 157)

        # Creating Data in Cosmos DB
        print()
        print('Fetching data from Json!')
        print('Creating data for Email..')
        print("-" * 157)
        emailFile = cf.config['EMAIL_SRC_JSON_FILE']
        flg = 1
        with open(emailFile) as json_file:
            dataEmail = json.load(json_file)

        # Creating documents
        a1 = cm.clsColMgmt()
        ret_cr_val1 = a1.CreateDocuments(dataEmail, flg)
        if ret_cr_val1 == 0:
            print('Successful data creation!')
        else:
            print('Failed to create data!')
        print("-" * 157)
        print()
        print('Creating data for Twitter..')
        print("-" * 157)
        twitFile = cf.config['TWITTER_SRC_JSON_FILE']
        flg = 2
        with open(twitFile) as json_file:
            dataTwitter = json.load(json_file)

        # Creating documents
        a2 = cm.clsColMgmt()
        ret_cr_val2 = a2.CreateDocuments(dataTwitter, flg)
        if ret_cr_val2 == 0:
            print('Successful data creation!')
        else:
            print('Failed to create data!')
        print("-" * 157)
        print()
        print('Creating data for HR..')
        print("-" * 157)
        hrFile = cf.config['HR_SRC_JSON_FILE']
        flg = 3
        with open(hrFile) as json_file:
            hrTwitter = json.load(json_file)

        # Creating documents
        a3 = cm.clsColMgmt()
        ret_cr_val3 = a3.CreateDocuments(hrTwitter, flg)
        if ret_cr_val3 == 0:
            print('Successful data creation!')
        else:
            print('Failed to create data!')
        print("-" * 157)

        # Calling the function 1
        print("RealtimeEmail::")

        # Fetching first collection data to dataframe
        print("Fetching Cosmos Collection Data!")
        sql_qry_1 = cf.config['SQL_QRY_1']
        msg = "Documents generated based on unique key"
        collection_flg = 1
        x = cm.clsColMgmt()
        df_ret = x.fetch_data(sql_qry_1, msg, collection_flg)
        l.logr('1.EmailFeedback_' + var + '.csv', debug_ind, df_ret, 'log')
        print('RealtimeEmail Data::')
        print(df_ret)
        print()

        # Checking execution status
        ret_val = int(df_ret.shape[0])
        if ret_val == 0:
            print("Cosmos DB hasn't returned any rows. Please check your queries!")
            print("*" * 157)
        else:
            print("Successfully fetched!")
            print("*" * 157)

        # Calling the 2nd Collection
        print("RealtimeTwitterFeedback::")

        # Fetching second collection data to dataframe
        print("Fetching Cosmos Collection Data!")

        # Query using parameters
        sql_qry_2 = cf.config['SQL_QRY_2']
        msg_2 = "Documents generated based on RealtimeTwitterFeedback feed!"
        collection_flg = 2
        val = 'crazyGo'
        param_det = [{"name": "@CrVal", "value": val}]
        add_param = 2
        x1 = cm.clsColMgmt()
        df_ret_2 = x1.fetch_data(sql_qry_2, msg_2, collection_flg, add_param, param_det)
        # Logging the Twitter dataframe (the original listing passed df_ret here)
        l.logr('2.TwitterFeedback_' + var + '.csv', debug_ind, df_ret_2, 'log')
        print('Realtime Twitter Data:: ')
        print(df_ret_2)
        print()

        # Checking execution status
        ret_val_2 = int(df_ret_2.shape[0])
        if ret_val_2 == 0:
            print("Cosmos DB hasn't returned any rows. Please check your queries!")
            print("*" * 157)
        else:
            print("Successfully fetched rows!")
            print("*" * 157)
    except ValueError:
        print("No relevant data to proceed!")
    except Exception as e:
        # Python 3 exceptions have no 'message' attribute, so str(e) is used
        print("Top level Error: args:{0}, message{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()
Key lines from the above script –
with open(twitFile) as json_file:
    dataTwitter = json.load(json_file)
Reading a JSON file.
val = 'crazyGo'
param_det = [{"name": "@CrVal", "value": val}]
add_param = 2
Passing a specific parameter value to filter the records while fetching them from Cosmos DB.
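The query body that fetch_data hands over is just a dictionary, so it can be sketched without touching the SDK. The snippet below is only an illustration: build_query is a hypothetical helper, and the SQL text with its twitterId field is an assumption, since the real query lives in cf.config['SQL_QRY_2'] & is not shown in this post.

```python
def build_query(sql, params=None):
    """Return the query body; add a 'parameters' list only when values are given."""
    query = {"query": sql}
    if params:
        query["parameters"] = [
            {"name": name, "value": value} for name, value in params.items()
        ]
    return query


# Hypothetical SQL text; the field name is made up for this example.
sql = "SELECT * FROM c WHERE c.twitterId = @CrVal"

plain_query = build_query("SELECT * FROM c")
param_query = build_query(sql, {"@CrVal": "crazyGo"})
print(plain_query)
print(param_query)
```

Keeping the value in the parameters list, rather than concatenating it into the SQL string, is what lets the same query text be reused with different filter values.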
Now, let’s look at the runtime stats.
Windows:
MAC:
Let’s compare the output log directory –
Windows:
MAC:
Let’s verify the data from Cosmos DB.
Here, a subscriberId starting with 'M' denotes data inserted from the MAC environment. The other one was inserted through Windows.
Let’s see one more example from Cosmos –
So, I guess we’ve achieved our final goal here: we successfully inserted data into Azure Cosmos DB from the Python application & retrieved it.
The following Python packages are required to run this application:
pip install azure
pip install azure-cosmos
pip install pandas
pip install requests
This application was tested on Python 3.7.1 & Python 3.7.2. As per Microsoft, their officially supported version is Python 3.5.
I hope you’ll like this effort.
Wait for the next installment. Till then, Happy Avenging. 😀
[Note: All the sample data are available/prepared in the public domain for research & study.]
Today, we’ll demonstrate a different application of pandas. In this case, we’ll explore reading a large CSV file & splitting it into a set of smaller, more manageable CSV files.
After creating them, another process will merge them back together. This is especially useful when you need to transform a large volume of data without running into memory errors. Moreover, the developer has more control over failed cases & can resume the load without restarting from the beginning of the files.
In this case, I’ll use one more custom method to create the CSV file instead of calling the to_csv method of pandas directly.
But, before that let’s prepare the virtual environment & proceed from there –
Windows 10 (64 bit):
Commands:
python -m venv --copies .env
.env\Scripts\activate.bat
Screenshot:
Mac OS (64 bit):
Commands:
python -m venv env
source env/bin/activate
Screenshot:
So, the Python version on both Windows & Mac is 3.7 & we’re going to explore our task in the given section.
After creating this virtual environment, you need to install only the pandas package for this task, as shown below for both Windows & Mac OS –
Windows:
Mac:
The rest of the packages come by default with Python 3.7.
Please find the GUI screenshots from the WinSCP software comparing both the directory structures (Mac & Windows) as given below –
From the above screenshot, you can see that our directory structures are not exactly identical up to the blog directory. However, our program will take care of this difference.
Let’s check the scripts one-by-one,
1. clsL.py (This script will create the split csv files or final merge file after the corresponding process. However, this can be used as normal verbose debug logging as well. Hence, the name comes into the picture.)
###########################################
#### Written By: Satyaki De            ####
###########################################
import pandas as p
import os
import platform as pl

class clsL(object):
    def __init__(self):
        self.path = os.path.dirname(os.path.realpath(__file__))

    def logr(self, Filename, Ind, df, subdir=None):
        try:
            x = p.DataFrame()
            x = df
            sd = subdir
            os_det = pl.system()
            if os_det == "Windows":
                if sd == None:
                    fullFileName = self.path + "\\" + Filename
                else:
                    fullFileName = self.path + "\\" + sd + "\\" + Filename
            else:
                if sd == None:
                    fullFileName = self.path + "/" + Filename
                else:
                    fullFileName = self.path + "/" + sd + "/" + Filename
            if Ind == 'Y':
                x.to_csv(fullFileName, index=False)
            return 0
        except Exception as e:
            y = str(e)
            print(y)
            return 3
From the above script, you can see that the Indicator, whose value can be either ‘Y’ or ‘N’, decides whether the csv file is generated from the pandas data frame using the to_csv method available in pandas.
Here, when the class creates an instance, it initializes the current path, i.e. the directory from which the application is running.
x = p.DataFrame()
x = df
The first line declares a pandas data frame variable. The second line assigns the data frame supplied to the method to that variable.
os_det = pl.system()
This identifies the operating system on which your application is running. Based on that, your path is dynamically configured & passed. Hence, your application is ready to handle multiple operating systems from the beginning.
x.to_csv(fullFileName, index=False)
Finally, to_csv generates the final csv file when the supplied Indicator value is ‘Y’. Also, notice that we’ve added one more parameter (index=False). By default, pandas writes its row index as one extra column in the output.
As you can see, the first column does not come from our source files. Rather, it is generated by the pandas package in python. Hence, we exclude it from our final file by specifying the (index=False) option.
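As a side note on the operating system check discussed above, the same cross-platform path can also be built without branching on the OS name. The following is only a sketch of that alternative, not what clsL.py does; build_path & the sample file names are hypothetical.

```python
import os


def build_path(base_dir, filename, subdir=None):
    """Join path parts with the separator of the current operating system."""
    if subdir is None:
        return os.path.join(base_dir, filename)
    return os.path.join(base_dir, subdir, filename)


# Hypothetical file & sub-directory names, relative to the current directory.
base = os.getcwd()
print(build_path(base, "sample.csv"))
print(build_path(base, "sample.csv", "log"))
```

os.path.join picks the backslash on Windows & the forward slash on Mac, so the Windows/else branches collapse into one code path.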
2. clsSplitFl.py (This script will create the split csv files. This will bring chunk by chunk data into your memory & process the large files.)
###########################################
#### Written By: Satyaki De            ####
###########################################
import os
import pandas as p
# clsL.py is the custom file generation class shown above
import clsL as log
import gc
import csv

class clsSplitFl(object):
    def __init__(self, srcFileName, path, subdir):
        self.srcFileName = srcFileName
        self.path = path
        self.subdir = subdir
        # Maximum Number of rows in CSV
        # in order to avoid Memory Error
        self.max_num_rows = 30000
        self.networked_directory = 'src_file'
        self.Ind = 'Y'

    def split_files(self):
        try:
            src_dir = self.path
            subdir = self.subdir
            networked_directory = self.networked_directory

            # Initiate Logging Instances
            clog = log.clsL()

            # Setting up values
            srcFileName = self.srcFileName
            First_part, Last_part = str(srcFileName).split(".")
            num_rows = self.max_num_rows
            dest_path = self.path
            remote_src_path = src_dir + networked_directory
            Ind = self.Ind
            interval = num_rows

            # Changing work directory location to source file
            # directory at remote server
            os.chdir(remote_src_path)
            src_fil_itr_no = 1

            # Split logic here
            for df2 in p.read_csv(srcFileName, index_col=False, error_bad_lines=False, chunksize=interval):
                # Changing the target directory path
                os.chdir(dest_path)

                # Calling custom file generation method
                # to generate splitted files
                clog.logr(str(src_fil_itr_no) + '__' + First_part + '_' + '_splitted_.' + Last_part, Ind, df2, subdir)
                del [[df2]]
                gc.collect()
                src_fil_itr_no += 1
            return 0
        except Exception as e:
            x = str(e)
            print(x)
            return 1
In this script, we’re splitting the source file if it has more than 30,000 records. Each split file then holds at most 30,000 rows; only the last one may hold fewer.
Important lines to be noticed –
self.max_num_rows = 30000
As already explained, this sets the maximum number of rows in each split file.
First_part, Last_part = str(srcFileName).split(".")
This splits the source file name dynamically into two parts, i.e. one part contains only the file name & the other part contains only the extension.
for df2 in p.read_csv(srcFileName, index_col=False, error_bad_lines=False, chunksize=interval):
As you can see, the application reads the large source csv chunk by chunk (mentioned as chunksize=interval). And, if the source file has any bad rows, it skips them due to the following condition -> (error_bad_lines=False). (In pandas 1.3 & later, the equivalent option is on_bad_lines='skip'.)
clog.logr(str(src_fil_itr_no) + '__' + First_part + '_' + '_splitted_.' + Last_part, Ind, df2, subdir)
This dynamically generates the split files in the specific subdirectory along with the modified name. Since each name carries its own split number, the files of one run won’t overwrite each other. Remember that src_fil_itr_no will play an important role while merging them back into one, as this number represents the current file’s split number.
del [[df2]]
gc.collect()
Once you have processed that part, delete the data frame & deallocate the memory, so that you won’t encounter any memory error or a similar issue.
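For readers who want to try the chunking idea without pandas, here is a minimal sketch that uses only the standard library's csv module & itertools.islice in place of read_csv(chunksize=...). It is a swapped-in technique for illustration, not the author's implementation; split_csv, the temporary directory & the tiny 7-row file are all assumptions. The split file names follow the same pattern as in the script above.

```python
import csv
import itertools
import os
import tempfile


def split_csv(src_path, dest_dir, max_rows):
    """Write src_path into numbered part files of at most max_rows data rows each."""
    first_part, last_part = os.path.basename(src_path).split(".")
    written = []
    with open(src_path, newline="") as src:
        reader = csv.reader(src)
        header = next(reader)
        part_no = 1
        while True:
            # Pull the next chunk only; earlier chunks are no longer referenced.
            chunk = list(itertools.islice(reader, max_rows))
            if not chunk:
                break
            name = f"{part_no}__{first_part}__splitted_.{last_part}"
            with open(os.path.join(dest_dir, name), "w", newline="") as part:
                writer = csv.writer(part)
                writer.writerow(header)
                writer.writerows(chunk)
            written.append(name)
            part_no += 1
    return written


# Tiny demonstration with a throwaway file: 7 data rows, 3 rows per part.
work_dir = tempfile.mkdtemp()
src_file = os.path.join(work_dir, "acct_addr_20180112.csv")
with open(src_file, "w", newline="") as fh:
    out = csv.writer(fh)
    out.writerow(["id", "city"])
    out.writerows([[i, f"city_{i}"] for i in range(7)])

parts = split_csv(src_file, work_dir, max_rows=3)
print(parts)
```

Only one chunk of rows is held in memory at a time, which is the same property the chunksize loop relies on.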
And, the split file will look like this –
3. clsMergeFl.py (This script will add together all the split csv files into one big csv file. It reads the split files one by one into memory & generates the large file.)
###########################################
#### Written By: Satyaki De            ####
###########################################
import os
import platform as pl
import pandas as p
import gc
# clsL.py is the custom file generation class shown above
import clsL as log
import re

class clsMergeFl(object):
    def __init__(self, srcFilename):
        self.srcFilename = srcFilename
        self.subdir = 'finished'
        self.Ind = 'Y'

    def merge_file(self):
        try:
            # Initiating Logging Instances
            clog = log.clsL()
            df_W = p.DataFrame()
            df_M = p.DataFrame()
            f = {}
            subdir = self.subdir
            srcFilename = self.srcFilename
            Ind = self.Ind
            cnt = 0
            os_det = pl.system()
            if os_det == "Windows":
                proc_dir = "\\temp\\"
                gen_dir = "\\process\\"
            else:
                proc_dir = "/temp/"
                gen_dir = "/process/"

            # Current Directory where application presents
            path = os.path.dirname(os.path.realpath(__file__)) + proc_dir
            print("Path: ", path)
            print("Source File Initial Name: ", srcFilename)
            for fname in os.listdir(path):
                if fname.__contains__(srcFilename) and fname.endswith('_splitted_.csv'):
                    key = int(re.split('__', str(fname))[0])
                    f[key] = str(fname)
            for k in sorted(f):
                print(k)
                print(f[k])
                print("-"*30)
                df_W = p.read_csv(path+f[k], index_col=False)
                if cnt == 0:
                    df_M = df_W
                else:
                    d_frames = [df_M, df_W]
                    df_M = p.concat(d_frames)
                cnt += 1
                print("-"*30)
                print("Total Records in this Iteration: ", df_M.shape[0])
            FtgtFileName = fname.replace('_splitted_', '')
            first, FinalFileName = re.split("__", FtgtFileName)
            clog.logr(FinalFileName, Ind, df_M, gen_dir)
            del [[df_W], [df_M]]
            gc.collect()
            return 0
        except Exception as e:
            x = str(e)
            print(x)
            return 1
In this script, we’re merging the smaller files into one large file. Following are the key snippets that we’ll explore –
for fname in os.listdir(path):
    if fname.__contains__(srcFilename) and fname.endswith('_splitted_.csv'):
        key = int(re.split('__', str(fname))[0])
        f[key] = str(fname)
In this section, the application checks whether the specified path has files whose names end with “_splitted_.csv” & contain the initial part of the source file name, i.e. if you have a source file named acct_addr_20180112.csv, then the name should contain “acct_addr” & end with “_splitted_.csv”. If such files are available, the merge process picks them up one by one & merges them using pandas data frames as shown below –
for k in sorted(f):
    print(k)
    print(f[k])
    print("-"*30)
    df_W = p.read_csv(path+f[k], index_col=False)
    if cnt == 0:
        df_M = df_W
    else:
        d_frames = [df_M, df_W]
        df_M = p.concat(d_frames)
    cnt += 1
Note that f is a dictionary that maps each split number (the key) to its file name (the value). The first part of the split file name contains that number. Sorting on it lets the merge put the pieces back into one large file in the original order.
Also, notice the concat function provided by pandas. In this step, the application combines two data frames.
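Why the key is converted to an integer is easiest to see with more than nine split files. The sketch below is a small standard-library illustration under assumed file names; order_split_files is a hypothetical helper that repeats the filter & key logic of the merge script, & the last line shows what a plain string sort would do instead.

```python
import re


def order_split_files(file_names, src_initial):
    """Return split-file names ordered by their numeric prefix."""
    numbered = {}
    for fname in file_names:
        if src_initial in fname and fname.endswith("_splitted_.csv"):
            key = int(re.split("__", fname)[0])
            numbered[key] = fname
    return [numbered[k] for k in sorted(numbered)]


# Hypothetical directory listing, deliberately out of order.
names = [
    "10__acct_addr_20180112__splitted_.csv",
    "2__acct_addr_20180112__splitted_.csv",
    "1__acct_addr_20180112__splitted_.csv",
    "notes.txt",
]

ordered = order_split_files(names, "acct_addr")
print(ordered)
print(sorted(n for n in names if n.endswith(".csv")))
```

With integer keys the order is 1, 2, 10; sorting the names as plain strings would put the file with prefix 10 ahead of the others & scramble the merged rows.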
Finally, the main python script, from where we’ll call it –
###########################################
#### Written By: Satyaki De            ####
###########################################
import clsSplitFl as t
import clsMergeFl as cm
import re
import platform as pl
import os

def main():
    print("Calling the custom Package for large file splitting..")
    os_det = pl.system()
    print("Running on :", os_det)

    ###############################################
    ####    User Input based on Windows OS     ####
    ###############################################
    srcF = str(input("Please enter the file name with extension:"))
    base_name = re.sub(r'[0-9]', '', srcF)
    srcFileInit = base_name[:-5]
    if os_det == "Windows":
        subdir = "\\temp\\"
        path = os.path.dirname(os.path.realpath(__file__)) + "\\"
    else:
        subdir = "/temp/"
        path = os.path.dirname(os.path.realpath(__file__)) + '/'
    ###############################################
    ####          End Of User Input            ####
    ###############################################

    x = t.clsSplitFl(srcF, path, subdir)
    ret_val = x.split_files()
    if ret_val == 0:
        print("Splitting Successful!")
    else:
        print("Splitting Failure!")
    print("-"*30)
    print("Finally, Merging small splitted files to make the same big file!")
    y = cm.clsMergeFl(srcFileInit)
    ret_val1 = y.merge_file()
    if ret_val1 == 0:
        print("Merge Successful!")
    else:
        print("Merge Failure!")
    print("-"*30)

if __name__ == "__main__":
    main()
Following are the key sections that we can check –
import clsSplitFl as t
import clsMergeFl as cm
Like any other standard python package, we’re importing our own classes into our main callable script.
x = t.clsSplitFl(srcF, path, subdir)
ret_val = x.split_files()
Or,
y = cm.clsMergeFl(srcFileInit)
ret_val1 = y.merge_file()
In this section, we’ve instantiated the class & then we’re calling its method. And, based on the return value, we’re printing the status of our application’s last run.
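One small detail of the main script is how it derives the file name initial that is passed to clsMergeFl. The sketch below replays those two lines on the sample name used earlier in this post; source_initial is a hypothetical wrapper added only for this illustration.

```python
import re


def source_initial(file_name):
    """Strip all digits, then drop the last five characters (e.g. '_.csv')."""
    base_name = re.sub(r"[0-9]", "", file_name)
    return base_name[:-5]


initial = source_initial("acct_addr_20180112.csv")
print(initial)
```

The slice assumes the name ends with an underscore, a run of digits & a three-letter extension. A name without that trailing date part, such as sales.csv, would lose one real character & come out as "sale".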
The final run of this application looks like ->
Windows:
Mac:
And, the final file should look like this –
Windows:
MAC:
The left-hand side represents the Windows final processed/output file, whereas the right-hand side represents the MAC final processed/output file.
Hope this gives you some idea of how we can use pandas in various cases apart from conventional data computing.
In this post, I skipped the exception handling part intentionally. I’ll publish one bonus post once my series is complete.
I’ve been working for more than 8 years with Oracle 10g & 11g & have written a significant number of SQL queries using regular expressions in various scenarios. They are really handy if you know how to use them & can reduce lots of pain with a single SQL. And, the performance is better compared to the total effort needed to achieve the same functionality using multiple SQL queries or PL/SQL procedures. For the last couple of years, I’ve been working on Teradata. And, on some occasions, I was expecting features like these, where I can easily manipulate data with regular expressions. I was pretty excited when I heard that Teradata also introduced regular expressions from version 14.0.
As a result, I tried all those features that I think can be handy & useful for various scenarios & the following are the successful queries that I got. There are two occasions where Teradata was only partially able to manipulate those strings. I’ve checked the latest Teradata manual. However, I was unable to find a solution. So, I’m expecting other forum members to contribute here in order to make this thread useful for every one of us. And, I’ll post here as soon as I get some answers on these partial conversions.
For better understanding, I’ve provided the actual column value & the value of that column after transformation in the output. That will help us to grasp it easily, I guess. 🙂
Case 1,
SELECT regexp_replace('SatyakiDe','([[:lower:]]{1,})([[:upper:]]{1,})','\1 \2') AS COL_VAL;
COLA       COL_VAL
---------  ----------
SatyakiDe  Satyaki De
COLA                 COL_VAL
-------------------  ----------
satyaki.de@mail.com  Satyaki De
Case 5,
select regexp_replace('100011001','([[:digit:]]{3})([[:digit:]]{2})([[:digit:]]{4})','XXX-XX-\3') as COL_VAL;
COLA       COL_VAL
---------  -----------
100011001  XXX-XX-1001
Case 6,
select regexp_replace('123456789','([[:digit:]]{3})([[:digit:]]{3})([[:digit:]]{3})','\3.\2.\1') as COL_VAL;
COLA       COL_VAL
---------  -----------
123456789  789.456.123
Case 7,
SELECT regexp_replace('satyaki1de0loves3to8work2on2sql0and2bi4tools1','[^0-9]+','',1,0,'i') AS DER_VAL;
COLA                                           DER_VAL
---------------------------------------------  ----------
satyaki1de0loves3to8work2on2sql0and2bi4tools1  1038220241
As you can see, all the characters have been filtered out from the string & only the numbers are kept. These sorts of queries are very useful in lots of different business scenarios as well. Note that any extra space in the pattern may keep it from producing the desired result. So, you need to pay attention to these small details.
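If you want to cross-check these patterns outside Teradata, the same transformations can be sketched with Python's re module. This is only an analog under stated assumptions: Python's re does not support POSIX classes such as [[:lower:]] or [[:digit:]], so the sketch uses [a-z], [A-Z] & \d instead, & the input for Case 7 is the value shown in the COLA column of the output above.

```python
import re

# Case 1: put a space between a lower-case run and the upper-case run after it.
case_1 = re.sub(r"([a-z]+)([A-Z]+)", r"\1 \2", "SatyakiDe")

# Case 5: mask the first five digits of a nine-digit number.
case_5 = re.sub(r"(\d{3})(\d{2})(\d{4})", r"XXX-XX-\3", "100011001")

# Case 6: reverse the order of three groups of three digits.
case_6 = re.sub(r"(\d{3})(\d{3})(\d{3})", r"\3.\2.\1", "123456789")

# Case 7: drop everything that is not a digit.
case_7 = re.sub(r"[^0-9]+", "", "satyaki1de0loves3to8work2on2sql0and2bi4tools1")

for label, value in [("Case 1", case_1), ("Case 5", case_5),
                     ("Case 6", case_6), ("Case 7", case_7)]:
    print(label, "->", value)
```

The back-references (\1, \2, \3) behave the same way in both places, which makes Python a convenient scratchpad before running the query on the database.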
And, I’ve tested all these queries in the following two versions –
select * from dbcinfo;
   InfoKey                InfoData
   ---------------------  -----------
1  VERSION                14.10.00.02
2  RELEASE                14.10.00.02
3  LANGUAGE SUPPORT MODE  Standard
select * from dbcinfo;
   InfoKey                InfoData
   ---------------------  -----------
1  VERSION                14.10.01.05
2  RELEASE                14.10.01.04
3  LANGUAGE SUPPORT MODE  Standard
Hope this gives you much more clarity. 🙂
One more thing I would like to clarify here: my intention is to describe more features of these regexp_(similar/substr/instr/replace) functions. While posting the same article in the Teradata forum, I received one question on whether these regexp functions are available in TD 13 or not. And, here is my answer to that question –
Regarding version 13,
Let us check whether they have these regexp functions or not –
select * from dbcinfo;
   InfoKey                InfoData
   ---------------------  -----------
1  VERSION                13.00.00.15
2  RELEASE                13.00.00.15
3  LANGUAGE SUPPORT MODE  Standard
select * from dbcinfo;
   InfoKey                InfoData
   ---------------------  -----------
1  VERSION                13.10.07.12
2  RELEASE                13.10.07.12
3  LANGUAGE SUPPORT MODE  Standard
select regexp_replace('SatyakiDe','^(.*)([[:upper:]]{1,})(.*) $','\1 \2\3') AS COL_VAL;
$
*** Failure 3706 Syntax error: expected something between '(' and the string 'S' keyword.
Statement# 1, Info =35
*** Total elapsed time was 1 second.
Hope this gives adequate clarity on the answer to the above question. Now, let’s see some other functionality.
REGEXP_SIMILAR has functionality similar to REGEXP_LIKE in Oracle.
SELECT * FROM TEST_T1 WHERE REGEXP_SIMILAR(COL1,'^[0-9]+$','c')=1;
COL1
-----
456
Case 3 (Returns All Numbers including Positive, Negative & unsigned),
SELECT * FROM TEST_T1 WHERE REGEXP_SIMILAR(COL1,'^[+-]?[0-9]+[+-]?$','c')=1;
COL1
-----
456
+789
-789
159-
-1-
Case 4 (Returns Only Non Numbers i.e. Characters),
SELECT * FROM TEST_T1 WHERE REGEXP_SIMILAR(COL1,'[^0-9]+','c')=1;
COL1
----
y
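The three filters above can be replayed in Python as a rough sanity check. This sketch assumes that REGEXP_SIMILAR returns 1 only when the whole string matches the pattern, which is why re.fullmatch is used here. The list of values is collected from the outputs shown above, since the full contents of TEST_T1 are not listed in this post, & similar is a hypothetical helper.

```python
import re

# Sample values taken from the outputs above; TEST_T1 itself is not shown in full.
values = ["456", "+789", "-789", "159-", "-1-", "y"]


def similar(pattern, value):
    """Return 1 when the whole value matches the pattern, else 0."""
    return 1 if re.fullmatch(pattern, value) else 0


only_digits = [v for v in values if similar(r"^[0-9]+$", v) == 1]
signed = [v for v in values if similar(r"^[+-]?[0-9]+[+-]?$", v) == 1]
non_numeric = [v for v in values if similar(r"[^0-9]+", v) == 1]

print(only_digits)
print(signed)
print(non_numeric)
```

Because the match covers the whole string, the last pattern keeps only values made up entirely of non-digit characters, so a value like +789 is not returned by it.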
Hope this gives you some additional ideas. 🙂
My objective is to provide basic information to my friends, so that they can write better SQL in TD while migrating from other popular databases, or so that a new developer in TD can get a flavor of this powerful feature & apply it properly. 😀