Displaying real-time trade data in a dashboard using Python & third-party API & Streaming

Today, We want to make our use case a little bit harder & more realistic. We want to consume real-time live trade-data consuming through FinnHub API & displaying them into our dashboard using another brilliant H2O-Wave API with the help of native Python.

The use-case mentioned above is extremely useful & for that, we’ll be using the following Third-Party APIs to achieve the same –

  1. FinnHub: For more information, please click the following link.
  2. Ably: For more information, please click the following link.
  3. H2O-Wave: For more information, please click the following link.

I’m not going to discuss these topics more, as I’ve already discussed them in separate earlier posts. Please refer to the following threads for detailed level information –

creating-a-real-time-dashboard-from-streaming-data-using-python


In this post, we will address the advanced concept compared to the previous post mentioned above. Let us first look at how the run looks before we start exploring the details –

Real-time trade dashboard

Let us explore the architecture of this implementation –

Architecture Diagram

This application will talk to the FinnHub websocket & consume real-time trade data from it. And this will be temporarily stored in our Ably channels. The dashboard will pick the message & display that as soon as there is new data for that trading company.


For this use case, you need to install the following packages –

STEP – 1:

Main Packages

STEP – 2:

Main Packages – Continue

STEP – 3:

Main Packages – Continue

STEP – 4:

Main Packages – End

You can copy the following commands to install the above-mentioned packages –

pip install ably 
pip install h2o-wave
pip install pandas
pip install websocket
pip install websocket-client

Let’s explore the important data-point that you need to capture from the FinnHub portal to consume the real-time trade data –

FinnHub Portal

We’ve two main scripts. The first script will consume the streaming data into a message queue & the other one will be extracting the data from the queue & transform the data & publish it into the real-time dashboard.

1. dashboard_finnhub.py ( This native Python script will consume streaming data & create the live trade dashboard. )


###############################################################
#### Template Written By: H2O Wave ####
#### Enhanced with Streaming Data By: Satyaki De ####
#### Base Version Enhancement On: 20-Dec-2020 ####
#### Modified On 27-Jun-2021 ####
#### ####
#### Objective: This script will consume real-time ####
#### streaming data coming out from a hosted API ####
#### sources (Finnhub) using another popular third-party ####
#### service named Ably. Ably mimics pubsub Streaming ####
#### concept, which might be extremely useful for ####
#### any start-ups. ####
#### ####
#### Note: This is an enhancement of my previous post of ####
#### H2O Wave. In this case, the application will consume ####
#### streaming trade data from a live host & not generated ####
#### out of the mock data. Thus, it is more useful for the ####
#### start-ups. ####
###############################################################
import time
from h2o_wave import site, data, ui
from ably import AblyRest
import pandas as p
import json
import datetime
import logging
import platform as pl
from clsConfig import clsConfig as cf
import clsL as cl
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
# Lookup functions from
# Azure cloud SQL DB
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# Global Area
## Global Class
# Initiating Log Class
l = cl.clsL()
# Global Variables
# Moving previous day log files to archive directory
log_dir = cf.config['LOG_PATH']
path = cf.config['INIT_PATH']
subdir = cf.config['SUBDIR']
## End Of Global Part
class DaSeries:
def __init__(self, inputDf):
self.Df = inputDf
self.count_row = inputDf.shape[0]
self.start_pos = 0
self.end_pos = 0
self.interval = 1
def next(self):
try:
# Getting Individual Element & convert them to Series
if ((self.start_pos + self.interval) <= self.count_row):
self.end_pos = self.start_pos + self.interval
else:
self.end_pos = self.start_pos + (self.count_row self.start_pos)
split_df = self.Df.iloc[self.start_pos:self.end_pos]
if ((self.start_pos > self.count_row) | (self.start_pos == self.count_row)):
pass
else:
self.start_pos = self.start_pos + self.interval
x = float(split_df.iloc[0]['CurrentExchange'])
dx = float(split_df.iloc[0]['Change'])
# Emptying the exisitng dataframe
split_df = p.DataFrame(None)
return x, dx
except:
x = 0
dx = 0
return x, dx
class CategoricalSeries:
def __init__(self, sourceDf):
self.series = DaSeries(sourceDf)
self.i = 0
def next(self):
x, dx = self.series.next()
self.i += 1
return f'C{self.i}', x, dx
light_theme_colors = '$red $pink $purple $violet $indigo $blue $azure $cyan $teal $mint $green $amber $orange $tangerine'.split()
dark_theme_colors = '$red $pink $blue $azure $cyan $teal $mint $green $lime $yellow $amber $orange $tangerine'.split()
_color_index = 1
colors = dark_theme_colors
def next_color():
global _color_index
_color_index += 1
return colors[_color_index % len(colors)]
_curve_index = 1
curves = 'linear smooth step step-after step-before'.split()
def next_curve():
global _curve_index
_curve_index += 1
return curves[_curve_index % len(curves)]
def calc_p(row):
try:
str_calc_s1 = str(row['s_x'])
str_calc_s2 = str(row['s_y'])
if str_calc_s1 == str_calc_s2:
calc_p_val = float(row['p_y'])
else:
calc_p_val = float(row['p_x'])
return calc_p_val
except:
return 0.0
def calc_v(row):
try:
str_calc_s1 = str(row['s_x'])
str_calc_s2 = str(row['s_y'])
if str_calc_s1 == str_calc_s2:
calc_v_val = float(row['v_y'])
else:
calc_v_val = float(row['v_x'])
return calc_v_val
except:
return 0.0
def process_DF(inputDF, inputDFUnq):
try:
# Core Business logic
# The application will show default value to any
# trade-in stock in case that data doesn't consume
# from the source.
df_conv = inputDF
df_unique_fin = inputDFUnq
df_conv['max_count'] = df_conv.groupby('default_rank')['default_rank'].transform('count')
l.logr('3. max_df.csv', 'Y', df_conv, subdir)
# Sorting the output
sorted_df = df_conv.sort_values(by=['default_rank','s'], ascending=True)
# New Column List Orders
column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
df_fin = sorted_df.reindex(column_order, axis=1)
l.logr('4. sorted_df.csv', 'Y', df_fin, subdir)
# Now splitting the sorted df into two sets
lkp_max_count = 4
df_fin_na = df_fin[(df_fin['max_count'] == lkp_max_count)]
l.logr('5. df_fin_na.csv', 'Y', df_fin_na, subdir)
df_fin_req = df_fin[(df_fin['max_count'] != lkp_max_count)]
l.logr('6. df_fin_req.csv', 'Y', df_fin_req, subdir)
# Now to perform cross join, we will create
# a key column in both the DataFrames to
# merge on that key.
df_unique_fin['key'] = 1
df_fin_req['key'] = 1
# Dropping unwanted columns
df_unique_fin.drop(columns=['t'], axis=1, inplace=True)
l.logr('7. df_unique_slim.csv', 'Y', df_unique_fin, subdir)
# Padding with dummy key values
#merge_df = p.merge(df_unique_fin,df_fin_req,on=['s'],how='left')
merge_df = p.merge(df_unique_fin,df_fin_req,on=['key']).drop("key", 1)
l.logr('8. merge_df.csv', 'Y', merge_df, subdir)
# Sorting the output
sorted_merge_df = merge_df.sort_values(by=['default_rank_y','s_x'], ascending=True)
l.logr('9. sorted_merge_df.csv', 'Y', sorted_merge_df, subdir)
# Calling new derived logic
sorted_merge_df['derived_p'] = sorted_merge_df.apply(lambda row: calc_p(row), axis=1)
sorted_merge_df['derived_v'] = sorted_merge_df.apply(lambda row: calc_v(row), axis=1)
l.logr('10. sorted_merge_derived.csv', 'Y', sorted_merge_df, subdir)
# Dropping unwanted columns
sorted_merge_df.drop(columns=['default_rank_x', 'p_x', 'v_x', 's_y', 'p_y', 'v_y'], axis=1, inplace=True)
#Renaming the columns
sorted_merge_df.rename(columns={'s_x':'s'}, inplace=True)
sorted_merge_df.rename(columns={'default_rank_y':'default_rank'}, inplace=True)
sorted_merge_df.rename(columns={'derived_p':'p'}, inplace=True)
sorted_merge_df.rename(columns={'derived_v':'v'}, inplace=True)
l.logr('11. org_merge_derived.csv', 'Y', sorted_merge_df, subdir)
# Aligning columns
column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
merge_fin_df = sorted_merge_df.reindex(column_order, axis=1)
l.logr('12. merge_fin_df.csv', 'Y', merge_fin_df, subdir)
# Finally, appending these two DataFrame (df_fin_na & merge_fin_df)
frames = [df_fin_na, merge_fin_df]
fin_df = p.concat(frames, keys=["s", "default_rank", "max_count"])
l.logr('13. fin_df.csv', 'Y', fin_df, subdir)
# Final clearance & organization
fin_df.drop(columns=['default_rank', 'max_count'], axis=1, inplace=True)
l.logr('14. Final.csv', 'Y', fin_df, subdir)
# Adjusting key columns
fin_df.rename(columns={'s':'Company'}, inplace=True)
fin_df.rename(columns={'p':'CurrentExchange'}, inplace=True)
fin_df.rename(columns={'v':'Change'}, inplace=True)
l.logr('15. TransormedFinal.csv', 'Y', fin_df, subdir)
return fin_df
except Exception as e:
print('$' * 120)
x = str(e)
print(x)
print('$' * 120)
df = p.DataFrame()
return df
def create_dashboard(update_freq=0.0):
page = site['/dashboard_finnhub']
general_log_path = str(cf.config['LOG_PATH'])
ably_id = str(cf.config['ABLY_ID'])
# Enabling Logging Info
logging.basicConfig(filename=general_log_path + 'Realtime_Stock.log', level=logging.INFO)
os_det = pl.system()
if os_det == "Windows":
src_path = path + '\\' + 'data\\'
else:
src_path = path + '/' + 'data/'
# Fetching the data
client = AblyRest(ably_id)
channel = client.channels.get('sd_channel')
message_page = channel.history()
# Counter Value
cnt = 0
# Declaring Global Data-Frame
df_conv = p.DataFrame()
for i in message_page.items:
print('Last Msg: {}'.format(i.data))
json_data = json.loads(i.data)
# Converting JSON to Dataframe
df = p.json_normalize(json_data)
df.columns = df.columns.map(lambda x: x.split(".")[1])
if cnt == 0:
df_conv = df
else:
d_frames = [df_conv, df]
df_conv = p.concat(d_frames)
cnt += 1
# Resetting the Index Value
df_conv.reset_index(drop=True, inplace=True)
print('DF:')
print(df_conv)
# Writing to the file
l.logr('1. DF_modified.csv', 'Y', df_conv, subdir)
# Dropping unwanted columns
df_conv.drop(columns=['c'], axis=1, inplace=True)
df_conv['default_rank'] = df_conv.groupby(['s']).cumcount() + 1
lkp_rank = 1
df_unique = df_conv[(df_conv['default_rank'] == lkp_rank)]
# New Column List Orders
column_order = ['s', 'default_rank', 'p', 't', 'v']
df_unique_fin = df_unique.reindex(column_order, axis=1)
print('Rank DF Unique:')
print(df_unique_fin)
l.logr('2. df_unique.csv', 'Y', df_unique_fin, subdir)
# Capturing transformed values into a DataFrame
# Depending on your logic, you'll implement that inside
# the process_DF functions
fin_df = process_DF(df_conv, df_unique_fin)
df_unq_fin = df_unique_fin.copy()
df_unq_fin.rename(columns={'s':'Company'}, inplace=True)
df_unq_fin.rename(columns={'p':'CurrentExchange'}, inplace=True)
df_unq_fin.rename(columns={'v':'Change'}, inplace=True)
df_unq_fin.drop(columns=['default_rank','key'], axis=1, inplace=True)
l.logr('16. df_unq_fin.csv', 'Y', df_unq_fin, subdir)
df_unq_finale = df_unq_fin.sort_values(by=['Company'], ascending=True)
l.logr('17. df_unq_finale.csv', 'Y', df_unq_finale, subdir)
# Final clearance for better understanding of data
fin_df.drop(columns=['t'], axis=1, inplace=True)
l.logr('18. CleanFinal.csv', 'Y', fin_df, subdir)
count_row = df_unq_finale.shape[0]
large_lines = []
start_pos = 0
end_pos = 0
interval = 1
# Converting dataframe to a desired Series
f = CategoricalSeries(fin_df)
for j in range(count_row):
# Getting the series values from above
cat, val, pc = f.next()
# Getting Individual Element & convert them to Series
if ((start_pos + interval) <= count_row):
end_pos = start_pos + interval
else:
end_pos = start_pos + (count_row start_pos)
split_df = df_unq_finale.iloc[start_pos:end_pos]
if ((start_pos > count_row) | (start_pos == count_row)):
pass
else:
start_pos = start_pos + interval
x_currency = str(split_df.iloc[0]['Company'])
####################################################
##### Debug Purpose #########
####################################################
print('Company: ', x_currency)
print('J: ', str(j))
print('Cat: ', cat)
####################################################
##### End Of Debug #######
####################################################
c = page.add(f'e{j+1}', ui.tall_series_stat_card(
box=f'{j+1} 1 1 2',
title=x_currency,
value='=${{intl qux minimum_fraction_digits=2 maximum_fraction_digits=2}}',
aux_value='={{intl quux style="percent" minimum_fraction_digits=1 maximum_fraction_digits=1}}',
data=dict(qux=val, quux=pc),
plot_type='area',
plot_category='foo',
plot_value='qux',
plot_color=next_color(),
plot_data=data('foo qux', 15),
plot_zero_value=0,
plot_curve=next_curve(),
))
large_lines.append((f, c))
page.save()
while update_freq > 0:
time.sleep(update_freq)
for f, c in large_lines:
cat, val, pc = f.next()
print('Update Cat: ', cat)
print('Update Val: ', val)
print('Update pc: ', pc)
print('*' * 160)
c.data.qux = val
c.data.quux = pc / 100
c.plot_data[1] = [cat, val]
page.save()
if __name__ == "__main__":
try:
# Main Calling script
create_dashboard(update_freq=0.25)
except Exception as e:
x = str(e)
print(x)

Let’s explore the key snippets from the above script –

def process_DF(inputDF, inputDFUnq):
    try:
        # Core Business logic
        # The application will show default value to any
        # trade-in stock in case that data doesn't consume
        # from the source.
        
        # Getting block count
        #df_conv['block_count'] = df_conv.groupby(['default_rank']).cumcount()
        #l.logr('3. block_df.csv', 'Y', df_conv, subdir)

        # Getting block count
        #df_conv['max_count'] = df_conv.groupby(['default_rank']).size()
        #df_conv_fin = df_conv.groupby(['default_rank']).agg(['count'])
        #df_conv_fin = df_conv.value_counts(['default_rank']).reset_index(name='max_count')
        #df_conv_fin = df_conv.value_counts(['default_rank'])
        df_conv = inputDF
        df_unique_fin = inputDFUnq

        df_conv['max_count'] = df_conv.groupby('default_rank')['default_rank'].transform('count')
        l.logr('3. max_df.csv', 'Y', df_conv, subdir)


        # Sorting the output
        sorted_df = df_conv.sort_values(by=['default_rank','s'], ascending=True)

        # New Column List Orders
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        df_fin = sorted_df.reindex(column_order, axis=1)

        l.logr('4. sorted_df.csv', 'Y', df_fin, subdir)

        # Now splitting the sorted df into two sets
        lkp_max_count = 4
        df_fin_na = df_fin[(df_fin['max_count'] == lkp_max_count)]

        l.logr('5. df_fin_na.csv', 'Y', df_fin_na, subdir)

        df_fin_req = df_fin[(df_fin['max_count'] != lkp_max_count)]
        l.logr('6. df_fin_req.csv', 'Y', df_fin_req, subdir)

        # Now to perform cross join, we will create
        # a key column in both the DataFrames to
        # merge on that key.
        df_unique_fin['key'] = 1
        df_fin_req['key'] = 1

        # Dropping unwanted columns
        df_unique_fin.drop(columns=['t'], axis=1, inplace=True)
        l.logr('7. df_unique_slim.csv', 'Y', df_unique_fin, subdir)

        # Padding with dummy key values
        #merge_df = p.merge(df_unique_fin,df_fin_req,on=['s'],how='left')
        merge_df = p.merge(df_unique_fin,df_fin_req,on=['key']).drop("key", 1)

        l.logr('8. merge_df.csv', 'Y', merge_df, subdir)

        # Sorting the output
        sorted_merge_df = merge_df.sort_values(by=['default_rank_y','s_x'], ascending=True)

        l.logr('9. sorted_merge_df.csv', 'Y', sorted_merge_df, subdir)

        # Calling new derived logic
        sorted_merge_df['derived_p'] = sorted_merge_df.apply(lambda row: calc_p(row), axis=1)
        sorted_merge_df['derived_v'] = sorted_merge_df.apply(lambda row: calc_v(row), axis=1)

        l.logr('10. sorted_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Dropping unwanted columns
        sorted_merge_df.drop(columns=['default_rank_x', 'p_x', 'v_x', 's_y', 'p_y', 'v_y'], axis=1, inplace=True)

        #Renaming the columns
        sorted_merge_df.rename(columns={'s_x':'s'}, inplace=True)
        sorted_merge_df.rename(columns={'default_rank_y':'default_rank'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_p':'p'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_v':'v'}, inplace=True)

        l.logr('11. org_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Aligning columns
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        merge_fin_df = sorted_merge_df.reindex(column_order, axis=1)

        l.logr('12. merge_fin_df.csv', 'Y', merge_fin_df, subdir)

        # Finally, appending these two DataFrame (df_fin_na & merge_fin_df)
        frames = [df_fin_na, merge_fin_df]
        fin_df = p.concat(frames, keys=["s", "default_rank", "max_count"])

        l.logr('13. fin_df.csv', 'Y', fin_df, subdir)

        # Final clearance & organization
        fin_df.drop(columns=['default_rank', 'max_count'], axis=1, inplace=True)

        l.logr('14. Final.csv', 'Y', fin_df, subdir)

        # Adjusting key columns
        fin_df.rename(columns={'s':'Company'}, inplace=True)
        fin_df.rename(columns={'p':'CurrentExchange'}, inplace=True)
        fin_df.rename(columns={'v':'Change'}, inplace=True)

        l.logr('15. TransormedFinal.csv', 'Y', fin_df, subdir)

        return fin_df
    except Exception as e:
        print('$' * 120)

        x = str(e)
        print(x)

        print('$' * 120)

        df = p.DataFrame()

        return df

The above function will check if the queue is sending all the key trade-in data for all the companies. In our use case, we’re testing with the four companies & they are as follows –

a. AAPL
b. AMZN
c. BINANCE:BTCUSDT
d. IC MARKETS:1

Every message is containing data from all of these four companies together. If any of the company’s data is missing, this transformation will add a dummy record of that missing company to make the uniform number of entries in each message bouquet. And dummy trade-in values added for all the missing information.

def calc_p(row):
    try:
        str_calc_s1 = str(row['s_x'])
        str_calc_s2 = str(row['s_y'])

        if str_calc_s1 == str_calc_s2:
            calc_p_val = float(row['p_y'])
        else:
            calc_p_val = float(row['p_x'])

        return calc_p_val
    except:
        return 0.0

def calc_v(row):
    try:
        str_calc_s1 = str(row['s_x'])
        str_calc_s2 = str(row['s_y'])

        if str_calc_s1 == str_calc_s2:
            calc_v_val = float(row['v_y'])
        else:
            calc_v_val = float(row['v_x'])

        return calc_v_val
    except:
        return 0.0

The above snippet will capture the default values for those missing records.

    client = AblyRest(ably_id)
    channel = client.channels.get('sd_channel')

    message_page = channel.history()

In the above snippet, the application will consume the streaming data from the Ably queue.

for i in message_page.items:
        print('Last Msg: {}'.format(i.data))
        json_data = json.loads(i.data)

        # Converting JSON to Dataframe
        df = p.json_normalize(json_data)
        df.columns = df.columns.map(lambda x: x.split(".")[-1])

        if cnt == 0:
            df_conv = df
        else:
            d_frames = [df_conv, df]
            df_conv = p.concat(d_frames)

        cnt += 1

The above snippet will convert the streaming messages to a more meaningful pandas data-frame, which we can use for a wide variety of analytics.

    # Converting dataframe to a desired Series
    f = CategoricalSeries(fin_df)

    for j in range(count_row):
        # Getting the series values from above
        cat, val, pc = f.next()

        # Getting Individual Element & convert them to Series
        if ((start_pos + interval) <= count_row):
            end_pos = start_pos + interval
        else:
            end_pos = start_pos + (count_row - start_pos)

        split_df = df_unq_finale.iloc[start_pos:end_pos]

        if ((start_pos > count_row) | (start_pos == count_row)):
            pass
        else:
            start_pos = start_pos + interval

        x_currency = str(split_df.iloc[0]['Company'])

        ####################################################
        ##### Debug Purpose                        #########
        ####################################################
        print('Company: ', x_currency)
        print('J: ', str(j))
        print('Cat: ', cat)
        ####################################################
        #####   End Of Debug                         #######
        ####################################################

        c = page.add(f'e{j+1}', ui.tall_series_stat_card(
            box=f'{j+1} 1 1 2',
            title=x_currency,
            value='=${{intl qux minimum_fraction_digits=2 maximum_fraction_digits=2}}',
            aux_value='={{intl quux style="percent" minimum_fraction_digits=1 maximum_fraction_digits=1}}',
            data=dict(qux=val, quux=pc),
            plot_type='area',
            plot_category='foo',
            plot_value='qux',
            plot_color=next_color(),
            plot_data=data('foo qux', -15),
            plot_zero_value=0,
            plot_curve=next_curve(),
        ))
        large_lines.append((f, c))

    page.save()

    while update_freq > 0:

        time.sleep(update_freq)

        for f, c in large_lines:
            cat, val, pc = f.next()

            print('Update Cat: ', cat)
            print('Update Val: ', val)
            print('Update pc: ', pc)
            print('*' * 160)

            c.data.qux = val
            c.data.quux = pc / 100
            c.plot_data[-1] = [cat, val]

        page.save()

The above snippet will consume the data into H2O-Wave driven framework, which will expose this data into beautiful & easily representable GUI-based solutions through an interactive dashboard.


2. publish_ably_mod.py ( This native Python script will consume streaming data into Ably message Queue )


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 26-Jun-2021 ####
#### ####
#### Objective: This script will consume real-time ####
#### streaming data coming out from a hosted API ####
#### sources (Finnhub) using another popular third-party ####
#### service named Ably. Ably mimics pubsub Streaming ####
#### concept, which might be extremely useful for ####
#### any start-ups. ####
#### ####
###############################################################
from ably import AblyRest
import logging
import json
# generate random floating point values
from random import seed
from random import random
# seed random number generator
import websocket
import json
from clsConfig import clsConfig as cf
seed(1)
# Global Section
logger = logging.getLogger('ably')
logger.addHandler(logging.StreamHandler())
ably_id = str(cf.config['ABLY_ID'])
ably = AblyRest(ably_id)
channel = ably.channels.get('sd_channel')
# End Of Global Section
def on_message(ws, message):
print("*" * 60)
res = json.loads(message)
jsBody = res["data"]
jdata_dyn = json.dumps(jsBody)
print(jdata_dyn)
# JSON data
# This is the default data for all the identified category
# we've prepared. You can extract this dynamically. Or, By
# default you can set their base trade details.
json_data = [{
"c": "null",
"p": 0.01,
"s": "AAPL",
"t": 1624715406407,
"v": 0.01
},{
"c": "null",
"p": 0.01,
"s": "AMZN",
"t": 1624715406408,
"v": 0.01
},{
"c": "null",
"p": 0.01,
"s": "BINANCE:BTCUSDT",
"t": 1624715406409,
"v": 0.01
},
{
"c": "null",
"p": 0.01,
"s": "IC MARKETS:1",
"t": 1624715406410,
"v": 0.01
}]
jdata = json.dumps(json_data)
# Publish a message to the sd_channel channel
channel.publish('event', jdata)
# Publish rest of the messages to the sd_channel channel
channel.publish('event', jdata_dyn)
jsBody = []
jdata_dyn = ''
def on_error(ws, error):
print(error)
def on_close(ws):
print("### closed ###")
def on_open(ws):
# Invoking Individual Company Trade Queries
ws.send('{"type":"subscribe","symbol":"AAPL"}')
ws.send('{"type":"subscribe","symbol":"AMZN"}')
ws.send('{"type":"subscribe","symbol":"BINANCE:BTCUSDT"}')
ws.send('{"type":"subscribe","symbol":"IC MARKETS:1"}')
if __name__ == "__main__":
websocket.enableTrace(True)
ws = websocket.WebSocketApp("wss://ws.finnhub.io?token=jfhfyr8474rpv6av0",
on_message = on_message,
on_error = on_error,
on_close = on_close)
ws.on_open = on_open
ws.run_forever()

The key snippet from the above script –

    json_data = [{
        "c": "null",
        "p": 0.01,
        "s": "AAPL",
        "t": 1624715406407,
        "v": 0.01
    },{
        "c": "null",
        "p": 0.01,
        "s": "AMZN",
        "t": 1624715406408,
        "v": 0.01
    },{
        "c": "null",
        "p": 0.01,
        "s": "BINANCE:BTCUSDT",
        "t": 1624715406409,
        "v": 0.01
    },
        {
        "c": "null",
        "p": 0.01,
        "s": "IC MARKETS:1",
        "t": 1624715406410,
        "v": 0.01
        }]

As we already discussed, we’ll pass a default set of data for all the candidate companies.

    # Publish a message to the sd_channel channel
    channel.publish('event', jdata)

    # Publish rest of the messages to the sd_channel channel
    channel.publish('event', jdata_dyn)

Publish the messages to the created channel.

def on_open(ws):
    # Invoking Individual Company Trade Queries
    ws.send('{"type":"subscribe","symbol":"AAPL"}')
    ws.send('{"type":"subscribe","symbol":"AMZN"}')
    ws.send('{"type":"subscribe","symbol":"BINANCE:BTCUSDT"}')
    ws.send('{"type":"subscribe","symbol":"IC MARKETS:1"}')

if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("wss://ws.finnhub.io?token=hdhdjdj9494ld934v6av0",
                              on_message = on_message,
                              on_error = on_error,
                              on_close = on_close)

Send the company-specific trade queries through websocket apps to submit that to FinnHub.

3. clsConfig.py ( This file contains the configuration details. )


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning. Application will ####
#### process these information & perform ####
#### various analysis on Linear-Regression. ####
################################################
import os
import platform as pl
class clsConfig(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
config = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'REPORT_PATH': Curr_Path + sep + 'report',
'FILE_NAME': Curr_Path + sep + 'Data' + sep + 'TradeIn.csv',
'SRC_PATH': Curr_Path + sep + 'Data' + sep,
'APP_DESC_1': 'H2O Wave Integration with FinHubb!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'SUBDIR' : 'data',
'ABLY_ID': 'WWP309489.93jfkT:32kkdhdJjdued79e'
}

view raw

clsConfig.py

hosted with ❤ by GitHub


Let’s explore the directory structure –

MAC Directory

Let’s run the application –

Step 1:

Starting of Wave Server

Step 2:

Triggering message consumption job

Step 3:

Triggering the main application

You can monitor the message consumption from your Ably portal as follows –

Message Consumption

If you want to know more detail, then you need to scroll down the page, where you will get this additional information –

Message spike during consumption

And, the final output in the interactive dashboard will be look like the below screenshot –

Interactive Real-time Dashboard

So, we’ve done it.

You will get the complete codebase in the following Github link.

I’ll bring some more exciting topic in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only.

Combining the NoSQL(Cosmos DB) & traditional Azure RDBMS in Azure (Time stone solo from Python verse)

Hi Guys!

Today, our main objective is to extend our last post & blending two different kinds of data using Python.

Please refer the earlier post if you didn’t go through it – “Building Azure cosmos application.“.

What is the Objective?

In this post, our objective is to combine traditional RDBMS from the cloud with Azure’s NO SQL, which is, in this case, is Cosmos DB. And, try to forecast some kind of blended information, which can be aggregated further.

Examining Source Data.

No SQL Data from Cosmos:

Let’s check one more time the No SQL data created in our last post.

CosmosData

Total, we’ve created 6 records in our last post.

As you can see in red marked areas. From item, one can check the total number of records created. You can also filter out specific record using the Edit Filter blue color button highlighted with blue box & you need to provide the “WHERE CLAUSE” inside it.

Azure SQL DB:

Let’s create some data in Azure SQL DB.

But, before that, you need to create SQL DB in the Azure cloud. Here is the official Microsoft link to create DB in Azure. You can refer to it here.

I won’t discuss the detailed steps of creating DB here.

From Azure portal, it looks like –

Azure SQL DB Main Screen

Let’s see how the data looks like in Azure DB. For our case, we’ll be using the hrMaster DB.

Let’s create the table & some sample data aligned as per our cosmos data.

Azure SQL DB

We will join both the data based on subscriberId & then extract our required columns in our final output.

CombinedData

Good. Now, we’re ready for python scripts.

Python Scripts:

In this installment, we’ll be reusing the following python scripts, which is already discussed in my earlier post –

  • clsL.py
  • clsColMgmt.py
  • clsCosmosDBDet.py

So, I’m not going to discuss these scripts.

Before we discuss our scripts, let’s look out the directory structures –

Win_Vs_MAC

Here is the detailed directory structure between the Windows & MAC O/S.

1. clsConfig.py (This script will create the split csv files or final merge file after the corresponding process. However, this can be used as usual verbose debug logging as well. Hence, the name comes into the picture.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 25-May-2019              ####
#### Updated On: 02-Jun-2019              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys for      ####
#### azure cosmos db. Application will    ####
#### process these information & perform  ####
#### various CRUD operation on Cosmos DB. ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))
    db_name = 'rnd-de01-usw2-vfa-cdb'
    db_link = 'dbs/' + db_name
    CONTAINER1 = "RealtimeEmail"
    CONTAINER2 = "RealtimeTwitterFeedback"
    CONTAINER3 = "RealtimeHR"

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'SERVER': 'xxxx-xxx.database.windows.net',
        'DATABASE_1': 'SalesForceMaster',
        'DATABASE_2': 'hrMaster',
        'DATABASE_3': 'statMaster',
        'USERNAME': 'admin_poc_dev',
        'PASSWORD': 'xxxxx',
        'DRIVER': '{ODBC Driver 17 for SQL Server}',
        'ENV': 'pocdev-saty',
        'ENCRYPT_FLAG': "yes",
        'TRUST_FLAG': "no",
        'TIMEOUT_LIMIT': "30",
        'PROCSTAT': "'Y'",
        'APP_ID': 1,
        'EMAIL_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcEmail.json',
        'TWITTER_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcTwitter.json',
        'HR_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcHR.json',
        'COSMOSDB_ENDPOINT': 'https://rnd-de01-usw2-vfa-cdb.documents.azure.com:443/',
        'CONFIG_TABLE': 'ETL_CONFIG_TAB',
        'COSMOS_PRIMARYKEY': "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXIsI00AxKXXXXXgg==",
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'COSMOSDB': db_name,
        'COSMOS_CONTAINER1': CONTAINER1,
        'COSMOS_CONTAINER2': CONTAINER2,
        'COSMOS_CONTAINER3': CONTAINER3,
        'CONFIG_ORIG': 'Config_orig.csv',
        'ENCRYPT_CSV': 'Encrypt_Config.csv',
        'DECRYPT_CSV': 'Decrypt_Config.csv',
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'APP_DESC_1': 'Feedback Communication',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SQL_QRY_1': "SELECT c.subscriberId, c.sender, c.orderNo, c.orderDate, c.items.orderQty  FROM RealtimeEmail c",
        'SQL_QRY_2': "SELECT c.twitterId, c.Twit, c.DateCreated, c.Country FROM RealtimeTwitterFeedback c WHERE c.twitterId=@CrVal",
        'DB_QRY': "SELECT * FROM c",
        'AZURE_SQL_1': "SELECT DISTINCT subscriberId, state, country, annualIncome, customerType FROM dbo.onboardCustomer",
        'COLLECTION_QRY': "SELECT * FROM r",
        'database_link': db_link,
        'collection_link_1': db_link + '/colls/' + CONTAINER1,
        'collection_link_2': db_link + '/colls/' + CONTAINER2,
        'collection_link_3': db_link + '/colls/' + CONTAINER3,
        'options': {
            'offerThroughput': 1000,
            'enableCrossPartitionQuery': True,
            'maxItemCount': 2
        }
    }

Here, we’ve added a couple of more entries compared to the last time, which points the detailed configuration for Azure SQL DB.

‘SERVER’: ‘xxxx-xxx.database.windows.net’,
‘DATABASE_1’: ‘SalesForceMaster’,
‘DATABASE_2’: ‘hrMaster’,
‘DATABASE_3’: ‘statMaster’,
‘USERNAME’: ‘admin_poc_dev’,
‘PASSWORD’: ‘xxxxx’,
‘DRIVER’: ‘{ODBC Driver 17 for SQL Server}’,
‘ENV’: ‘pocdev-saty’,
‘ENCRYPT_FLAG’: “yes”,
‘TRUST_FLAG’: “no”,
‘TIMEOUT_LIMIT’: “30”,
‘PROCSTAT’: “‘Y'”, 

Here, you need to supply your DB credentials accordingly.

2. clsDBLookup.py (This script will look into the Azure SQL DB & fetch data from the traditional RDBMS of Azure environment.)

#####################################################
#### Written By: SATYAKI DE                      ####
#### Written On: 25-May-2019                     ####
####                                             ####
#### Objective: This script will check &         ####
#### test the connection with the Azure          ####
#### SQL DB & it will fetch all the records      ####
#### name resied under the same DB of a table.   ####
#####################################################

import pyodbc as py
import pandas as p
from clsConfig import clsConfig as cdc

class clsDBLookup(object):
    def __init__(self, lkpTableName = ''):
        self.server = cdc.config['SERVER']
        self.database = cdc.config['DATABASE_1']
        self.database1 = cdc.config['DATABASE_2']
        self.database2 = cdc.config['DATABASE_3']
        self.username = cdc.config['USERNAME']
        self.password = cdc.config['PASSWORD']
        self.driver = cdc.config['DRIVER']
        self.env = cdc.config['ENV']
        self.encrypt_flg = cdc.config['ENCRYPT_FLAG']
        self.trust_flg = cdc.config['TRUST_FLAG']
        self.timeout_limit = cdc.config['TIMEOUT_LIMIT']
        self.lkpTableName = cdc.config['CONFIG_TABLE']
        self.ProcStat = cdc.config['PROCSTAT']
        self.AppId = cdc.config['APP_ID']

    def LookUpData(self):
        try:
            # Assigning all the required values
            server = self.server
            database = self.database1
            username = self.username
            password = self.password
            driver = self.driver
            env = self.env
            encrypt_flg = self.encrypt_flg
            trust_flg = self.trust_flg
            timout_limit = self.timeout_limit
            lkpTableName = self.lkpTableName
            ProcStat = self.ProcStat
            AppId = self.AppId

            # Creating secure connection
            str_conn = 'Driver=' + driver + ';Server=tcp:' + server + ',1433;' \
                       'Database=' + database + ';Uid=' + username + '@' + env + ';' \
                       'Pwd=' + password + ';Encrypt=' + encrypt_flg + ';' \
                       'TrustServerCertificate=' + trust_flg + ';Connection Timeout=' + timout_limit + ';'

            db_con_azure = py.connect(str_conn)

            query = " SELECT [ruleId] as ruleId, [ruleName] as ruleName, [ruleSQL] as ruleSQL, " \
                    " [ruleFlag] as ruleFlag, [appId] as appId, [DBType] as DBType, " \
                    " [DBName] as DBName FROM [dbo][" + lkpTableName + "] WHERE ruleFLag = " + ProcStat + " " \
                    " and appId = " + AppId + " ORDER BY ruleId "

            df = p.read_sql(query, db_con_azure)

            # Closing the connection
            db_con_azure.close()

            return df
        except Exception as e:
            x = str(e)
            print(x)
            df = p.DataFrame()

            return df

    def azure_sqldb_read(self, sql):
        try:
            # Assigning all the required values
            server = self.server
            database = self.database1
            username = self.username
            password = self.password
            driver = self.driver
            env = self.env
            encrypt_flg = self.encrypt_flg
            trust_flg = self.trust_flg
            timout_limit = self.timeout_limit
            lkpTableName = self.lkpTableName
            ProcStat = self.ProcStat
            AppId = self.AppId

            # Creating secure connection
            str_conn = 'Driver=' + driver + ';Server=tcp:' + server + ',1433;' \
                       'Database=' + database + ';Uid=' + username + '@' + env + ';' \
                       'Pwd=' + password + ';Encrypt=' + encrypt_flg + ';' \
                       'TrustServerCertificate=' + trust_flg + ';Connection Timeout=' + timout_limit + ';'

            # print("Connection Details:: ", str_conn)
            db_con_azure = py.connect(str_conn)

            query = sql

            df = p.read_sql(query, db_con_azure)

            # Closing the connection
            db_con_azure.close()

            return df
        except Exception as e:
            x = str(e)
            print(x)
            df = p.DataFrame()

            return df

Major lines to discuss –

azure_sqldb_read(self, sql):

Getting the source SQL supplied from the configuration script.

db_con_azure = py.connect(str_conn)

query = sql

df = p.read_sql(query, db_con_azure)

After creating a successful connection, our application will read the SQL & fetch the data & store that into a pandas dataframe and return the output to the primary calling function.

3. callCosmosAPI.py (This is the main script, which will call all the methods to blend the data. Hence, the name comes into the picture.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 25-May-2019              ####
#### Modified On 02-Jun-2019              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

import clsColMgmt as cm
import clsCosmosDBDet as cmdb
from clsConfig import clsConfig as cf
import pandas as p
import clsLog as cl
import logging
import datetime
import json
import clsDBLookup as dbcon

# Disbling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

def getDate(row):
    try:
        d1 = row['orderDate']
        d1_str = str(d1)
        d1_dt_part, sec = d1_str.split('.')
        dt_part1 = d1_dt_part.replace('T', ' ')

        return dt_part1
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = ''

        return dt_part1

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        df_ret = p.DataFrame()
        df_ret_2 = p.DataFrame()
        df_ret_2_Mod = p.DataFrame()

        debug_ind = 'Y'

        # Initiating Log Class
        l = cl.clsLog()

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidated.log', level=logging.INFO)

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']

        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)

        print("*" * 157)
        print("Testing COSMOS DB Connection!")
        print("*" * 157)

        # Checking Cosmos DB Azure
        y = cmdb.clsCosmosDBDet()
        ret_val = y.test_db_con()

        if ret_val == 0:
            print()
            print("Cosmos DB Connection Successful!")
            print("*" * 157)
        else:
            print()
            print("Cosmos DB Connection Failure!")
            print("*" * 157)
            raise Exception

        print("*" * 157)

        # Accessing from Azure SQL DB
        x1 = dbcon.clsDBLookup()
        act_df = x1.azure_sqldb_read(cf.config['AZURE_SQL_1'])

        print("Azure SQL DB::")
        print(act_df)
        print()

        print("-" * 157)

        # Calling the function 1
        print("RealtimeEmail::")

        # Fetching First collection data to dataframe
        print("Fethcing Comos Collection Data!")

        sql_qry_1 = cf.config['SQL_QRY_1']
        msg = "Documents generatd based on unique key"
        collection_flg = 1

        x = cm.clsColMgmt()
        df_ret = x.fetch_data(sql_qry_1, msg, collection_flg)

        l.logr('1.EmailFeedback_' + var + '.csv', debug_ind, df_ret, 'log')
        print('RealtimeEmail Data::')
        print(df_ret)
        print()

        # Checking execution status
        ret_val = int(df_ret.shape[0])

        if ret_val == 0:
            print("Cosmos DB Hans't returned any rows. Please check your queries!")
            print("*" * 157)
        else:
            print("Successfully fetched!")
            print("*" * 157)

        # Calling the 2nd Collection
        print("RealtimeTwitterFeedback::")

        # Fetching First collection data to dataframe
        print("Fethcing Cosmos Collection Data!")

        # Query using parameters
        sql_qry_2 = cf.config['SQL_QRY_2']
        msg_2 = "Documents generated based on RealtimeTwitterFeedback feed!"
        collection_flg = 2

        val = 'crazyGo'
        param_det = [{"name": "@CrVal", "value": val}]
        add_param = 2

        x1 = cm.clsColMgmt()
        df_ret_2 = x1.fetch_data(sql_qry_2, msg_2, collection_flg, add_param, param_det)

        l.logr('2.TwitterFeedback_' + var + '.csv', debug_ind, df_ret, 'log')
        print('Realtime Twitter Data:: ')
        print(df_ret_2)
        print()

        # Checking execution status
        ret_val_2 = int(df_ret_2.shape[0])

        if ret_val_2 == 0:
            print("Cosmos DB hasn't returned any rows. Please check your queries!")
            print("*" * 157)
        else:
            print("Successfuly row feteched!")
            print("*" * 157)

        # Merging NoSQL Data (Cosmos DB) with Relational DB (Azure SQL DB)
        df_Fin_temp = p.merge(df_ret, act_df, on='subscriberId', how='inner')

        df_fin = df_Fin_temp[['orderDate', 'orderNo', 'sender', 'state', 'country', 'customerType']]

        print("Initial Combined Data (From Cosmos & Azure SQL DB) :: ")
        print(df_fin)

        l.logr('3.InitCombine_' + var + '.csv', debug_ind, df_fin, 'log')

        # Transforming the orderDate as per standard format
        df_fin['orderDateM'] = df_fin.apply(lambda row: getDate(row), axis=1)

        # Dropping the old column & renaming the new column to old column
        df_fin.drop(columns=['orderDate'], inplace=True)
        df_fin.rename(columns={'orderDateM': 'orderDate'}, inplace=True)

        print("*" * 157)
        print()
        print("Final Combined & Transformed result:: ")
        print(df_fin)

        l.logr('4.Final_Combine_' + var + '.csv', debug_ind, df_fin, 'log')
        print("*" * 157)

    except ValueError:
        print("No relevant data to proceed!")

    except Exception as e:
        print("Top level Error: args:{0}, message{1}".format(e.args, e.message))

if __name__ == "__main__":
    main()

The key lines from this script –

def getDate(row):
    try:
        d1 = row['orderDate']
        d1_str = str(d1)
        d1_dt_part, sec = d1_str.split('.')
        dt_part1 = d1_dt_part.replace('T', ' ')

        return dt_part1
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = ''

        return dt_part1

This function converts NoSQL date data type more familiar format.

NoSQL Date:
NoSQL_Date
Transformed Date:
Transformed Date
# Accessing from Azure SQL DB
x1 = dbcon.clsDBLookup()
act_df = x1.azure_sqldb_read(cf.config['AZURE_SQL_1'])

print("Azure SQL DB::")
print(act_df)
print()

Above lines are calling the Azure SQL DB method to retrieve the RDBMS data into our dataframe.

# Merging NoSQL Data (Cosmos DB) with Relational DB (Azure SQL DB)
df_Fin_temp = p.merge(df_ret, act_df, on='subscriberId', how='inner')

df_fin = df_Fin_temp[['orderDate', 'orderNo', 'sender', 'state', 'country', 'customerType']]

In these above lines, we’re joining the data retrieved from two different kinds of the database to prepare our initial combined dataframe. Also, we’ve picked only the desired column, which will be useful for us.

# Transforming the orderDate as per standard format
df_fin['orderDateM'] = df_fin.apply(lambda row: getDate(row), axis=1)

# Dropping the old column & renaming the new column to old column
df_fin.drop(columns=['orderDate'], inplace=True)
df_fin.rename(columns={'orderDateM': 'orderDate'}, inplace=True)

In the above lines, we’re transforming our date field, as shown above in one of our previous images by calling the getDate method.

Let’s see the directory structure of our program –

Win_Vs_MAC

Let’s see how it looks when it runs –

Windows:

Win_Run_1
Win_Run_2

MAC:

MAC_Run_1
MAC_Run_2

So, finally, we’ve successfully blended the data & make more meaningful data projection.

Following python packages are required to run this application –

pip install azure

pip install azure-cosmos

pip install pandas

pip install requests

pip install pyodbc

This application tested on Python3.7.1 & Python3.7.2 as well. As per Microsoft, their official supported version is Python3.5.

I hope you’ll like this effort.

Wait for the next installment. Till then, Happy Avenging. 😀

[Note: All the sample data are available/prepared in the public domain for research & study.]

Building Azure Cosmos solution using Python, Pandas ( A crossover of space stone, a reality stone, soul stone & time stone)

Hi Guys,

Here is the latest installment from the Python verse. For the first time, we’ll be dealing with Python with Azure cloud along with the help from Pandas & json.

Why post on this topic?

I always try to post something based on some kind of used cases, which might be useful in real-life scenarios. And, on top of that, I really don’t find significant posts on Azure dealing with Python. So, thought of sharing some first used cases, which will encourage others to join this club & used more python based application in the Azure platform.

First, let us check the complexity of today’s post & our objective.

What is the objective?

Today, our objective is to load a couple of json payload & stored them into multiple Cosmos Containers & finally fetch the data from the Cosmos DB & store the output into our log files apart from printing the same over the terminal screen.

Before we start discussing our post, let us explain some basic terminology of Azure Cosmos DB. So, that, next time whenever we refer them, it will be easier for you to understand those terminologies.

Learning basic azure terminology.

Since this is an unstructured DB, all the data will be stored in this following fashion –

Azure Cosmos DB -> Container -> Items

Let’s simplify this in words. So, each azure DB may have multiple containers, which you can compare with the table of any conventional RDBMS. And, under containers, you will have multiple items, which represents rows of an RDBMS table. The only difference is in each item you might have a different number of elements, which is equivalent to the columns in traditional RDBMS tables. The traditional table always has a fixed number of columns.

Input Payload:

Let’s review three different payloads, which we’ll be loading into three separate containers.

srcEmail.json
srcEmail_json

As you can see in the items, first sub-row has 3 elements, whereas the second one has 4 components. Traditional RDBMS, the table will always have the same number of columns.

srcTwitter.json
srcTwitter_json
srcHR.json
srcHR_json

So, from the above three sample payload, our application will try to put user’s feedback & consolidate at a single place for better product forecasts.

Azure Portal:

Let’s look into the Azure portal & we’ll be identifying a couple of crucial information, which will require in python scripts for authentication. But, before that, I’ll show – how to get those details in steps –

Azure_portal_home

As shown highlighted in Red, click the Azure Cosmos DB. You will find the following screen –

Azure_portal_1

If you click this, you will find all the collections/containers that are part of the same DB as follows –

Azure_portal_2

After, that we’ll be trying to extract the COSMOS Key & the Endpoint/URI from the portal. Without this, python application won’t be able to interact with the Azure portal. This is sensitive information. So, I’ll be providing some dummy details here just to show how to extract it. Never share these details with anyone outside of your project or group.

Cosmos_Keys

Good. Now, we’re ready for python scripts.

Python Scripts:

In this installment, we’ll be reusing the following python scripts, which is already discussed in my earlier post –

  • clsL.py

So, I’m not going to discuss these scripts.

Before we discuss our scripts, let’s look out the directory structures –

Win_Vs_MAC_Dir

1. clsConfig.py (This script will create the split csv files or final merge file after the corresponding process. However, this can be used as usual verbose debug logging as well. Hence, the name comes into the picture.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 25-May-2019              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys for      ####
#### azure cosmos db. Application will    ####
#### process these information & perform  ####
#### various CRUD operation on Cosmos DB. ####
##############################################
import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))
    db_name = 'rnd-de01-usw2-vfa-cdb'
    db_link = 'dbs/' + db_name
    CONTAINER1 = "RealtimeEmail"
    CONTAINER2 = "RealtimeTwitterFeedback"
    CONTAINER3 = "RealtimeHR"

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'EMAIL_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcEmail.json',
        'TWITTER_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcTwitter.json',
        'HR_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcHR.json',
        'COSMOSDB_ENDPOINT': 'https://rnd-de01-usw2-vfa-cdb.documents.azure.com:443/',
        'COSMOS_PRIMARYKEY': "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXIsI00AxKXXXXXgg==",
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'COSMOSDB': db_name,
        'COSMOS_CONTAINER1': CONTAINER1,
        'COSMOS_CONTAINER2': CONTAINER2,
        'COSMOS_CONTAINER3': CONTAINER3,
        'CONFIG_ORIG': 'Config_orig.csv',
        'ENCRYPT_CSV': 'Encrypt_Config.csv',
        'DECRYPT_CSV': 'Decrypt_Config.csv',
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'APP_DESC_1': 'Feedback Communication',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SQL_QRY_1': "SELECT c.subscriberId, c.sender, c.orderNo, c.orderDate, c.items.orderQty  FROM RealtimeEmail c",
        'SQL_QRY_2': "SELECT c.twitterId, c.Twit, c.DateCreated, c.Country FROM RealtimeTwitterFeedback c WHERE c.twitterId=@CrVal",
        'DB_QRY': "SELECT * FROM c",
        'COLLECTION_QRY': "SELECT * FROM r",
        'database_link': db_link,
        'collection_link_1': db_link + '/colls/' + CONTAINER1,
        'collection_link_2': db_link + '/colls/' + CONTAINER2,
        'collection_link_3': db_link + '/colls/' + CONTAINER3,
        'options': {
            'offerThroughput': 1000,
            'enableCrossPartitionQuery': True,
            'maxItemCount': 2
        }
    }

2. clsCosmosDBDet (This script will test the necessary connection with the Azure cosmos DB from the python application. And, if it is successful, then it will fetch all the collection/containers details, which resided under the same DB. Hence, the name comes into the picture.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 25-May-2019              ####
####                                      ####
#### Objective: This script will check &  ####
#### test the connection with the Cosmos  ####
#### & it will fetch all the collection   ####
#### name resied under the same DB.       ####
##############################################

import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.errors as errors

from clsConfig import clsConfig as cf

class IDisposable(cosmos_client.CosmosClient):
    def __init__(self, obj):
        self.obj = obj

    def __enter__(self):
        return self.obj

    def __exit__(self, exception_type, exception_val, trace):
        self = None

class clsCosmosDBDet:
    def __init__(self):
        self.endpoint = cf.config['COSMOSDB_ENDPOINT']
        self.primarykey = cf.config['COSMOS_PRIMARYKEY']
        self.db = cf.config['COSMOSDB']
        self.cont_1 = cf.config['COSMOS_CONTAINER1']
        self.cont_2 = cf.config['COSMOS_CONTAINER2']
        self.cont_3 = cf.config['COSMOS_CONTAINER3']
        self.database_link = cf.config['database_link']
        self.collection_link_1 = cf.config['collection_link_1']
        self.collection_link_2 = cf.config['collection_link_2']
        self.collection_link_3 = cf.config['collection_link_3']
        self.options = cf.config['options']
        self.db_qry = cf.config['DB_QRY']
        self.collection_qry = cf.config['COLLECTION_QRY']

    def list_Containers(self, client):
        try:
            database_link = self.database_link
            collection_qry = self.collection_qry
            print("1. Query for collection!")
            print()

            collections = list(client.QueryContainers(database_link, {"query": collection_qry}))

            if not collections:
                return

            for collection in collections:
                print(collection['id'])

            print()

        except errors.HTTPFailure as e:
            if e.status_code == 404:
                print("*" * 157)
                print('A collection with id \'{0}\' does not exist'.format(id))
                print("*" * 157)
            else:
                raise errors.HTTPFailure(e.status_code)

    def test_db_con(self):
        endpoint = self.endpoint
        primarykey = self.primarykey
        options_1 = self.options
        db_qry = self.db_qry

        with IDisposable(cosmos_client.CosmosClient(url_connection=endpoint, auth={'masterKey': primarykey})) as client:
            try:
                try:
                    options = {}
                    query = {"query": db_qry}
                    options = options_1

                    print("-" * 157)
                    print('Options:: ', options)
                    print()
                    print("Database details:: ")

                    result_iterable = client.QueryDatabases(query, options)

                    for item in iter(result_iterable):
                        print(item)

                    print("-" * 157)

                except errors.HTTPFailure as e:
                    if e.status_code == 409:
                        pass
                    else:
                        raise errors.HTTPFailure(e.status_code)

                self.list_Containers(client)

                return 0

            except errors.HTTPFailure as e:
                print("Application has caught an error. {0}".format(e.message))

                return 1

            finally:
                print("Application successfully completed!")

Key lines from the above scripts are –

with IDisposable(cosmos_client.CosmosClient(url_connection=endpoint, auth={'masterKey': primarykey})) as client:

In this step, the python application is building the connection object.

# Refer the entry in our config file
self.db_qry = cf.config['DB_QRY']
..
query = {"query": db_qry}
options = options_1
..
result_iterable = client.QueryDatabases(query, options)

Based on the supplied value from our configuration python script, this will extract the cosmos DB information.

self.list_Containers(client)

This is a function that will identify all the collection under this DB.

def list_Containers(self, client):
..
collections = list(client.QueryContainers(database_link, {"query": collection_qry}))

if not collections:
 return

for collection in collections:
 print(collection['id'])

In these above lines, our application will actually fetch the containers that are associated with this DB.

3. clsColMgmt.py (This script will create the split csv files or final merge file after the corresponding process. However, this can be used as usual verbose debug logging as well. Hence, the name comes into the picture.)

################################################
#### Written By: SATYAKI DE                 ####
#### Written On: 25-May-2019                ####
####                                        ####
#### Objective: This scripts has multiple   ####
#### features. You can create new items     ####
#### in azure cosmos db. Apart from that    ####
#### you can retrieve data from Cosmos just ####
#### for viewing purpose. You can display   ####
#### data based on specific filters or the  ####
#### entire dataset. Hence, three different ####
#### methods provided here to support this. ####
################################################

import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.errors as errors
import pandas as p
import json

from clsConfig import clsConfig as cf

class IDisposable(cosmos_client.CosmosClient):
    def __init__(self, obj):
        self.obj = obj

    def __enter__(self):
        return self.obj

    def __exit__(self, exception_type, exception_val, trace):
        self = None

class clsColMgmt:
    def __init__(self):
        self.endpoint = cf.config['COSMOSDB_ENDPOINT']
        self.primarykey = cf.config['COSMOS_PRIMARYKEY']
        self.db = cf.config['COSMOSDB']
        self.cont_1 = cf.config['COSMOS_CONTAINER1']
        self.cont_2 = cf.config['COSMOS_CONTAINER2']
        self.cont_3 = cf.config['COSMOS_CONTAINER3']
        self.database_link = cf.config['database_link']
        self.collection_link_1 = cf.config['collection_link_1']
        self.collection_link_2 = cf.config['collection_link_2']
        self.collection_link_3 = cf.config['collection_link_3']
        self.options = cf.config['options']
        self.db_qry = cf.config['DB_QRY']
        self.collection_qry = cf.config['COLLECTION_QRY']

    # Creating cosmos items in container
    def CreateDocuments(self, inputJson, collection_flg = 1):
        try:
            # Declaring variable
            endpoint = self.endpoint
            primarykey = self.primarykey

            print('Creating Documents')

            with IDisposable(cosmos_client.CosmosClient(url_connection=endpoint, auth={'masterKey': primarykey})) as client:
                try:
                    if collection_flg == 1:
                        collection_link = self.collection_link_1
                    elif collection_flg == 2:
                        collection_link = self.collection_link_2
                    else:
                        collection_link = self.collection_link_3

                    container = client.ReadContainer(collection_link)

                    # Create a SalesOrder object. This object has nested properties and various types including numbers, DateTimes and strings.
                    # This can be saved as JSON as is without converting into rows/columns.
                    print('Input Json:: ', str(inputJson))
                    nSon = json.dumps(inputJson)
                    json_rec = json.loads(nSon)

                    client.CreateItem(container['_self'], json_rec)

                except errors.HTTPFailure as e:
                    print("Application has caught an error. {0}".format(e.status_code))

                finally:
                    print("Application successfully completed!")

            return 0
        except Exception as e:
            x = str(e)
            print(x)
            return 1

    def CosmosDBCustomQuery_PandasCSVWithParam(self, client, collection_link, query_with_optional_parameters, message="Documents found by query: ", options_sql = {}):
        try:
            # Reading data by SQL & convert it ot Pandas Dataframe
            results = list(client.QueryItems(collection_link, query_with_optional_parameters, options_sql))
            cnt = 0

            dfSrc = p.DataFrame()
            dfRes = p.DataFrame()
            dfSrc2 = p.DataFrame()
            json_data = ''

            for doc in results:
                cnt += 1

            dfSrc = p.io.json.json_normalize(results)
            dfSrc.columns = dfSrc.columns.map(lambda x: x.split(".")[-1])
            dfRes = dfSrc

            print("Total records fetched: ", cnt)
            print("*" * 157)

            return dfRes
        except errors.HTTPFailure as e:
            Df_Fin = p.DataFrame()
            if e.status_code == 404:
                print("*" *157)
                print("Document doesn't exists")
                print("*" *157)
                return Df_Fin
            elif e.status_code == 400:
                print("*" * 157)
                print("Bad request exception occuered: ", e)
                print("*" *157)
                return Df_Fin
            else:
                return Df_Fin
        finally:
            print()

    def CosmosDBCustomQuery_PandasCSV(self, client, collection_link, query_with_optional_parameters, message="Documents found by query: ", options_sql = {}):
        try:
            # Reading data by SQL & convert it ot Pandas Dataframe
            results = list(client.QueryItems(collection_link, query_with_optional_parameters, options_sql))
            cnt = 0

            dfSrc = p.DataFrame()
            dfRes = p.DataFrame()
            dfSrc2 = p.DataFrame()
            json_data = ''

            for doc in results:
                cnt += 1

            dfSrc = p.io.json.json_normalize(results)
            dfSrc.columns = dfSrc.columns.map(lambda x: x.split(".")[-1])
            dfRes = dfSrc

            print("Total records fetched: ", cnt)
            print("*" * 157)

            return dfRes
        except errors.HTTPFailure as e:
            Df_Fin = p.DataFrame()
            if e.status_code == 404:
                print("*" *157)
                print("Document doesn't exists")
                print("*" *157)
                return Df_Fin
            elif e.status_code == 400:
                print("*" * 157)
                print("Bad request exception occuered: ", e)
                print("*" *157)
                return Df_Fin
            else:
                return Df_Fin
        finally:
            print()

    def fetch_data(self, sql_qry, msg="", collection_flg = 1, additional_params = 1, param_det=[]):
        endpoint = self.endpoint
        primarykey = self.primarykey
        options_1 = self.options

        with IDisposable(cosmos_client.CosmosClient(url_connection=endpoint, auth={'masterKey': primarykey})) as client:
            try:
                if collection_flg == 1:
                    collection_link = self.collection_link_1
                elif collection_flg == 2:
                    collection_link = self.collection_link_2
                else:
                    collection_link = self.collection_link_3

                print("Additional parameters: ", additional_params)

                message = msg
                options = options_1

                if additional_params == 1:
                    query = {"query": sql_qry}
                    df_Fin = self.CosmosDBCustomQuery_PandasCSV(client, collection_link, query, message, options)
                else:
                    query = {"query": sql_qry, "parameters": param_det}
                    df_Fin = self.CosmosDBCustomQuery_PandasCSVWithParam(client, collection_link, query, message, options)

                return df_Fin
            except errors.HTTPFailure as e:
                print("Application has caught an error. {0}".format(e.message))

            finally:
                print("Application successfully completed!")

Key lines from the above script –

def CosmosDBCustomQuery_PandasCSV(self, client, collection_link, query_with_optional_parameters, message="Documents found by query: ", options_sql = {}):

This method is generic. It will fetch all the records of a cosmos container.

results = list(client.QueryItems(collection_link, query_with_optional_parameters, options_sql))
..
for doc in results:
cnt += 1

dfSrc = p.io.json.json_normalize(results)
dfSrc.columns = dfSrc.columns.map(lambda x: x.split(".")[-1])
dfRes = dfSrc

In this step, the application fetching the data in the form of json & then serialize them & flatten them & finally stored the result into pandas dataframe for return output. Function –

CosmosDBCustomQuery_PandasCSVWithParam

– Is the same as the previous function. The only thing it can process parameters to filter out the data.

def fetch_data(self, sql_qry, msg="", collection_flg = 1, additional_params = 1, param_det=[]):

This is the primary calling function. Let us find out the key lines –

if collection_flg == 1:
    collection_link = self.collection_link_1
elif collection_flg == 2:
    collection_link = self.collection_link_2
else:
    collection_link = self.collection_link_3

Based on the supplied collection_flag from the main scripts, our application is identifying the collection where we need to process/load our data.

if additional_params == 1:
    query = {"query": sql_qry}
    df_Fin = self.CosmosDBCustomQuery_PandasCSV(client, collection_link, query, message, options)
else:
    query = {"query": sql_qry, "parameters": param_det}
    df_Fin = self.CosmosDBCustomQuery_PandasCSVWithParam(client, collection_link, query, message, options)

Based on the supplied additiona_params value, python application process, the filter queries & based on that it will invoke the function.

def CreateDocuments(self, inputJson, collection_flg = 1):

This is the primary collection for creating items/rows.

if collection_flg == 1:
    collection_link = self.collection_link_1
elif collection_flg == 2:
    collection_link = self.collection_link_2
else:
    collection_link = self.collection_link_3

container = client.ReadContainer(collection_link)

Based on the collection, our application will points to a specific container & create a connection between python & itself.

nSon = json.dumps(inputJson)
json_rec = json.loads(nSon)

client.CreateItem(container['_self'], json_rec)

Once, you’ll receive the input payload. The application will convert it to valid JSON payload & then send it to create item method to insert records.

4. callCosmosAPI.py (This script is the main calling function. Hence, the name comes into the picture.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 25-May-2019              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

import clsColMgmt as cm
import clsCosmosDBDet as cmdb
from clsConfig import clsConfig as cf
import pandas as p
import clsL as cl
import logging
import datetime
import json

# Disbling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB


def main():
    try:
        df_ret = p.DataFrame()
        df_ret_2 = p.DataFrame()
        df_ret_2_Mod = p.DataFrame()

        debug_ind = 'Y'

        # Initiating Log Class
        l = cl.clsL()

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidated.log', level=logging.INFO)

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']

        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)

        print("*" * 157)
        print("Testing COSMOS DB Connection!")
        print("*" * 157)

        # Checking Cosmos DB Azure
        y = cmdb.clsCosmosDBDet()
        ret_val = y.test_db_con()

        if ret_val == 0:
            print()
            print("Cosmos DB Connection Successful!")
            print("*" * 157)
        else:
            print()
            print("Cosmos DB Connection Failure!")
            print("*" * 157)
            raise Exception

        print("*" * 157)

        # Creating Data in Cosmos DB
        print()
        print('Fetching data from Json!')
        print('Creating data for Email..')
        print("-" * 157)

        emailFile = cf.config['EMAIL_SRC_JSON_FILE']
        flg = 1

        with open(emailFile) as json_file:
            dataEmail = json.load(json_file)

        # Creating documents
        a1 = cm.clsColMgmt()
        ret_cr_val1 = a1.CreateDocuments(dataEmail, flg)

        if ret_cr_val1 == 0:
            print('Successful data creation!')
        else:
            print('Failed create data!')

        print("-" * 157)

        print()
        print('Creating data for Twitter..')
        print("-" * 157)

        twitFile = cf.config['TWITTER_SRC_JSON_FILE']
        flg = 2

        with open(twitFile) as json_file:
            dataTwitter = json.load(json_file)

        # Creating documents
        a2 = cm.clsColMgmt()
        ret_cr_val2 = a2.CreateDocuments(dataTwitter, flg)

        if ret_cr_val2 == 0:
            print('Successful data creation!')
        else:
            print('Failed create data!')

        print("-" * 157)

        print()
        print('Creating data for HR..')
        print("-" * 157)

        hrFile = cf.config['HR_SRC_JSON_FILE']
        flg = 3

        with open(hrFile) as json_file:
            hrTwitter = json.load(json_file)

        # Creating documents
        a3 = cm.clsColMgmt()
        ret_cr_val3 = a3.CreateDocuments(hrTwitter, flg)

        if ret_cr_val3 == 0:
            print('Successful data creation!')
        else:
            print('Failed create data!')

        print("-" * 157)

        # Calling the function 1
        print("RealtimeEmail::")

        # Fetching First collection data to dataframe
        print("Fethcing Comos Collection Data!")

        sql_qry_1 = cf.config['SQL_QRY_1']
        msg = "Documents generatd based on unique key"
        collection_flg = 1

        x = cm.clsColMgmt()
        df_ret = x.fetch_data(sql_qry_1, msg, collection_flg)

        l.logr('1.EmailFeedback_' + var + '.csv', debug_ind, df_ret, 'log')
        print('RealtimeEmail Data::')
        print(df_ret)
        print()

        # Checking execution status
        ret_val = int(df_ret.shape[0])

        if ret_val == 0:
            print("Cosmos DB Hans't returned any rows. Please check your queries!")
            print("*" * 157)
        else:
            print("Successfully fetched!")
            print("*" * 157)

        # Calling the 2nd Collection
        print("RealtimeTwitterFeedback::")

        # Fetching First collection data to dataframe
        print("Fethcing Cosmos Collection Data!")

        # Query using parameters
        sql_qry_2 = cf.config['SQL_QRY_2']
        msg_2 = "Documents generated based on RealtimeTwitterFeedback feed!"
        collection_flg = 2

        val = 'crazyGo'
        param_det = [{"name": "@CrVal", "value": val}]
        add_param = 2

        x1 = cm.clsColMgmt()
        df_ret_2 = x1.fetch_data(sql_qry_2, msg_2, collection_flg, add_param, param_det)

        l.logr('2.TwitterFeedback_' + var + '.csv', debug_ind, df_ret, 'log')
        print('Realtime Twitter Data:: ')
        print(df_ret_2)
        print()

        # Checking execution status
        ret_val_2 = int(df_ret_2.shape[0])

        if ret_val_2 == 0:
            print("Cosmos DB hasn't returned any rows. Please check your queries!")
            print("*" * 157)
        else:
            print("Successfuly row feteched!")
            print("*" * 157)

    except ValueError:
        print("No relevant data to proceed!")

    except Exception as e:
        print("Top level Error: args:{0}, message{1}".format(e.args, e.message))

if __name__ == "__main__":
    main()

Key lines from the above script –

with open(twitFile) as json_file:
    dataTwitter = json.load(json_file)

Reading a json file.

val = 'crazyGo'
param_det = [{"name": "@CrVal", "value": val}]
add_param = 2

Passing a specific parameter value to filter out the record, while fetching it from the Cosmos DB.

Now, let’s look at the runtime stats.

Windows:

Win_Run_1
Win_Run_2

MAC:

MAC_Run_1
MAC_Run_2

Let’s compare the output log directory –

Windows:

Win_Log_Dir

MAC:

MAC_Log_Dir

Let’s verify the data from Cosmos DB.

Sample_Cosmos_Qry_Output_1

Here, subscriberId starting with ‘M‘ denotes data inserted from the MAC environment. Other one inserted through Windows.

Let’s see one more example from Cosmos –

Sample_Cosmos_Qry_Output_2

So, I guess – we’ve achieved our final goal here. Successfully, inserted data into Azure Cosmos DB from the python application & retrieve it successfully.

Following python packages are required in order to run this application –

pip install azure

pip install azure-cosmos

pip install pandas

pip install requests

This application tested on Python3.7.1 & Python3.7.2 as well. As per Microsoft, their official supported version is Python3.5.

I hope you’ll like this effort.

Wait for the next installment. Till then, Happy Avenging. 😀

[Note: All the sample data are available/prepared in the public domain for research & study.]

Pandas & Numpy (Space Stone of Programming World)

Today, we’ll demonstrate the different application of Pandas. In this case, we’ll be exploring the possibilities of reading large CSV files & splitting it sets of smaller more manageable csv to read.

And, after creating it, another process will merge them together. This is especially very useful when you need transformation on a large volume of data without going for any kind of memory error. And, moreover, the developer has more control over failed cases & can resume the load without restarting it from the beginning of the files.

In this case, I’ll be using one more custom methods to create the csv file instead of directly using the to_csv method of pandas.

But, before that let’s prepare the virtual environment & proceed from there –

Windows 10 (64 bit): 

Commands:

python -m venv –copies .env

.env\Scripts\activate.bat

Screenshot:

windows_screen1

Mac OS (64 bit): 

Commands:

python -m venv env

source env/bin/activate

Screenshot:

mac_screen

So, both the Windows & Mac version is 3.7 & we’re going to explore our task in the given section.

After creating this virtual environment, you need to install only pandas package for this task as shown below for both the Windows or Mac OS –

Windows:

package_install_windows

Mac:

package_install_mac

Rests are the packages comes as default with the Python 3.7.

Please find the GUI screenshots from WinSCP software comparing both the directory structures (Mac & Windows) as given below –

winscp_screen

From the above screenshot, you can see that our directory structure are not exactly identical before the blog directory. However, our program will take care of this difference.

Let’s check the scripts one-by-one,

1. clsL.py (This script will create the split csv files or final merge file after the corresponding process. However, this can be used as normal verbose debug logging as well. Hence, the name comes into the picture.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#############################################
#### Written By: Satyaki De              ####
#############################################
import pandas as p
import os
import platform as pl

class clsL(object):
    def __init__(self):
        self.path = os.path.dirname(os.path.realpath(__file__))

    def logr(self, Filename, Ind, df, subdir=None):
        try:
            x = p.DataFrame()
            x = df

            sd = subdir
            os_det = pl.system()

            if os_det == "Windows":
                if sd == None:
                    fullFileName = self.path + "\\" + Filename
                else:
                    fullFileName = self.path + "\\" + sd + "\\" + Filename
            else:
                if sd == None:
                    fullFileName = self.path + "/" + Filename
                else:
                    fullFileName = self.path + "/" + sd + "/" + Filename


            if Ind == 'Y':
                x.to_csv(fullFileName, index=False)

            return 0

        except Exception as e:
            y = str(e)
            print(y)
            return 3

From the above script, you can see that based on the Indicator, whose value can be either ‘Y’ or ‘N’. It will generate the csv file from the pandas data frame using to_csv method available in pandas.

Key snippet to notice –

self.path = os.path.dirname(os.path.realpath(__file__))

Here, the class is creating an instance & during that time it is initializing the value of the current path from where the application is triggering.

x = p.DataFrame()
x = df

The first line, declaring a pandas data frame variable. The second line assigns the value from the supplied method to that variable.

os_det = pl.system()

This will identify the operating system on which your application is running. Based on that, your path will be dynamically configured & passed. Hence, your application will be ready to handle multiple operating systems since beginning.

x.to_csv(fullFileName, index=False)

Finally, to_csv will generate the final csv file based on the supplied Indicator value. Also, notice that we’ve added one more parameter (index=False). By default, pandas create one extra column known as an index & maintain it’s operation based on that.

index_val

As you can see that the first column is not coming from our source files. Rather, it is generated by the pandas package in python. Hence, we don’t want to capture that in our final file by mentioning (index=False) options.

2. clsSplitFl.py (This script will create the split csv files. This will bring chunk by chunk data into your memory & process the large files.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#############################################
#### Written By: Satyaki De              ####
#############################################
import os
import pandas as p
import clsLog as log
import gc
import csv

class clsSplitFl(object):
    def __init__(self, srcFileName, path, subdir):
        self.srcFileName = srcFileName
        self.path = path
        self.subdir = subdir

        # Maximum Number of rows in CSV
        # in order to avoid Memory Error
        self.max_num_rows = 30000
        self.networked_directory = 'src_file'
        self.Ind = 'Y'

    def split_files(self):
        try:
            src_dir = self.path
            subdir = self.subdir
            networked_directory = self.networked_directory

            # Initiate Logging Instances
            clog = log.clsLog()

            # Setting up values
            srcFileName = self.srcFileName

            First_part, Last_part = str(srcFileName).split(".")

            num_rows = self.max_num_rows
            dest_path = self.path
            remote_src_path = src_dir + networked_directory
            Ind = self.Ind
            interval = num_rows

            # Changing work directory location to source file
            # directory at remote server
            os.chdir(remote_src_path)

            src_fil_itr_no = 1

            # Split logic here
            for df2 in p.read_csv(srcFileName, index_col=False, error_bad_lines=False, chunksize=interval):
                # Changing the target directory path
                os.chdir(dest_path)

                # Calling custom file generation method
                # to generate splitted files
                clog.logr(str(src_fil_itr_no) + '__' + First_part + '_' + '_splitted_.' + Last_part, Ind, df2, subdir)

                del [[df2]]
                gc.collect()

                src_fil_itr_no += 1

            return 0
        except Exception as e:
            x = str(e)
            print(x)

            return 1

In this script, we’re splitting the file if that file has more than 30,000 records. And, based on that it will split a number of equal or fewer volume files.

Important lines to be noticed –

self.max_num_rows = 30000

As already explained, based on this the split files contain the maximum number of rows in each file.

First_part, Last_part = str(srcFileName).split(“.”)

This will split the source file name into the first part & second part i.e. one part contains only the file name & the other part contains only the extension dynamically.

for df2 in p.read_csv(srcFileName, index_col=False, error_bad_lines=False, chunksize=interval):

As you can see, the chunk-by-chunk (mentioned as chunksize=interval) application will read lines from the large source csv. And, if it has any bad rows in the source files – it will skip them due to the following condition -> (error_bad_lines=False).

clog.logr(str(src_fil_itr_no) + ‘__’ + First_part + ‘_’ + ‘_splitted_.’ + Last_part, Ind, df2, subdir)

Dynamically generating split files in the specific subdirectory along with the modified name. So, these files won’t get overwritten – if you rerun it. Remember that the src_fil_itr_no will play an important role while merging them back to one as this is a number representing the current file’s split number.

del [[df2]]
gc.collect()

Once, you process that part – delete the data frame & deallocate the memory. So, that you won’t encounter any memory error or a similar issue.

And, the split file will look like this –

split_file_in_windows

3. clsMergeFl.py (This script will add together all the split csv files into one big csv file. This will bring chunk by chunk data into your memory & generates the large file.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#############################################
#### Written By: Satyaki De              ####
#############################################
import os
import platform as pl
import pandas as p
import gc
import clsLog as log
import re

class clsMergeFl(object):

    def __init__(self, srcFilename):
        self.srcFilename = srcFilename
        self.subdir = 'finished'
        self.Ind = 'Y'

    def merge_file(self):
        try:
            # Initiating Logging Instances
            clog = log.clsLog()
            df_W = p.DataFrame()
            df_M = p.DataFrame()
            f = {}

            subdir = self.subdir
            srcFilename = self.srcFilename
            Ind = self.Ind
            cnt = 0

            os_det = pl.system()

            if os_det == "Windows":
                proc_dir = "\\temp\\"
                gen_dir = "\\process\\"
            else:
                proc_dir = "/temp/"
                gen_dir = "/process/"

            # Current Directory where application presents
            path = os.path.dirname(os.path.realpath(__file__)) + proc_dir

            print("Path: ", path)
            print("Source File Initial Name: ", srcFilename)

            for fname in os.listdir(path):
                if fname.__contains__(srcFilename) and fname.endswith('_splitted_.csv'):
                    key = int(re.split('__', str(fname))[0])
                    f[key] = str(fname)

            for k in sorted(f):
                print(k)
                print(f[k])
                print("-"*30)

                df_W = p.read_csv(path+f[k], index_col=False)

                if cnt == 0:
                    df_M = df_W
                else:
                    d_frames = [df_M, df_W]
                    df_M = p.concat(d_frames)

                cnt += 1

                print("-"*30)
                print("Total Records in this Iteration: ", df_M.shape[0])

            FtgtFileName = fname.replace('_splitted_', '')
            first, FinalFileName = re.split("__", FtgtFileName)

            clog.logr(FinalFileName, Ind, df_M, gen_dir)

            del [[df_W], [df_M]]
            gc.collect()

            return 0
        except Exception as e:
            x = str(e)
            print(x)

            return 1

In this script, we’re merging smaller files into a large file. Following are the key snippet that we’ll explore –

for fname in os.listdir(path):
    if fname.__contains__(srcFilename) and fname.endswith('_splitted_.csv'):
        key = int(re.split('__', str(fname))[0])
        f[key] = str(fname)

In this section, the application will check if in that specified path we’ve files whose extension ends with “_splitted_.csv” & their first name starts with the file name initial i.e. if you have a source file named – acct_addr_20180112.csv, then it will check the first name should start with the -> “acct_addr” & last part should contain “_splitted_.csv”. If it is available, then it will start the merge process by considering one by one file & merging them using pandas data frame (marked in purple color) as shown below –

for k in sorted(f):
    print(k)
    print(f[k])
    print("-"*30)

    df_W = p.read_csv(f[k], index_col=False)

    if cnt == 0:
        df_M = df_W
    else:
        d_frames = [df_M, df_W]
        df_M = p.concat(d_frames)

    cnt += 1

Note that, here f is a dictionary that contains filename in key, value pair. The first part of the split file contains the number.  That way, it would be easier for the merge to club them back to one large file without thinking of orders.

Here, also notice the special function concat provided by the pandas. In this step, applications are merging two data frames.

Finally, the main python script, from where we’ll call it –

4. callSplitMergeFl.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#############################################
#### Written By: Satyaki De              ####
#############################################
import clsSplitFl as t
import clsMergeFl as cm
import re
import platform as pl
import os

def main():
    print("Calling the custom Package for large file splitting..")
    os_det = pl.system()

    print("Running on :", os_det)

    ###############################################################
    ###### User Input based on Windows OS                  ########
    ###############################################################

    srcF = str(input("Please enter the file name with extension:"))
    base_name = re.sub(r'[0-9]','', srcF)
    srcFileInit = base_name[:-5]

    if os_det == "Windows":
        subdir = "\\temp\\"
        path = os.path.dirname(os.path.realpath(__file__)) + "\\"
    else:
        subdir = "/temp/"
        path = os.path.dirname(os.path.realpath(__file__)) + '/'

    ###############################################################
    ###### End Of User Input                                 ######
    ###############################################################

    x = t.clsSplitFl(srcF, path, subdir)

    ret_val = x.split_files()

    if ret_val == 0:
        print("Splitting Successful!")
    else:
        print("Splitting Failure!")

    print("-"*30)

    print("Finally, Merging small splitted files to make the same big file!")

    y = cm.clsMergeFl(srcFileInit)

    ret_val1 = y.merge_file()

    if ret_val1 == 0:
        print("Merge Successful!")
    else:
        print("Merge Failure!")

    print("-"*30)



if __name__ == "__main__":
    main()

Following are the key section that we can check –

import clsSplitFl as t
import clsMergeFl as cm

Like any other standard python package, we’re importing our own class into our main callable script.

x = t.clsSplitFl(srcF, path, subdir)
ret_val = x.split_files()

Or,
y = cm.clsMergeFl(srcFileInit)
ret_val1 = y.merge_file()

In this section, we’ve instantiated the class & then we’re calling its function. And, based on the return value – we’re printing the status of our application last run.

The final run of this application looks like ->

Windows:

final_run_windows

Mac:

final_run_mac

And, the final file should look like this –

Windows:

win_img1

MAC:

mac_img1

Left-hand side representing windows final processed/output file, whereas right-hand side representing MAC final processed/output file.

Hope, this will give you some idea about how we can use pandas in various cases apart from conventional data computing.

In this post, I skipped the exception part intentionally. I’ll post one bonus post once my series complete.

Let me know, what do you think.

Till then, Happy Avenging!

Satyaki De

Regular Expression on Teradata 14.0

I’ve been working for more than 8 years in Oracle 10g, 11g & worked significant queries on Regular expressions in various scenario using SQL. It is real handy if you know how to use it & can reduce lots of pain with single SQL. And, the performance will be better compared to the total effort to achieve the same functionalists by using multiple SQL queries or PL/SQL Procedures.

Last couple of years, I’m working on Teradata. And, on some occasion – I was expecting features like these, where I can easily manipulate data with regular expression. I’m pretty excited when I heard that Teradata also introduced Regular Expression from Version 14.0.


As a result, I tried all those features that I think can be handy & useful for various scenarios & followings are the successful queries that I get. There are two occasion, where Teradata partially able to manipulate those strings. I’ve checked the latest Teradata Manual. However, unable to find those solution. So, I’m expecting other forum members can contribute here in order to make this thread useful for every one of us. And, I’ll post here as soon as I get some answers on these partial conversions.

For better understanding, I’ve provided the actual column value & after transformation value of that column in the output. That will help us to grasp it easily – I guess. 🙂


Case 1,

1
2
3
4
5
SELECT regexp_replace('SatyakiDe','([[:lower:]]{1,})([[:upper:]]{1,})','\1 \2') AS COL_VAL;

COLA COL_VAL
---------------- ----------------------------------------
SatyakiDe Satyaki De


Case 2,

1
2
3
4
5
select regexp_replace('919047242526','^([[:digit:]]{2})([[:digit:]]{10})','+\1 \2') COL_VAL;

COLA COL_VAL
------------ ---------------
919047255555 +91 9047255555



Case 3,

1
2
3
4
5
select regexp_replace('+++C','^([[:punct:]]{2})([[:punct:]]{1})(.*)$','\1\3') COL_VAL;

COLA COL_VAL
---- -----
+++C ++C



Case 4,

1
2
3
4
5
select initcap(regexp_replace(regexp_substr(' satyaki.de@mail.com','[^@]+'),'(.*)(\.)(.*)','\1 \3')) COL_VAL;

COLA COL_VAL
-------------------------------- --------------------------------------------------
satyaki.de@mail.com Satyaki De



Case 5,

1
2
3
4
5
select regexp_replace('100011001','([[:digit:]]{3})([[:digit:]]{2})([[:digit:]]{4})','XXX-XX-\3') as COL_VAL;

COLA COL_VAL
---------------- --------------------
100011001 XXX-XX-1001



Case 6,

1
2
3
4
5
select regexp_replace('123456789','([[:digit:]]{3})([[:digit:]]{3})([[:digit:]]{3})','\3.\2.\1') as COL_VAL;

COLA COL_VAL
--------- ---------------
123456789 789.456.123



Case 7,

1
2
3
4
5
SELECT regexp_replace('satyaki9de0loves3to8work2on2sql0and2bi6tools1','[^0-9]+','',1,0,'i') AS DER_VAL;

COLA DER_VAL
--------------------------------------------- ----------
satyaki1de0loves3to8work2on2sql0and2bi4tools1 1038220241




As you can see, all the characters have filtered out from the string & only numbers are kept here. These sorts of queries are very useful in lots of different business scenarios as well.

So, any extra space may not produce desired result. And, needs to pay attention into these small details. 

And, I’ve tested all these queries in the following two versions –

 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
select * from dbcinfo;

InfoKey InfoData
-------- ------------------------
1 VERSION 14.10.00.02
2 RELEASE 14.10.00.02
3 LANGUAGE SUPPORT MODE Standard


select * from dbcinfo;

InfoKey InfoData
-------- ------------------------
1 VERSION 14.10.01.05
2 RELEASE 14.10.01.04
3 LANGUAGE SUPPORT MODE Standard



Hope, this will give you much more clarity. 🙂

One more thing, I would like to clarify here – my intention is to describe more features about these regexp_(similar/substr/instr/replace) functions.

I’ve received one question whether these regexp functions available in TD 13 or not in Teradata forum while posting the same article over there.

And, here is my answer to that question –  

Regarding version 13,

Let us check whether they have these regexp functions or not –

 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
select * from dbcinfo;

InfoKey InfoData
-------- ------------------------
1 VERSION 13.00.00.15
2 RELEASE 13.00.00.15
3 LANGUAGE SUPPORT MODE Standard


select * from dbcinfo;

InfoKey InfoData
-------- ------------------------
1 VERSION 13.10.07.12
2 RELEASE 13.10.07.12
3 LANGUAGE SUPPORT MODE Standard


1
2
3
4
5
6
7
8
9
select regexp_replace('SatyakiDe','^(.*)([[:upper:]]{1,})(.*) $','\1 \2\3') AS COL_VAL;

select regexp_replace('SatyakiDe','^(.*)([[:upper:]]{1,})(.*) $','\1 \2\3') AS COL_VAL;

select regexp_replace('SatyakiDe','^(.*)([[:upper:]]{1,})(.*) $','\1 \2\3') AS COL_VAL;
$
*** Failure 3706 Syntax error: expected something between '(' and the string 'S' keyword.
Statement# 1, Info =35
*** Total elapsed time was 1 second.



Hope this will give adequate clarity to the answer of that above question.

Now, Lets see some other functionality.

REGEXP_SIMILAR has similar functionality like REGEXP_LIKE in Oracle.

Let’s see couple of such cases –

Lets prepare the table with some dummy data –


 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
SELECT * FROM dbc.dbcinfo;

InfoKey InfoData
-------- -----------------------
1 VERSION 14.10.01.05
2 RELEASE 14.10.01.04
3 LANGUAGE SUPPORT MODE Standard


CREATE MULTISET VOLATILE TABLE TEST_T1
(
COL1 VARCHAR(10)
)
ON COMMIT
PRESERVE ROWS;

INSERT INTO TEST_T1 VALUES('456')
;INSERT INTO TEST_T1 VALUES('123x')
;INSERT INTO TEST_T1 VALUES('x123')
;INSERT INTO TEST_T1 VALUES('y')
;INSERT INTO TEST_T1 VALUES('+789')
;INSERT INTO TEST_T1 VALUES('-789')
;INSERT INTO TEST_T1 VALUES('159-')
;INSERT INTO TEST_T1 VALUES('-1-');


Lets check the data now –

 1
2
3
4
5
6
7
8
9
10
11
12
SELECT *
FROM TEST_T1;

COL1
1 123x
2 456
3 x123
4 +789
5 -789
6 y
7 159-
8 -1-



Let’s look into the various scenarios now –


Case 1 (Returns Mixed Numbers, Signed Numbers & Non Numbers),

 1
2
3
4
5
6
7
8
9
10
11
12
13
SELECT *
FROM TEST_T1
WHERE REGEXP_SIMILAR(COL1,'^[0-9]+$','c')=0;

COL1
-----
1 123x
2 x123
3 +789
4 -789
5 y
6 159-
7 -1-




Case 2 (Returns Only Unsigned Positive Numbers),

1
2
3
4
5
6
7
SELECT *
FROM TEST_T1
WHERE REGEXP_SIMILAR(COL1,'^[0-9]+$','c')=1;

COL1
-----
456



Case 3 (Returns All Numbers including Positive, Negative & unsigned),

 1
2
3
4
5
6
7
8
9
10
11
SELECT *
FROM TEST_T1
WHERE REGEXP_SIMILAR(COL1,'^[+-]?[0-9]+[+-]?$','c')=1;

COL1
-----
456
+789
-789
159-
-1-



Case 4 (Returns Only Non Numbers i.e. Characters),

1
2
3
4
5
6
7
SELECT *
FROM TEST_T1
WHERE REGEXP_SIMILAR(COL1,'[^0-9]+','c')=1;

COL1
----
y



Hope this will give you some additional idea. 🙂

My objective is to provide basic information to my friends. So, that they can write better SQL in TD while migrating from other popular databases or new developer in TD can get a flavor of this powerful feature & exploit them in all the positive aspect & apply them properly. 😀

Really appreciate your time to read this post.

Regards.

Satyaki De.