Displaying real-time trade data in a dashboard using Python & third-party API & Streaming

Today, we want to make our use case a little harder & more realistic. We will consume real-time trade data through the FinnHub API & display it in our dashboard using another brilliant API, H2O-Wave, with the help of native Python.

This use case is extremely useful, and we’ll be using the following third-party APIs to achieve it –

  1. FinnHub: For more information, please click the following link.
  2. Ably: For more information, please click the following link.
  3. H2O-Wave: For more information, please click the following link.

I’m not going to discuss these topics further, as I’ve already covered them in separate earlier posts. Please refer to the following thread for detailed information –

creating-a-real-time-dashboard-from-streaming-data-using-python


In this post, we will address a more advanced concept than the previous post mentioned above. Let us first look at how the run looks before we start exploring the details –

Real-time trade dashboard

Let us explore the architecture of this implementation –

Architecture Diagram

This application will talk to the FinnHub websocket & consume real-time trade data from it. The data will be temporarily stored in our Ably channel. The dashboard will pick up the messages & display them as soon as there is new data for a trading company.
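
The flow described above can be sketched without any external service. The snippet below is only an illustration: `InMemoryChannel` is a hypothetical stand-in written for this example (it is not the Ably API), and returning the history newest-first is an assumption of the sketch.

```python
import json
from collections import deque


class InMemoryChannel:
    """Hypothetical stand-in for a message channel (not the Ably API)."""

    def __init__(self, max_messages=100):
        # Keep only the most recent messages, like a bounded history.
        self._messages = deque(maxlen=max_messages)

    def publish(self, name, payload):
        self._messages.append({"name": name, "data": payload})

    def history(self):
        # Newest first; this ordering is an assumption of the sketch.
        return list(reversed(self._messages))


channel = InMemoryChannel()

# Publisher side: forward a trade tick received from the websocket.
tick = [{"s": "AAPL", "p": 133.1, "t": 1624715406407, "v": 10}]
channel.publish("event", json.dumps(tick))

# Dashboard side: read whatever is in the channel and decode it.
latest = [json.loads(m["data"]) for m in channel.history()]
```

The publisher and the dashboard never talk to each other directly; they only share the channel, which is why the two scripts in this post can run as separate processes.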


For this use case, you need to install the following packages –

STEP – 1:

Main Packages

STEP – 2:

Main Packages – Continue

STEP – 3:

Main Packages – Continue

STEP – 4:

Main Packages – End

You can copy the following commands to install the above-mentioned packages –

pip install ably 
pip install h2o-wave
pip install pandas
pip install websocket
pip install websocket-client

Let’s explore the important data-point that you need to capture from the FinnHub portal to consume the real-time trade data –

FinnHub Portal

We have two main scripts. The first script consumes the streaming data into a message queue. The other one extracts the data from the queue, transforms it & publishes it to the real-time dashboard.

1. dashboard_finnhub.py ( This native Python script will consume streaming data & create the live trade dashboard. )


###############################################################
#### Template Written By: H2O Wave ####
#### Enhanced with Streaming Data By: Satyaki De ####
#### Base Version Enhancement On: 20-Dec-2020 ####
#### Modified On 27-Jun-2021 ####
#### ####
#### Objective: This script will consume real-time ####
#### streaming data coming out from a hosted API ####
#### sources (Finnhub) using another popular third-party ####
#### service named Ably. Ably mimics pubsub Streaming ####
#### concept, which might be extremely useful for ####
#### any start-ups. ####
#### ####
#### Note: This is an enhancement of my previous post of ####
#### H2O Wave. In this case, the application will consume ####
#### streaming trade data from a live host & not generated ####
#### out of the mock data. Thus, it is more useful for the ####
#### start-ups. ####
###############################################################
import time
from h2o_wave import site, data, ui
from ably import AblyRest
import pandas as p
import json
import datetime
import logging
import platform as pl
from clsConfig import clsConfig as cf
import clsL as cl

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

# Global Area
## Global Class
# Initiating Log Class
l = cl.clsL()

# Global Variables
# Moving previous day log files to archive directory
log_dir = cf.config['LOG_PATH']
path = cf.config['INIT_PATH']
subdir = cf.config['SUBDIR']
## End Of Global Part

class DaSeries:
    def __init__(self, inputDf):
        self.Df = inputDf
        self.count_row = inputDf.shape[0]
        self.start_pos = 0
        self.end_pos = 0
        self.interval = 1

    def next(self):
        try:
            # Getting Individual Element & convert them to Series
            if ((self.start_pos + self.interval) <= self.count_row):
                self.end_pos = self.start_pos + self.interval
            else:
                self.end_pos = self.start_pos + (self.count_row - self.start_pos)

            split_df = self.Df.iloc[self.start_pos:self.end_pos]

            if ((self.start_pos > self.count_row) | (self.start_pos == self.count_row)):
                pass
            else:
                self.start_pos = self.start_pos + self.interval

            x = float(split_df.iloc[0]['CurrentExchange'])
            dx = float(split_df.iloc[0]['Change'])

            # Emptying the existing dataframe
            split_df = p.DataFrame(None)

            return x, dx
        except:
            x = 0
            dx = 0
            return x, dx

class CategoricalSeries:
    def __init__(self, sourceDf):
        self.series = DaSeries(sourceDf)
        self.i = 0

    def next(self):
        x, dx = self.series.next()
        self.i += 1
        return f'C{self.i}', x, dx

light_theme_colors = '$red $pink $purple $violet $indigo $blue $azure $cyan $teal $mint $green $amber $orange $tangerine'.split()
dark_theme_colors = '$red $pink $blue $azure $cyan $teal $mint $green $lime $yellow $amber $orange $tangerine'.split()

_color_index = 1
colors = dark_theme_colors

def next_color():
    global _color_index
    _color_index += 1
    return colors[_color_index % len(colors)]

_curve_index = 1
curves = 'linear smooth step step-after step-before'.split()

def next_curve():
    global _curve_index
    _curve_index += 1
    return curves[_curve_index % len(curves)]

def calc_p(row):
    try:
        str_calc_s1 = str(row['s_x'])
        str_calc_s2 = str(row['s_y'])

        if str_calc_s1 == str_calc_s2:
            calc_p_val = float(row['p_y'])
        else:
            calc_p_val = float(row['p_x'])

        return calc_p_val
    except:
        return 0.0

def calc_v(row):
    try:
        str_calc_s1 = str(row['s_x'])
        str_calc_s2 = str(row['s_y'])

        if str_calc_s1 == str_calc_s2:
            calc_v_val = float(row['v_y'])
        else:
            calc_v_val = float(row['v_x'])

        return calc_v_val
    except:
        return 0.0

def process_DF(inputDF, inputDFUnq):
    try:
        # Core Business logic
        # The application will show default value to any
        # trade-in stock in case that data doesn't consume
        # from the source.
        df_conv = inputDF
        df_unique_fin = inputDFUnq

        df_conv['max_count'] = df_conv.groupby('default_rank')['default_rank'].transform('count')
        l.logr('3. max_df.csv', 'Y', df_conv, subdir)

        # Sorting the output
        sorted_df = df_conv.sort_values(by=['default_rank','s'], ascending=True)

        # New Column List Orders
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        df_fin = sorted_df.reindex(column_order, axis=1)
        l.logr('4. sorted_df.csv', 'Y', df_fin, subdir)

        # Now splitting the sorted df into two sets
        lkp_max_count = 4
        df_fin_na = df_fin[(df_fin['max_count'] == lkp_max_count)]
        l.logr('5. df_fin_na.csv', 'Y', df_fin_na, subdir)

        df_fin_req = df_fin[(df_fin['max_count'] != lkp_max_count)]
        l.logr('6. df_fin_req.csv', 'Y', df_fin_req, subdir)

        # Now to perform cross join, we will create
        # a key column in both the DataFrames to
        # merge on that key.
        df_unique_fin['key'] = 1
        df_fin_req['key'] = 1

        # Dropping unwanted columns
        df_unique_fin.drop(columns=['t'], axis=1, inplace=True)
        l.logr('7. df_unique_slim.csv', 'Y', df_unique_fin, subdir)

        # Padding with dummy key values
        #merge_df = p.merge(df_unique_fin,df_fin_req,on=['s'],how='left')
        # Keyword form of drop; the positional axis argument is rejected by recent pandas releases
        merge_df = p.merge(df_unique_fin,df_fin_req,on=['key']).drop(columns=['key'])
        l.logr('8. merge_df.csv', 'Y', merge_df, subdir)

        # Sorting the output
        sorted_merge_df = merge_df.sort_values(by=['default_rank_y','s_x'], ascending=True)
        l.logr('9. sorted_merge_df.csv', 'Y', sorted_merge_df, subdir)

        # Calling new derived logic
        sorted_merge_df['derived_p'] = sorted_merge_df.apply(lambda row: calc_p(row), axis=1)
        sorted_merge_df['derived_v'] = sorted_merge_df.apply(lambda row: calc_v(row), axis=1)
        l.logr('10. sorted_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Dropping unwanted columns
        sorted_merge_df.drop(columns=['default_rank_x', 'p_x', 'v_x', 's_y', 'p_y', 'v_y'], axis=1, inplace=True)

        # Renaming the columns
        sorted_merge_df.rename(columns={'s_x':'s'}, inplace=True)
        sorted_merge_df.rename(columns={'default_rank_y':'default_rank'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_p':'p'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_v':'v'}, inplace=True)
        l.logr('11. org_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Aligning columns
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        merge_fin_df = sorted_merge_df.reindex(column_order, axis=1)
        l.logr('12. merge_fin_df.csv', 'Y', merge_fin_df, subdir)

        # Finally, appending these two DataFrame (df_fin_na & merge_fin_df)
        frames = [df_fin_na, merge_fin_df]
        fin_df = p.concat(frames, keys=["s", "default_rank", "max_count"])
        l.logr('13. fin_df.csv', 'Y', fin_df, subdir)

        # Final clearance & organization
        fin_df.drop(columns=['default_rank', 'max_count'], axis=1, inplace=True)
        l.logr('14. Final.csv', 'Y', fin_df, subdir)

        # Adjusting key columns
        fin_df.rename(columns={'s':'Company'}, inplace=True)
        fin_df.rename(columns={'p':'CurrentExchange'}, inplace=True)
        fin_df.rename(columns={'v':'Change'}, inplace=True)
        l.logr('15. TransormedFinal.csv', 'Y', fin_df, subdir)

        return fin_df
    except Exception as e:
        print('$' * 120)
        x = str(e)
        print(x)
        print('$' * 120)

        df = p.DataFrame()
        return df

def create_dashboard(update_freq=0.0):
    page = site['/dashboard_finnhub']

    general_log_path = str(cf.config['LOG_PATH'])
    ably_id = str(cf.config['ABLY_ID'])

    # Enabling Logging Info
    logging.basicConfig(filename=general_log_path + 'Realtime_Stock.log', level=logging.INFO)

    os_det = pl.system()

    if os_det == "Windows":
        src_path = path + '\\' + 'data\\'
    else:
        src_path = path + '/' + 'data/'

    # Fetching the data
    client = AblyRest(ably_id)
    channel = client.channels.get('sd_channel')

    message_page = channel.history()

    # Counter Value
    cnt = 0

    # Declaring Global Data-Frame
    df_conv = p.DataFrame()

    for i in message_page.items:
        print('Last Msg: {}'.format(i.data))
        json_data = json.loads(i.data)

        # Converting JSON to Dataframe
        df = p.json_normalize(json_data)
        df.columns = df.columns.map(lambda x: x.split(".")[-1])

        if cnt == 0:
            df_conv = df
        else:
            d_frames = [df_conv, df]
            df_conv = p.concat(d_frames)

        cnt += 1

    # Resetting the Index Value
    df_conv.reset_index(drop=True, inplace=True)

    print('DF:')
    print(df_conv)

    # Writing to the file
    l.logr('1. DF_modified.csv', 'Y', df_conv, subdir)

    # Dropping unwanted columns
    df_conv.drop(columns=['c'], axis=1, inplace=True)
    df_conv['default_rank'] = df_conv.groupby(['s']).cumcount() + 1
    lkp_rank = 1
    df_unique = df_conv[(df_conv['default_rank'] == lkp_rank)]

    # New Column List Orders
    column_order = ['s', 'default_rank', 'p', 't', 'v']
    df_unique_fin = df_unique.reindex(column_order, axis=1)

    print('Rank DF Unique:')
    print(df_unique_fin)
    l.logr('2. df_unique.csv', 'Y', df_unique_fin, subdir)

    # Capturing transformed values into a DataFrame
    # Depending on your logic, you'll implement that inside
    # the process_DF functions
    fin_df = process_DF(df_conv, df_unique_fin)

    df_unq_fin = df_unique_fin.copy()
    df_unq_fin.rename(columns={'s':'Company'}, inplace=True)
    df_unq_fin.rename(columns={'p':'CurrentExchange'}, inplace=True)
    df_unq_fin.rename(columns={'v':'Change'}, inplace=True)
    df_unq_fin.drop(columns=['default_rank','key'], axis=1, inplace=True)
    l.logr('16. df_unq_fin.csv', 'Y', df_unq_fin, subdir)

    df_unq_finale = df_unq_fin.sort_values(by=['Company'], ascending=True)
    l.logr('17. df_unq_finale.csv', 'Y', df_unq_finale, subdir)

    # Final clearance for better understanding of data
    fin_df.drop(columns=['t'], axis=1, inplace=True)
    l.logr('18. CleanFinal.csv', 'Y', fin_df, subdir)

    count_row = df_unq_finale.shape[0]

    large_lines = []
    start_pos = 0
    end_pos = 0
    interval = 1

    # Converting dataframe to a desired Series
    f = CategoricalSeries(fin_df)

    for j in range(count_row):
        # Getting the series values from above
        cat, val, pc = f.next()

        # Getting Individual Element & convert them to Series
        if ((start_pos + interval) <= count_row):
            end_pos = start_pos + interval
        else:
            end_pos = start_pos + (count_row - start_pos)

        split_df = df_unq_finale.iloc[start_pos:end_pos]

        if ((start_pos > count_row) | (start_pos == count_row)):
            pass
        else:
            start_pos = start_pos + interval

        x_currency = str(split_df.iloc[0]['Company'])

        ####################################################
        ##### Debug Purpose                        #########
        ####################################################
        print('Company: ', x_currency)
        print('J: ', str(j))
        print('Cat: ', cat)
        ####################################################
        #####   End Of Debug                         #######
        ####################################################

        c = page.add(f'e{j+1}', ui.tall_series_stat_card(
            box=f'{j+1} 1 1 2',
            title=x_currency,
            value='=${{intl qux minimum_fraction_digits=2 maximum_fraction_digits=2}}',
            aux_value='={{intl quux style="percent" minimum_fraction_digits=1 maximum_fraction_digits=1}}',
            data=dict(qux=val, quux=pc),
            plot_type='area',
            plot_category='foo',
            plot_value='qux',
            plot_color=next_color(),
            plot_data=data('foo qux', -15),
            plot_zero_value=0,
            plot_curve=next_curve(),
        ))
        large_lines.append((f, c))

    page.save()

    while update_freq > 0:
        time.sleep(update_freq)

        for f, c in large_lines:
            cat, val, pc = f.next()

            print('Update Cat: ', cat)
            print('Update Val: ', val)
            print('Update pc: ', pc)
            print('*' * 160)

            c.data.qux = val
            c.data.quux = pc / 100
            c.plot_data[-1] = [cat, val]

        page.save()

if __name__ == "__main__":
    try:
        # Main Calling script
        create_dashboard(update_freq=0.25)
    except Exception as e:
        x = str(e)
        print(x)

Let’s explore the key snippets from the above script –

def process_DF(inputDF, inputDFUnq):
    try:
        # Core Business logic
        # The application will show default value to any
        # trade-in stock in case that data doesn't consume
        # from the source.
        
        # Getting block count
        #df_conv['block_count'] = df_conv.groupby(['default_rank']).cumcount()
        #l.logr('3. block_df.csv', 'Y', df_conv, subdir)

        # Getting block count
        #df_conv['max_count'] = df_conv.groupby(['default_rank']).size()
        #df_conv_fin = df_conv.groupby(['default_rank']).agg(['count'])
        #df_conv_fin = df_conv.value_counts(['default_rank']).reset_index(name='max_count')
        #df_conv_fin = df_conv.value_counts(['default_rank'])
        df_conv = inputDF
        df_unique_fin = inputDFUnq

        df_conv['max_count'] = df_conv.groupby('default_rank')['default_rank'].transform('count')
        l.logr('3. max_df.csv', 'Y', df_conv, subdir)


        # Sorting the output
        sorted_df = df_conv.sort_values(by=['default_rank','s'], ascending=True)

        # New Column List Orders
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        df_fin = sorted_df.reindex(column_order, axis=1)

        l.logr('4. sorted_df.csv', 'Y', df_fin, subdir)

        # Now splitting the sorted df into two sets
        lkp_max_count = 4
        df_fin_na = df_fin[(df_fin['max_count'] == lkp_max_count)]

        l.logr('5. df_fin_na.csv', 'Y', df_fin_na, subdir)

        df_fin_req = df_fin[(df_fin['max_count'] != lkp_max_count)]
        l.logr('6. df_fin_req.csv', 'Y', df_fin_req, subdir)

        # Now to perform cross join, we will create
        # a key column in both the DataFrames to
        # merge on that key.
        df_unique_fin['key'] = 1
        df_fin_req['key'] = 1

        # Dropping unwanted columns
        df_unique_fin.drop(columns=['t'], axis=1, inplace=True)
        l.logr('7. df_unique_slim.csv', 'Y', df_unique_fin, subdir)

        # Padding with dummy key values
        #merge_df = p.merge(df_unique_fin,df_fin_req,on=['s'],how='left')
        # Keyword form of drop; the positional axis argument is rejected by recent pandas releases
        merge_df = p.merge(df_unique_fin,df_fin_req,on=['key']).drop(columns=['key'])

        l.logr('8. merge_df.csv', 'Y', merge_df, subdir)

        # Sorting the output
        sorted_merge_df = merge_df.sort_values(by=['default_rank_y','s_x'], ascending=True)

        l.logr('9. sorted_merge_df.csv', 'Y', sorted_merge_df, subdir)

        # Calling new derived logic
        sorted_merge_df['derived_p'] = sorted_merge_df.apply(lambda row: calc_p(row), axis=1)
        sorted_merge_df['derived_v'] = sorted_merge_df.apply(lambda row: calc_v(row), axis=1)

        l.logr('10. sorted_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Dropping unwanted columns
        sorted_merge_df.drop(columns=['default_rank_x', 'p_x', 'v_x', 's_y', 'p_y', 'v_y'], axis=1, inplace=True)

        #Renaming the columns
        sorted_merge_df.rename(columns={'s_x':'s'}, inplace=True)
        sorted_merge_df.rename(columns={'default_rank_y':'default_rank'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_p':'p'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_v':'v'}, inplace=True)

        l.logr('11. org_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Aligning columns
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        merge_fin_df = sorted_merge_df.reindex(column_order, axis=1)

        l.logr('12. merge_fin_df.csv', 'Y', merge_fin_df, subdir)

        # Finally, appending these two DataFrame (df_fin_na & merge_fin_df)
        frames = [df_fin_na, merge_fin_df]
        fin_df = p.concat(frames, keys=["s", "default_rank", "max_count"])

        l.logr('13. fin_df.csv', 'Y', fin_df, subdir)

        # Final clearance & organization
        fin_df.drop(columns=['default_rank', 'max_count'], axis=1, inplace=True)

        l.logr('14. Final.csv', 'Y', fin_df, subdir)

        # Adjusting key columns
        fin_df.rename(columns={'s':'Company'}, inplace=True)
        fin_df.rename(columns={'p':'CurrentExchange'}, inplace=True)
        fin_df.rename(columns={'v':'Change'}, inplace=True)

        l.logr('15. TransormedFinal.csv', 'Y', fin_df, subdir)

        return fin_df
    except Exception as e:
        print('$' * 120)

        x = str(e)
        print(x)

        print('$' * 120)

        df = p.DataFrame()

        return df

The above function checks whether the queue is sending the key trade data for all the companies. In our use case, we’re testing with four companies, which are as follows –

a. AAPL
b. AMZN
c. BINANCE:BTCUSDT
d. IC MARKETS:1

Every message is expected to contain data for all four of these companies. If any company’s data is missing, this transformation adds a dummy record with default trade values for that company, so that each message batch carries the same number of entries.
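
The padding idea can be shown in isolation with plain dictionaries. This is a minimal sketch under stated assumptions, not the pandas logic used in `process_DF`: the helper name `pad_missing` and the `defaults` mapping are hypothetical, and when a batch carries several trades for one symbol the sketch simply keeps the last one.

```python
DEFAULT_SYMBOLS = ["AAPL", "AMZN", "BINANCE:BTCUSDT", "IC MARKETS:1"]


def pad_missing(trades, defaults, symbols=DEFAULT_SYMBOLS):
    """Return one trade per symbol, using the default record when a symbol is absent."""
    # Index the received trades by symbol; a later trade overwrites an earlier one.
    by_symbol = {trade["s"]: trade for trade in trades}
    return [by_symbol.get(symbol, defaults[symbol]) for symbol in symbols]


# Default records, mirroring the dummy values used in this post.
defaults = {s: {"s": s, "p": 0.01, "v": 0.01} for s in DEFAULT_SYMBOLS}

# Only AMZN arrived in this batch; the other three are padded.
padded = pad_missing([{"s": "AMZN", "p": 3400.0, "v": 5}], defaults)
```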

def calc_p(row):
    try:
        str_calc_s1 = str(row['s_x'])
        str_calc_s2 = str(row['s_y'])

        if str_calc_s1 == str_calc_s2:
            calc_p_val = float(row['p_y'])
        else:
            calc_p_val = float(row['p_x'])

        return calc_p_val
    except:
        return 0.0

def calc_v(row):
    try:
        str_calc_s1 = str(row['s_x'])
        str_calc_s2 = str(row['s_y'])

        if str_calc_s1 == str_calc_s2:
            calc_v_val = float(row['v_y'])
        else:
            calc_v_val = float(row['v_x'])

        return calc_v_val
    except:
        return 0.0

The above snippet will capture the default values for those missing records.
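
Since `calc_p` and `calc_v` differ only in the column prefix, they could be folded into one helper. The sketch below is an optional variation, not the code used in this post; `pick_value` is a hypothetical name, and it catches only the lookup and conversion errors instead of using a bare `except`.

```python
def pick_value(row, field):
    """Return row[field + '_y'] when both symbols match, else row[field + '_x']."""
    try:
        suffix = "_y" if str(row["s_x"]) == str(row["s_y"]) else "_x"
        return float(row[field + suffix])
    except (KeyError, TypeError, ValueError):
        # Same fallback as the original functions.
        return 0.0


# Works on any mapping-like row, including a pandas row passed by DataFrame.apply.
same = {"s_x": "AAPL", "s_y": "AAPL", "p_x": 0.01, "p_y": 133.1, "v_x": 0.01, "v_y": 10}
other = {"s_x": "AAPL", "s_y": "AMZN", "p_x": 0.01, "p_y": 3400.0, "v_x": 0.01, "v_y": 5}
```

With pandas, the call would look like `sorted_merge_df.apply(lambda row: pick_value(row, 'p'), axis=1)`.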

    client = AblyRest(ably_id)
    channel = client.channels.get('sd_channel')

    message_page = channel.history()

In the above snippet, the application will consume the streaming data from the Ably queue.

    for i in message_page.items:
        print('Last Msg: {}'.format(i.data))
        json_data = json.loads(i.data)

        # Converting JSON to Dataframe
        df = p.json_normalize(json_data)
        df.columns = df.columns.map(lambda x: x.split(".")[-1])

        if cnt == 0:
            df_conv = df
        else:
            d_frames = [df_conv, df]
            df_conv = p.concat(d_frames)

        cnt += 1

The above snippet will convert the streaming messages to a more meaningful pandas data-frame, which we can use for a wide variety of analytics.
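
The same message handling can be tried without Ably or pandas. The sketch below is an illustration only: `flatten_messages` is a hypothetical helper, and it assumes each message body is a JSON string holding either one trade or a list of trades.

```python
import json


def flatten_messages(raw_messages):
    """Decode JSON message bodies into one flat list of trade dictionaries."""
    rows = []
    for raw in raw_messages:
        payload = json.loads(raw)
        # A message may hold a single trade (dict) or a list of trades.
        trades = payload if isinstance(payload, list) else [payload]
        rows.extend(trades)
    return rows


messages = [
    '[{"s": "AAPL", "p": 133.1, "v": 10}]',
    '{"s": "AMZN", "p": 3400.0, "v": 5}',
]
rows = flatten_messages(messages)
```

Collecting the rows first and calling `pandas.DataFrame(rows)` once at the end is usually cheaper than concatenating a new data-frame for every message.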

    # Converting dataframe to a desired Series
    f = CategoricalSeries(fin_df)

    for j in range(count_row):
        # Getting the series values from above
        cat, val, pc = f.next()

        # Getting Individual Element & convert them to Series
        if ((start_pos + interval) <= count_row):
            end_pos = start_pos + interval
        else:
            end_pos = start_pos + (count_row - start_pos)

        split_df = df_unq_finale.iloc[start_pos:end_pos]

        if ((start_pos > count_row) | (start_pos == count_row)):
            pass
        else:
            start_pos = start_pos + interval

        x_currency = str(split_df.iloc[0]['Company'])

        ####################################################
        ##### Debug Purpose                        #########
        ####################################################
        print('Company: ', x_currency)
        print('J: ', str(j))
        print('Cat: ', cat)
        ####################################################
        #####   End Of Debug                         #######
        ####################################################

        c = page.add(f'e{j+1}', ui.tall_series_stat_card(
            box=f'{j+1} 1 1 2',
            title=x_currency,
            value='=${{intl qux minimum_fraction_digits=2 maximum_fraction_digits=2}}',
            aux_value='={{intl quux style="percent" minimum_fraction_digits=1 maximum_fraction_digits=1}}',
            data=dict(qux=val, quux=pc),
            plot_type='area',
            plot_category='foo',
            plot_value='qux',
            plot_color=next_color(),
            plot_data=data('foo qux', -15),
            plot_zero_value=0,
            plot_curve=next_curve(),
        ))
        large_lines.append((f, c))

    page.save()

    while update_freq > 0:

        time.sleep(update_freq)

        for f, c in large_lines:
            cat, val, pc = f.next()

            print('Update Cat: ', cat)
            print('Update Val: ', val)
            print('Update pc: ', pc)
            print('*' * 160)

            c.data.qux = val
            c.data.quux = pc / 100
            c.plot_data[-1] = [cat, val]

        page.save()

The above snippet feeds the data into the H2O-Wave framework, which presents it through an interactive dashboard.
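
The `next_color()` and `next_curve()` helpers used for the cards walk through a list in round-robin fashion with a global index. As an optional variation, the same rotation can be sketched with `itertools.cycle`. Note one difference: this version starts from the first entry of each list, whereas the original starts from the third because its index is incremented before use.

```python
from itertools import cycle

dark_theme_colors = '$red $pink $blue $azure $cyan $teal $mint $green $lime $yellow $amber $orange $tangerine'.split()
curves = 'linear smooth step step-after step-before'.split()

# Endless iterators replace the global counters.
_color_cycle = cycle(dark_theme_colors)
_curve_cycle = cycle(curves)


def next_color():
    return next(_color_cycle)


def next_curve():
    return next(_curve_cycle)
```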


2. publish_ably_mod.py ( This native Python script will consume streaming data into Ably message Queue )


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 26-Jun-2021 ####
#### ####
#### Objective: This script will consume real-time ####
#### streaming data coming out from a hosted API ####
#### sources (Finnhub) using another popular third-party ####
#### service named Ably. Ably mimics pubsub Streaming ####
#### concept, which might be extremely useful for ####
#### any start-ups. ####
#### ####
###############################################################
from ably import AblyRest
import logging
import json

# generate random floating point values
from random import seed
from random import random

import websocket
from clsConfig import clsConfig as cf

# seed random number generator
seed(1)

# Global Section
logger = logging.getLogger('ably')
logger.addHandler(logging.StreamHandler())

ably_id = str(cf.config['ABLY_ID'])

ably = AblyRest(ably_id)
channel = ably.channels.get('sd_channel')
# End Of Global Section

def on_message(ws, message):
    print("*" * 60)
    res = json.loads(message)
    jsBody = res["data"]
    jdata_dyn = json.dumps(jsBody)
    print(jdata_dyn)

    # JSON data
    # This is the default data for all the identified category
    # we've prepared. You can extract this dynamically. Or, By
    # default you can set their base trade details.
    json_data = [{
        "c": "null",
        "p": 0.01,
        "s": "AAPL",
        "t": 1624715406407,
        "v": 0.01
    },{
        "c": "null",
        "p": 0.01,
        "s": "AMZN",
        "t": 1624715406408,
        "v": 0.01
    },{
        "c": "null",
        "p": 0.01,
        "s": "BINANCE:BTCUSDT",
        "t": 1624715406409,
        "v": 0.01
    },{
        "c": "null",
        "p": 0.01,
        "s": "IC MARKETS:1",
        "t": 1624715406410,
        "v": 0.01
    }]

    jdata = json.dumps(json_data)

    # Publish a message to the sd_channel channel
    channel.publish('event', jdata)

    # Publish rest of the messages to the sd_channel channel
    channel.publish('event', jdata_dyn)

    jsBody = []
    jdata_dyn = ''

def on_error(ws, error):
    print(error)

# Newer websocket-client releases pass extra close arguments, hence *args
def on_close(ws, *args):
    print("### closed ###")

def on_open(ws):
    # Invoking Individual Company Trade Queries
    ws.send('{"type":"subscribe","symbol":"AAPL"}')
    ws.send('{"type":"subscribe","symbol":"AMZN"}')
    ws.send('{"type":"subscribe","symbol":"BINANCE:BTCUSDT"}')
    ws.send('{"type":"subscribe","symbol":"IC MARKETS:1"}')

if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("wss://ws.finnhub.io?token=jfhfyr8474rpv6av0",
                                on_message = on_message,
                                on_error = on_error,
                                on_close = on_close)
    ws.on_open = on_open
    ws.run_forever()

The key snippet from the above script –

    json_data = [{
        "c": "null",
        "p": 0.01,
        "s": "AAPL",
        "t": 1624715406407,
        "v": 0.01
    },{
        "c": "null",
        "p": 0.01,
        "s": "AMZN",
        "t": 1624715406408,
        "v": 0.01
    },{
        "c": "null",
        "p": 0.01,
        "s": "BINANCE:BTCUSDT",
        "t": 1624715406409,
        "v": 0.01
    },
        {
        "c": "null",
        "p": 0.01,
        "s": "IC MARKETS:1",
        "t": 1624715406410,
        "v": 0.01
        }]

As we already discussed, we’ll pass a default set of data for all the candidate companies.
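
Instead of typing the default records by hand, they can be generated from the list of symbols. This is a small sketch under assumptions: `build_default_trades` is a hypothetical helper, and the field values simply mirror the hard-coded defaults shown above.

```python
import json
import time


def build_default_trades(symbols, price=0.01, volume=0.01, now_ms=None):
    """Build one default trade record per symbol, with increasing timestamps."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return [
        {"c": "null", "p": price, "s": symbol, "t": now_ms + offset, "v": volume}
        for offset, symbol in enumerate(symbols)
    ]


symbols = ["AAPL", "AMZN", "BINANCE:BTCUSDT", "IC MARKETS:1"]
default_trades = build_default_trades(symbols, now_ms=1624715406407)

# Serialized form, ready to be published as one message.
jdata = json.dumps(default_trades)
```

Adding a fifth company then only needs a new entry in `symbols`.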

    # Publish a message to the sd_channel channel
    channel.publish('event', jdata)

    # Publish rest of the messages to the sd_channel channel
    channel.publish('event', jdata_dyn)

The above snippet publishes the messages to the created channel.

def on_open(ws):
    # Invoking Individual Company Trade Queries
    ws.send('{"type":"subscribe","symbol":"AAPL"}')
    ws.send('{"type":"subscribe","symbol":"AMZN"}')
    ws.send('{"type":"subscribe","symbol":"BINANCE:BTCUSDT"}')
    ws.send('{"type":"subscribe","symbol":"IC MARKETS:1"}')

if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("wss://ws.finnhub.io?token=hdhdjdj9494ld934v6av0",
                              on_message = on_message,
                              on_error = on_error,
                              on_close = on_close)

The above snippet sends the company-specific subscription requests to FinnHub through the websocket app.
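
The four subscription strings follow one pattern, so they can be produced from a list with `json.dumps`, which also takes care of quoting. This is a hedged sketch: `subscribe_messages` is a hypothetical helper, and the message shape is copied from the `ws.send` calls above.

```python
import json

SYMBOLS = ["AAPL", "AMZN", "BINANCE:BTCUSDT", "IC MARKETS:1"]


def subscribe_messages(symbols):
    """Return one JSON subscription request per symbol."""
    return [json.dumps({"type": "subscribe", "symbol": symbol}) for symbol in symbols]


requests_to_send = subscribe_messages(SYMBOLS)
```

Inside `on_open`, the loop would then be `for msg in subscribe_messages(SYMBOLS): ws.send(msg)`.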

3. clsConfig.py ( This file contains the configuration details. )


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning. Application will ####
#### process these information & perform ####
#### various analysis on Linear-Regression. ####
################################################
import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'FILE_NAME': Curr_Path + sep + 'Data' + sep + 'TradeIn.csv',
        'SRC_PATH': Curr_Path + sep + 'Data' + sep,
        'APP_DESC_1': 'H2O Wave Integration with FinHubb!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SUBDIR' : 'data',
        'ABLY_ID': 'WWP309489.93jfkT:32kkdhdJjdued79e'
    }
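
The manual separator handling in the config can also be left to `os.path.join`, which picks the right separator for the running platform. The sketch below is an optional alternative, not the file used in this post; `build_paths` is a hypothetical helper and covers only the path entries.

```python
import os


def build_paths(base_dir):
    """Build the path entries of the config relative to base_dir."""
    return {
        # A trailing empty component makes os.path.join end the path with a separator.
        'ARCH_DIR': os.path.join(base_dir, 'arch', ''),
        'PROFILE_PATH': os.path.join(base_dir, 'profile', ''),
        'LOG_PATH': os.path.join(base_dir, 'log', ''),
        'REPORT_PATH': os.path.join(base_dir, 'report'),
        'FILE_NAME': os.path.join(base_dir, 'Data', 'TradeIn.csv'),
        'SRC_PATH': os.path.join(base_dir, 'Data', ''),
        'INIT_PATH': base_dir,
    }


paths = build_paths(os.getcwd())
```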



Let’s explore the directory structure –

MAC Directory

Let’s run the application –

Step 1:

Starting of Wave Server

Step 2:

Triggering message consumption job

Step 3:

Triggering the main application

You can monitor the message consumption from your Ably portal as follows –

Message Consumption

If you want more detail, scroll down the page, where you will find this additional information –

Message spike during consumption

And, the final output in the interactive dashboard will look like the below screenshot –

Interactive Real-time Dashboard

So, we’ve done it.

You will get the complete codebase in the following Github link.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data & scenario posted here are representational data & scenarios & available over the internet & for educational purpose only.

Creating a mock API using Mulesoft RAML & testing it using Python

Hi Guys,

Today, I’ll be using a popular tool known as Mulesoft to generate a mock API & then we’ll be testing the same using Python. Mulesoft is an excellent tool to rapidly develop APIs & it can also integrate multiple cloud environments as an integration platform. You can use their Anypoint platform to quickly design such APIs for your organization. You can find the details in the following link. However, considering the cost, many organizations have to devise their own product or tool to do the same. That’s where developing one in Python, Node.js, or C# fits well, depending on the cloud platform.

Before we start, let us quickly understand what a mock API is.

A mock API server imitates a real API server by providing realistic responses to requests. They can be on your local machine or the public Internet. Responses can be static or dynamic, and simulate the data the real API would return, matching the schema with data types, objects, and arrays.

And why do we need that?

A mock API server is useful during development and testing when live data is either unavailable or unreliable. While designing an API, you can use mock APIs to work concurrently on the front and back-end, as well as to gather feedback from developers. Our mock API server guide for testing covers how you can use a mock API server so the absence of a real API doesn’t hold you back.

Often with internal projects, the API consumer (such as a front end developer through REST APIs) moves faster than the backend team building the API. This API mocking guide shows how a mock API server allows developers to consume a working API with the same interface as the eventual production API. As an added benefit, the backend team can discover where the mock API doesn’t meet the developer’s needs without spending developer time on features that may be removed or changed. This fast feedback loop can make engineering teams much more efficient.
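
To make the idea concrete, here is a tiny mock API server built only with the Python standard library. It is a sketch under assumptions, not the Mulesoft mock: the `/validate` path and the two response fields are illustrative, and `MockHandler`, `start_mock_server`, and `fetch_json` are hypothetical names.

```python
import http.client
import json
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, urlparse


class MockHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        url = urlparse(self.path)
        if url.path != "/validate":
            self.send_error(404, "Unknown endpoint")
            return
        # Echo the number back with a canned verdict instead of real validation.
        number = parse_qs(url.query).get("number", [""])[0]
        body = json.dumps({"valid": bool(number), "number": number}).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep the console quiet


def start_mock_server(port=0):
    """Start the mock server in a background thread; port 0 picks a free port."""
    server = HTTPServer(("127.0.0.1", port), MockHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def fetch_json(port, path):
    """Send a GET request to the local mock server and decode the JSON reply."""
    conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
    try:
        conn.request("GET", path)
        response = conn.getresponse()
        return response.status, json.loads(response.read().decode("utf-8"))
    finally:
        conn.close()
```

A consumer can be developed against `start_mock_server()` and later pointed at the real service by changing only the host and port.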

If you need more information on this topic, you can refer to the following link.

Great! Now that we have some background on mock APIs, let’s explore how Mulesoft can help us here.

Mulesoft uses the “RESTful API Modeling Language (RAML)”. We’ll be using this language to develop our mock API. To know more about this, you can view the following link.

Under the developer section, you can find Tutorials as shown in the screenshot given below –

18. Type Of RAML

You can select any of the categories & learn basic scripting from it.

Now, let’s take a look at the process of creating a Mulesoft free account to test our theories.

Step 1:

Click the following link, and you will see the page as shown below –

0.1. Mulesoft Landing Page

Step 2:

Now, click the login shown in the RED square. You will see the following page –

0.2. Mulesoft Sign-Up Option

Step 3:

Please provide your credentials if you already have an account. Otherwise, click “Sign-Up” & provide the few details shown below –

1. Mulesoft Registration

Step 4:

Once you successfully create the account, you will see the following page –

2. Mulesoft Interface

So, now we are set. To design an API, you will need to click the design center as marked within the white square.

Once you click the “Start designing” button, you will land on the next screen.

21. Creating a Projects

As shown above, you need to click the “Create new” for fresh API design.

This will take you to the next screen –

22. Creating a Projects - Continue

Now, you need to click “Create API specification” as marked in the RED square box. That will take you to the following screen –

23. Creating a Projects - Continue

You have to provide a meaningful name for the API & you can choose either the Text or the Visual editor. For this task, we’ll select the Text editor with RAML 1.0 as our preferred language. Once we provide all the relevant information, the “Create Specification” button marked in Green will be activated. Click it, & it will lead you to the next screen –

24. CodeSpace

Since we’ll be preparing this for mock API, we need to activate that by clicking the toggle button marked in the GREEN square box on the top-right side. And, this will generate an automated baseUri script as shown below –

25. CodeSpace - Continue

Now, we’re ready to develop our RAML code for the mock API. Let’s look into the RAML code.

1. phonevalisd.raml (This is the mock API script, which responds to an API request by returning a mock JSON when the request succeeds.)

#%RAML 1.0
# Created By - Satyaki De
# Date: 01-Mar-2020
# Description: This is a Mock API

baseUri: https://anypoint.mulesoft.com/mocking/api/v1/links/09KK0pos-1080-4049-9e04-a093456a64a8/ # 
title: PhoneVSD
securitySchemes:
  basic :
    type: Basic Authentication
    displayName: Satyaki's Basic Authentication
    description: API Only works with the basic authentication
protocols:
  - HTTP
description: This is a REST API Json base service to verify any phone numbers.
documentation:
  - title: PHONE VERIFY API
    content: This is a Mock API, which will simulate the activity of a Phone Validation API.
types:
  apiresponse:
    properties:
      valid: boolean
      number: string
      local_format: string
      international_format: string
      country_prefix: string
      country_code: string
      country_name: string
      location: string
      carrier: string
      line_type: string

/validate:
  get:
    queryParameters:
      access_key: string
      number: string
      country_code: string
      format: string
    description: For Validating the phone
    displayName: Validate phone
    protocols:
      - HTTP
    responses:
      403:
        body:
          application/json:
            properties:
              message: string
            example:
              {
                message : "Resource does not exists!"
              }
      400:
        body:
          application/json:
            properties:
              message: string
            example:
              {
                message : "API Key is invalid!"
              }
      200:
        body:
          application/json:
            type: apiresponse
            example:
              {
                "valid":true,
                "number":"17579758240",
                "local_format":"7579758240",
                "international_format":"+17579758240",
                "country_prefix":"+1",
                "country_code":"US",
                "country_name":"United States of America",
                "location":"Nwptnwszn1",
                "carrier":"MetroPCS Communications Inc.",
                "line_type":"mobile"
              }

Let’s quickly explore the critical snippet from the above script.

baseUri: https://anypoint.mulesoft.com/mocking/api/v1/links/86a5097f-1080-4049-9e04-a429219a64a8/ #

The above line will be our main URL when we invoke the mock API from the Python script.
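As a small illustration of how the endpoint is derived from that base, the sketch below joins a base URI with the /validate resource using the standard library. The link ID in the base URI is a placeholder, & the function name is an assumption for this example.

```python
from urllib.parse import urljoin

# Placeholder base URI; substitute the one generated for your own mock.
BASE_URI = "https://anypoint.mulesoft.com/mocking/api/v1/links/your-link-id/"


def endpoint_for(resource, base_uri=BASE_URI):
    """Join the base URI & a resource name such as 'validate'."""
    # The trailing slash on the base matters: without it, urljoin would
    # replace the last path segment instead of appending to it.
    return urljoin(base_uri, resource.lstrip("/"))


print(endpoint_for("/validate"))
```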

securitySchemes:
    basic :
        type: Basic Authentication

In this script, we’re using basic authentication. Apart from that, RAML supports OAuth & several other security schemes.
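For reference, HTTP Basic authentication sends the user name & password as a Base64-encoded Authorization header. The sketch below builds that header with the standard library; the function name & the credentials are made up for illustration, & note that the Python scripts later in this post pass the key as a query parameter instead.

```python
import base64


def basic_auth_header(username, password):
    """Build the Authorization header used by HTTP Basic authentication."""
    raw = "{}:{}".format(username, password).encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return {"Authorization": "Basic " + token}


# Illustrative credentials only.
print(basic_auth_header("user", "pass"))
```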

protocols:
- HTTP

In this case, we’re going to use – “HTTP” as our preferred communication protocol.

responses:
      403:
        body:
          application/json:
            properties:
              message: string
            example:
              {
                message : "Resource does not exists!"
              }
      400:
        body:
          application/json:
            properties:
              message: string
            example:
              {
                message : "API Key is invalid!"
              }
      200:
        body:
          application/json:
            type: apiresponse
            example:
              {
                "valid":true,
                "number":"17579758240",
                "local_format":"7579758240",
                "international_format":"+17579758240",
                "country_prefix":"+1",
                "country_code":"US",
                "country_name":"United States of America",
                "location":"Nwptnwszn1",
                "carrier":"MetroPCS Communications Inc.",
                "line_type":"mobile"
              }

We’ve defined responses for three specific cases as part of our business logic & standards: 200 for a successful validation, 400 for an invalid API key & 403 when the resource does not exist.
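On the consumer side, these status codes can be handled with a small helper. The following is a minimal sketch, assuming the caller already has the status code & the response body as text; the function name & the shape of the returned dictionary are assumptions, not part of the scripts in this post.

```python
import json


def interpret_response(status_code, body_text):
    """Turn a status code & a JSON body into a small result dictionary."""
    try:
        payload = json.loads(body_text) if body_text else {}
    except ValueError:
        # The body was not valid JSON; keep the raw text as the message.
        payload = {"message": body_text}

    if status_code == 200:
        return {"ok": True, "data": payload}

    # The 400 & 403 responses in the specification carry a 'message' field.
    if isinstance(payload, dict):
        message = payload.get("message", "Unexpected response")
    else:
        message = str(payload)
    return {"ok": False, "status": status_code, "error": message}
```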

Once we’re done with our coding, we need to focus on two places as shown in the picture below –

26. Validation - mock API - Mulesoft

The snippet marked in the RED square box identifies our mandatory input parameters, which appear both in the code & in the panel on the right-hand side.

To test this mock API locally, you can pass these key parameters as follows –

27. Validation - mock API - Mulesoft - Continue

Now, you have to click the Send button marked in a GREEN square box. This will send your query parameters & as per our API response, you can see the output just below the Send button as follows –

28. Validation - mock API - Mulesoft - Continue

Now, we’re good to publish this mock API in the Mulesoft Anypoint portal. This will let us test it from an external application, i.e., a Python-based application in our case. So, click the “Publish” button highlighted with the Blue square box. That will prompt the following screen –

29. Published

Now, we’ll click the “Publish to Exchange” button marked with the GREEN square box. This will prompt the next screen as shown below –

30. Published - Continue

Now, you need to fill up the relevant details & then click – “Publish to Exchange,” as shown above. And, that will lead to the following screen –

31. Published - Continue

And, after a few seconds, you will see the next screen –

32. Published - Continue

Now, you can click “Done” to close this popup. And, to verify the status, you can check it by clicking the top-left side of the code-editor & then click “Design Center” as shown below –

33. Published - Final

So, we’re done with our Mulesoft mock API design & deployment. Let’s test it from our Python application. We’ll be only discussing the key snippets here.

2. clsConfig.py (This is the parameter file for our mock API script.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 04-Apr-2020              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys for      ####
#### Mulesoft Mock API. Application will  ####
#### process these information & perform  ####
#### the call to our newly developed Mock ####
#### API in Mulesoft.                     ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'URL': "https://anypoint.mulesoft.com/mocking/api/v1/links/a23e4e71-9c25-317b-834b-10b0debc3a30/validate",
        'CLIENT_SECRET': 'a12345670bacb1e3cec55e2f1234567d',
        'API_TYPE': "application/json",
        'CACHE': "no-cache",
        'CON': "keep-alive",
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'SRC_PATH': Curr_Path + sep + 'Src_File' + sep,
        'APP_DESC_1': 'Mule Mock API Calling!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path
    }

The key snippet from the above script is –

'URL': "https://anypoint.mulesoft.com/mocking/api/v1/links/a23e4e71-9c25-317b-834b-10b0debc3a30/validate",

This URL is generated by the Mulesoft API Designer & shown in our RAML editor (the baseUri followed by the /validate resource).
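The clsConfig script above picks the path separator based on the operating system. As an alternative sketch, pathlib can build the same kind of directory settings without checking the platform. The function name is hypothetical, & unlike the original config these paths carry no trailing separator, so file names should be joined with Path or os.path.join rather than plain string concatenation.

```python
from pathlib import Path


def build_paths(base_dir):
    """Return a few directory settings as strings, whatever the OS."""
    base = Path(base_dir)
    return {
        "ARCH_DIR": str(base / "arch"),
        "LOG_PATH": str(base / "log"),
        "REPORT_PATH": str(base / "report"),
    }


# Example: derive the directories relative to the current folder.
print(build_paths(Path.cwd()))
```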

3. clsMuleMockAPI.py (This is the main class to invoke our mock API script.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 30-Jul-2020              ####
#### Modified On 30-Jul-2020              ####
####                                      ####
#### Objective: Main class scripts to     ####
#### invoke mock API.                     ####
##############################################

import json
from clsConfig import clsConfig as cf
import requests
import logging

class clsMuleMockAPI:
    def __init__(self):
        self.url = cf.config['URL']
        self.muleapi_key = cf.config['CLIENT_SECRET']
        self.muleapi_cache = cf.config['CACHE']
        self.muleapi_con = cf.config['CON']
        self.type = cf.config['API_TYPE']

    def searchQry(self, phNumber, cntCode, fmt):
        try:
            url = self.url
            muleapi_key = self.muleapi_key
            muleapi_cache = self.muleapi_cache
            muleapi_con = self.muleapi_con
            type = self.type

            querystring = {"access_key": muleapi_key, "number": phNumber, "country_code": cntCode, "format": fmt}

            print('Input JSON: ', str(querystring))

            headers = {
                'content-type': type,
                'Cache-Control': muleapi_cache,
                'Connection': muleapi_con
            }

            response = requests.request("GET", url, headers=headers, params=querystring)

            # response.text is already a JSON string, so return it as-is;
            # the calling script parses it with json.loads
            ResJson = response.text

            return ResJson

        except Exception as e:
            ResJson = ''
            x = str(e)
            print(x)

            logging.info(x)
            ResJson = {'errorDetails': x}

            return ResJson

And, the key snippet from the above code –

querystring = {"access_key": muleapi_key, "number": phNumber, "country_code": cntCode, "format": fmt}

In the above lines, we’re preparing the query string, which will be passed into the API call.
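The requests library encodes this dictionary into the URL for us when it is passed through params. To see roughly what ends up on the wire, the sketch below encodes the same kind of dictionary with the standard library; the key & the phone number are sample values, & the function name is an assumption.

```python
from urllib.parse import urlencode


def build_query(access_key, number, country_code, fmt):
    """Encode the query parameters the way they appear in the URL."""
    params = {
        "access_key": access_key,
        "number": number,
        "country_code": country_code,
        "format": fmt,
    }
    return urlencode(params)


# A leading '+' in a phone number is percent-encoded as %2B.
print(build_query("demo-key", "+17579758240", "US", "1"))
```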

response = requests.request("GET", url, headers=headers, params=querystring)

Here we invoke our API using the request method of the requests library in Python.

4. callMuleMockAPI.py (This is the main calling script, which invokes our mock API through the class developed above.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 30-Jul-2020              ####
#### Modified On 30-Jul-2020              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from clsConfig import clsConfig as cf
import clsL as cl
import logging
import datetime
import clsMuleMockAPI as cw
import pandas as p
import json

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        # Declared Variable
        ret_1 = 0
        debug_ind = 'Y'
        res_2 = ''

        # Defining Generic Log File
        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'MockMuleAPI.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        log_dir = cf.config['LOG_PATH']

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print()

        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        print('Welcome to Mock Mulesoft API Calling Program: ')
        print('-' * 160)
        print('Please Press 1 for better formatted JSON: (Suitable for reading or debugging) ')
        print('Please Press 2 for unformated JSON: ')
        print()
        input_choice = int(input('Please provide your choice:'))
        print()

        # Create the instance of the Mock Mulesoft API Class
        x2 = cw.clsMuleMockAPI()

        # Let's pass this to our map section
        if input_choice == 1:
            fmt = "1"
            phNumber = str(input('Please provide the Phone Number (Without the country Code):'))
            cntCode  = str(input('Please provide the Country Code (Example: US):'))
            print()

            retJson = x2.searchQry(phNumber, cntCode, fmt )
        elif input_choice == 2:
            fmt = "0"
            phNumber = str(input('Please provide the Phone Number (Without the country Code):'))
            cntCode = str(input('Please provide the Country Code (Example: US):'))
            print()

            retJson = x2.searchQry(phNumber, cntCode, fmt)
        else:
            print('Invalid options!')
            retJson = {'errorDetails': 'Invalid Options!'}

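        # Converting JSon to Pandas Dataframe for better readability
        # Capturing the JSON Payload. searchQry returns a JSON string on success
        # but a dictionary on errors, so only the string case is parsed
        res = json.loads(retJson) if isinstance(retJson, str) else retJson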

        # Printing formatted JSON
        print()
        print('Output JSON::')
        print(json.dumps(res, indent=2))

        # Converting dictionary to Pandas Dataframe
        # df_ret = p.read_json(ret_2, orient='records')
        # pandas.json_normalize replaces the deprecated pandas.io.json.json_normalize (pandas 1.0+)
        df_ret = p.json_normalize(res)
        df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

        # Removing any duplicate columns
        df_ret = df_ret.loc[:, ~df_ret.columns.duplicated()]

        print()
        print()
        print("-" * 160)

        print('Publishing sample result: ')
        print(df_ret.head())

        # Logging Final Output
        l.logr('1.df_ret' + var + '.csv', debug_ind, df_ret, 'log')

        print("-" * 160)
        print()

        print('Finished Analysis points..')
        print("*" * 160)
        logging.info('Finished Analysis points..')
        logging.info(tmpR0)

        tmpR10 = 'End Time: ' + str(var)
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError as e:
        print(str(e))
        print("Invalid option!")
        logging.info("Invalid option!")

    except Exception as e:
        # Python 3 exceptions have no .message attribute; str(e) gives the text
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

The above script is pretty straightforward. First, we’re instantiating our essential class with this line –

# Create the instance of the Mock Mulesoft API Class
x2 = cw.clsMuleMockAPI()

And, then based on the logical condition we’re invoking it as follows –

retJson = x2.searchQry(phNumber, cntCode, fmt )

Now, we would like to explore the directory structure on both macOS & Windows –

14. Dir

The top part represents the macOS structure, whereas the bottom part represents the Windows directory structure.

Let’s run the python application to test it.

10. Program_Run

In this case, the bottom part represents the macOS run, whereas the top part represents the Windows run.

The sample CSV log should look something like this –

Windows:

15. Log Win CSV

MAC:

15. Log CSV MAC

So, we’ve done it.

I’ll be posting another new post in the coming days. Till then, Happy Avenging! 😀

Note: All the data posted here are representational, available over the internet & for educational purposes only.

Building a Python-based airline solution using Amadeus API

Hi Guys,

Today, I’ll share a slightly different topic in Python compared to my last couple of posts, where I demonstrated the use of Python in the field of machine learning & forecast modeling.

We’ll explore how to create meaningful sample data points for airline & hotel reservations. At this moment, this industry is among the hardest hit by the pandemic. And I personally wish a speedy recovery to all employees who risked their lives to maintain operations or might have lost their jobs during this time.

I’ll be providing only major scripts & will show how you can extract critical data from their API.

However, to use the API, you need to register with Amadeus as a developer & follow specific steps to get the API details. You will need to register using the following link.

Step 1:

1. Generating API - Step 1

Once you provide the necessary details, you need to activate your account by clicking the email validation.

Step 2:

As part of the next step, you will be clicking the “Self-Service Workspace” option as marked in the green box shown above.

Now, you have to click My apps & under that, you need to click Create new app, as shown below –

2. Generating API - Step 2

Step 3:

You need to provide the following details before creating the API. Note that once you create – it will take 30 minutes to activate the API-link.

3. Generating API - Step 3

Step 4:

You will come to the next page once you click the “Create” button in the previous step.

4. Generating API - Step 4

For production, you need to create a separate key, as shown above.

You need to install the following packages –

pip install amadeus

And, the installation process is shown as –

5. Installing Packages

pip install flatten_json

And, this installation process is shown as –

6. Installing Packages - Continuation

1. clsAmedeus (This is the API script, which will send the API requests & return JSON if successful.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 05-Jul-2020              ####
#### Modified On 05-Jul-2020              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from amadeus import Client, ResponseError
import json
from clsConfig import clsConfig as cf

class clsAmedeus:
    def __init__(self):
        self.client_id = cf.config['CLIENT_ID']
        self.client_secret = cf.config['CLIENT_SECRET']
        self.type = cf.config['API_TYPE']

    def flightOffers(self, origLocn, destLocn, departDate, noOfAdult):
        try:
            cnt = 0

            # Setting Clients
            amadeus = Client(
                                client_id=str(self.client_id),
                                client_secret=str(self.client_secret)
                            )

            # Flight Offers
            response = amadeus.shopping.flight_offers_search.get(
                originLocationCode=origLocn,
                destinationLocationCode=destLocn,
                departureDate=departDate,
                adults=noOfAdult)

            ResJson = response.data

            return ResJson
        except Exception as e:
            print(e)
            x = str(e)
            ResJson = {'errorDetails': x}

            return ResJson

    def cheapestDate(self, origLocn, destLocn):
        try:
            # Setting Clients
            amadeus = Client(
                client_id=self.client_id,
                client_secret=self.client_secret
            )

            # Flight Offers
            # Flight Cheapest Date Search
            response = amadeus.shopping.flight_dates.get(origin=origLocn, destination=destLocn)

            ResJson = response.data

            return ResJson
        except Exception as e:
            print(e)
            x = str(e)
            ResJson = {'errorDetails': x}

            return ResJson

    def listOfHotelsByCity(self, origLocn):
        try:
            # Setting Clients
            amadeus = Client(
                client_id=self.client_id,
                client_secret=self.client_secret
            )

            # Hotel Search
            # Get list of Hotels by city code
            response = amadeus.shopping.hotel_offers.get(cityCode=origLocn)

            ResJson = response.data

            return ResJson
        except Exception as e:
            print(e)
            x = str(e)
            ResJson = {'errorDetails': x}

            return ResJson

    def listOfOffersBySpecificHotels(self, hotelID):
        try:
            # Setting Clients
            amadeus = Client(
                client_id=self.client_id,
                client_secret=self.client_secret
            )

            # Get list of offers for a specific hotel
            response = amadeus.shopping.hotel_offers_by_hotel.get(hotelId=hotelID)

            ResJson = response.data

            return ResJson
        except Exception as e:
            print(e)
            x = str(e)
            ResJson = {'errorDetails': x}

            return ResJson

    def hotelReview(self, hotelID):
        try:
            # Setting Clients
            amadeus = Client(
                client_id=self.client_id,
                client_secret=self.client_secret
            )

            # Hotel Ratings
            # What travelers think about this hotel?
            response = amadeus.e_reputation.hotel_sentiments.get(hotelIds=hotelID)

            ResJson = response.data

            return ResJson
        except Exception as e:
            print(e)
            x = str(e)
            ResJson = {'errorDetails': x}

            return ResJson

    def process(self, choice, origLocn, destLocn, departDate, noOfAdult, hotelID):
        try:
            # Main Area to call appropriate choice
            if choice == 1:
                resJson = self.flightOffers(origLocn, destLocn, departDate, noOfAdult)
            elif choice == 2:
                resJson = self.cheapestDate(origLocn, destLocn)
            elif choice == 3:
                resJson = self.listOfHotelsByCity(origLocn)
            elif choice == 4:
                resJson = self.listOfOffersBySpecificHotels(hotelID)
            elif choice == 5:
                resJson = self.hotelReview(hotelID)
            else:
                resJson = {'errorDetails': 'Invalid Options!'}

            # Converting back to JSON
            jdata = json.dumps(resJson)

            # Checking the beginning character
            # for the new package
            # As that requires dictionary array
            # Hence, We'll be adding '[' if this
            # is missing from the return payload
            SYM = jdata[:1]
            if SYM != '[':
                rdata = '[' + jdata + ']'
            else:
                rdata = jdata

            ResJson = json.loads(rdata)

            return ResJson

        except ResponseError as error:
            x = str(error)
            resJson = {'errorDetails': x}

            return resJson

Let’s explore the key lines –

Creating an instance of the client by providing the recently acquired API Key & API-Secret.

# Setting Clients
amadeus = Client(
                    client_id=str(self.client_id),
                    client_secret=str(self.client_secret)
                )

The following lines fetch the API response for a specific business case. Different API invocations retrieve different data –

# Flight Offers
# Flight Cheapest Date Search
response = amadeus.shopping.flight_dates.get(origin=origLocn, destination=destLocn)

The program will navigate to particular methods to invoke certain features –

# Main Area to call appropriate choice
if choice == 1:
    resJson = self.flightOffers(origLocn, destLocn, departDate, noOfAdult)
elif choice == 2:
    resJson = self.cheapestDate(origLocn, destLocn)
elif choice == 3:
    resJson = self.listOfHotelsByCity(origLocn)
elif choice == 4:
    resJson = self.listOfOffersBySpecificHotels(hotelID)
elif choice == 5:
    resJson = self.hotelReview(hotelID)
else:
    resJson = {'errorDetails': 'Invalid Options!'}
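The same routing can also be expressed as a dictionary lookup instead of an if/elif chain. The following is only a sketch with stand-in handlers; the names run_choice, demo_handlers & the parameter dictionary are assumptions for illustration, & in the class above the handlers would wrap methods such as self.flightOffers.

```python
def run_choice(choice, handlers, params):
    """Look up the handler for a menu choice & call it with the shared params."""
    handler = handlers.get(choice)
    if handler is None:
        return {"errorDetails": "Invalid Options!"}
    return handler(params)


# Stand-in handlers that only echo their inputs (no API call is made here).
demo_handlers = {
    1: lambda p: {"kind": "flightOffers", "route": (p["origLocn"], p["destLocn"])},
    2: lambda p: {"kind": "cheapestDate", "route": (p["origLocn"], p["destLocn"])},
    4: lambda p: {"kind": "offersByHotel", "hotel": p["hotelID"]},
}

print(run_choice(4, demo_handlers, {"hotelID": "H1"}))
```

Adding a new option then means adding one dictionary entry, while the invalid-choice handling stays in a single place.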

2. callAmedeusAPI (This is the main script, which will invoke the Amadeus API & return dataframe if successful.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 05-Jul-2020              ####
#### Modified On 05-Jul-2020              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from clsConfig import clsConfig as cf
import clsL as cl
import logging
import datetime
import clsAmedeus as cw
import pandas as p
import json

# Newly added package
from flatten_json import flatten

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        # Declared Variable
        ret_1 = 0
        textOrig = ''
        textDest = ''
        textDate = ''
        intAdult = 0
        textHotelID = ''
        debug_ind = 'Y'
        res_2 = ''

        # Defining Generic Log File
        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'AmadeusAPI.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        log_dir = cf.config['LOG_PATH']
        curr_ver =datetime.datetime.now().strftime("%Y-%m-%d")

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        print('Welcome to Amadeus Calling Program: ')
        print('-' * 60)
        print('Please Press 1 for flight offers.')
        print('Please Press 2 for cheapest date.')
        print('Please Press 3 for list of hotels by city.')
        print('Please Press 4 for list of offers by specific hotel.')
        print('Please Press 5 for specific hotel review.')
        input_choice = int(input('Please provide your choice:'))

        # Create the instance of the Amadeus Class
        x2 = cw.clsAmedeus()

        # Let's pass this to our map section
        if input_choice == 1:
            textOrig = str(input('Please provide the Origin:'))
            textDest = str(input('Please provide the Destination:'))
            textDate = str(input('Please provide the Depart Date:'))
            intAdult = int(input('Please provide the No Of Adult:'))

            retJson = x2.process(input_choice, textOrig, textDest, textDate, intAdult, textHotelID)
        elif input_choice == 2:
            textOrig = str(input('Please provide the Origin:'))
            textDest = str(input('Please provide the Destination:'))

            retJson = x2.process(input_choice, textOrig, textDest, textDate, intAdult, textHotelID)
        elif input_choice == 3:
            textOrig = str(input('Please provide the Origin:'))

            retJson = x2.process(input_choice, textOrig, textDest, textDate, intAdult, textHotelID)
        elif input_choice == 4:
            textHotelID = str(input('Please provide the Hotel Id:'))

            retJson = x2.process(input_choice, textOrig, textDest, textDate, intAdult, textHotelID)
        elif input_choice == 5:
            textHotelID = str(input('Please provide the Hotel Id:'))

            retJson = x2.process(input_choice, textOrig, textDest, textDate, intAdult, textHotelID)
        else:
            print('Invalid options!')
            # Wrapped in a list so that the flatten step can iterate over dictionaries
            retJson = [{'errorDetails': 'Invalid Options!'}]

        #print('JSON::')
        #print(retJson)

        # Converting JSon to Pandas Dataframe for better readability
        # Capturing the JSON Payload
        res_1 = json.dumps(retJson)
        res = json.loads(res_1)

        # Newly added JSON Parse package
        dic_flattened = (flatten(d) for d in res)
        df_ret = p.DataFrame(dic_flattened)

        # Removing any duplicate columns
        df_ret = df_ret.loc[:, ~df_ret.columns.duplicated()]

        print('Publishing sample result: ')
        print(df_ret.head())

        # Logging Final Output
        l.logr('1.df_ret' + var + '.csv', debug_ind, df_ret, 'log')

        print("-" * 60)
        print()

        print('Finding Analysis points..')
        print("*" * 157)
        logging.info('Finding Analysis points..')
        logging.info(tmpR0)

        tmpR10 = 'End Time: ' + str(var)
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError as e:
        print(str(e))
        print("Invalid option!")
        logging.info("Invalid option!")

    except Exception as e:
        # Python 3 exceptions have no .message attribute; str(e) gives the text
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

Key lines from the above script –

# Create the instance of the Amadeus Class
x2 = cw.clsAmedeus()

The above line will instantiate the newly written Amadeus class.

# Let's pass this to our map section
if input_choice == 1:
    textOrig = str(input('Please provide the Origin:'))
    textDest = str(input('Please provide the Destination:'))
    textDate = str(input('Please provide the Depart Date:'))
    intAdult = int(input('Please provide the No Of Adult:'))

    retJson = x2.process(input_choice, textOrig, textDest, textDate, intAdult, textHotelID)
elif input_choice == 2:
    textOrig = str(input('Please provide the Origin:'))
    textDest = str(input('Please provide the Destination:'))

    retJson = x2.process(input_choice, textOrig, textDest, textDate, intAdult, textHotelID)
elif input_choice == 3:
    textOrig = str(input('Please provide the Origin:'))

    retJson = x2.process(input_choice, textOrig, textDest, textDate, intAdult, textHotelID)
elif input_choice == 4:
    textHotelID = str(input('Please provide the Hotel Id:'))

    retJson = x2.process(input_choice, textOrig, textDest, textDate, intAdult, textHotelID)
elif input_choice == 5:
    textHotelID = str(input('Please provide the Hotel Id:'))

    retJson = x2.process(input_choice, textOrig, textDest, textDate, intAdult, textHotelID)
else:
    print('Invalid options!')
    # Wrapped in a list so that the flatten step can iterate over dictionaries
    retJson = [{'errorDetails': 'Invalid Options!'}]

The above lines will fetch the response based on the supplied inputs in the form of JSON.

# Converting JSon to Pandas Dataframe for better readability
# Capturing the JSON Payload
res_1 = json.dumps(retJson)
res = json.loads(res_1)

Now, the above lines serialize the return payload to a JSON string & parse it back, which leaves us with plain Python lists & dictionaries.
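To see what such a round trip does to a Python object, here is a tiny sketch with made-up data: tuples come back as lists, & non-string keys come back as strings.

```python
import json

# Made-up payload containing a tuple & a numeric key.
raw = {"offers": ({"price": 120.5},), 1: "numeric key"}

# Serialize to a JSON string, then parse it back.
normalised = json.loads(json.dumps(raw))

print(normalised)
```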

Sample JSON should look something like this –

20. Sample_JASON_Option_4_1

Now, using the newly added flatten_json package, our application will flatten the complex nested JSON.

# Newly added JSON Parse package
dic_flattened = (flatten(d) for d in res)
df_ret = p.DataFrame(dic_flattened)
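To get a feel for what flattening means, here is a hand-written sketch of the idea. It is not the flatten_json implementation; the function name, the underscore separator & the sample hotel record are assumptions made for illustration.

```python
def flatten_sketch(obj, parent_key="", sep="_"):
    """Collapse nested dictionaries & lists into a single-level dictionary."""
    if isinstance(obj, dict):
        items = obj.items()
    elif isinstance(obj, list):
        items = enumerate(obj)  # list positions become part of the key
    else:
        return {parent_key: obj}

    flat = {}
    for key, value in items:
        new_key = "{}{}{}".format(parent_key, sep, key) if parent_key else str(key)
        flat.update(flatten_sketch(value, new_key, sep))
    return flat


# Hypothetical nested record.
sample = {
    "hotel": {"name": "X", "geo": {"lat": 1.5}},
    "offers": [{"id": "A"}, {"id": "B"}],
}
print(flatten_sketch(sample))
```

Each flattened dictionary then becomes one row, which is why a list of such dictionaries can be passed straight to a DataFrame.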

The given line will remove any duplicate columns if they exist.

# Removing any duplicate columns
df_ret = df_ret.loc[:, ~df_ret.columns.duplicated()]
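Here is a small sketch of that expression on a toy dataframe with made-up column names. columns.duplicated() flags every repeat of a column name after its first occurrence, & the ~ operator inverts the mask so that only the first occurrences are kept.

```python
import pandas as pd

# Toy dataframe where the column name 'id' appears twice.
df = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=["id", "price", "id"])

# Keep only the first occurrence of each column name.
deduped = df.loc[:, ~df.columns.duplicated()]

print(list(deduped.columns))
```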

Let’s explore the directory structure –

13. Win_Dir

Let’s run our application –

We’ll invoke five different APIs (each related to a different functionality) & their business cases –

Run – Option 1:

7. Win_Run_Op_1

So, if we want to explore some of the key columns, below is the screenshot for a few sample data –

21. Key_Columns

Run – Option 2:

8. Win_Run_Op_2

Some of the vital sample data –

15. Option_2_Sample_Data

Run – Option 3:

9. Win_Run_Op_3

Sample relevant data for our analysis –

16. Option_3_Sample_Data

Run – Option 4:

10. Win_Run_Op_4

A few relevant pieces of information –

17. Option_4_Sample_Data

Run – Option 5:

11. Win_Run_Op_5

Finally, a few sample records from the last option –

18. Option_5_Sample_Data

So, finally, we’ve done it. You will find the flatten_json package from this link.

During this challenging time, I would request you to follow strict health guidelines & stay healthy.

N.B.: All the data that are used here can be found in the public domain. We use this solely for educational purposes.

Canada’s Covid19 analysis based on Logistic Regression

Hi Guys,

Today, I’ll be demonstrating some scenarios based on open-source data from Canada. In this post, I will only explain some of the significant parts of the code, not the entire range of scripts.

Let’s explore a couple of sample source data –

2. Sample Input Data

I would like to explore how much impact this disease had on the elderly in Canada.

Let’s explore the source directory structure –

3. Source Directory Structures

For this, you need to install the following packages –

pip install pandas

pip install seaborn

Please find the PyPi link given below –

In this case, we’ve downloaded the data from Canada’s site. However, they have also created an API, so you can consume the data that way as well. Since the volume is a little large, I decided to download it as CSV & then use that for my analysis.

Before I start, let me explain a couple of critical assumptions that I had to make due to data impurities or availabilities.

  • If there is no case status available for a specific case, my application will consider that patient as COVID-Active.
  • We will consider the patient as affected through Community-spreading unless we have data that shows otherwise.
  • If there is no data available for gender, we’re marking these records as “Other.” That way, they fall into the same category as patients who don’t want to disclose their gender.
  • If we don’t have any data on the outcome, then by default, the application considers the patient alive.
  • Lastly, my application considers the middle point of the age range for all the categories, e.g., a patient’s age between 20 & 30 will be considered as 25.
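The assumptions above can be sketched as a small table-driven helper. This is only an illustration of the rules, not the script used in this post; the names DEFAULTS, AGE_MIDPOINTS & apply_defaults are hypothetical, & the sample row is made up.

```python
# Default values applied when the source says 'Not Reported'
DEFAULTS = {"case_status": "Active", "exposure": "Community", "gender": "Other"}

# Middle point of each age range; anything else falls back to 85
AGE_MIDPOINTS = {"<20": 10, "20-29": 25, "30-39": 35, "40-49": 45,
                 "50-59": 55, "60-69": 65, "70-79": 75}


def apply_defaults(record):
    """Return a cleaned copy of one record, following the assumptions above."""
    cleaned = dict(record)
    for column, default in DEFAULTS.items():
        if cleaned.get(column, "Not Reported") == "Not Reported":
            cleaned[column] = default

    cleaned["age"] = AGE_MIDPOINTS.get(cleaned.get("age_group"), 85)
    # 0 - Deceased, 1 - Alive (alive unless the data says otherwise)
    cleaned["survived"] = 0 if cleaned["case_status"] == "Deceased" else 1
    return cleaned


# Made-up sample row, for illustration only
row = {"age_group": "20-29", "gender": "Not Reported",
       "exposure": "Not Reported", "case_status": "Recovered"}
result = apply_defaults(row)
print(result)
```

Keeping the rules in dictionaries makes every assumption visible in one place, which helps when someone wants to challenge or change one of them.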

1. clsCovidAnalysisByCountryAdv (This is the main script, which will invoke the Machine-Learning API & return 0 if successful.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 01-Jun-2020              ####
#### Modified On 01-Jun-2020              ####
####                                      ####
#### Objective: Main scripts for Logistic ####
#### Regression.                          ####
##############################################

import pandas as p
import clsL as log
import datetime

import matplotlib.pyplot as plt
import seaborn as sns
from clsConfig import clsConfig as cf

# %matplotlib inline -- for Jupyter Notebook
class clsCovidAnalysisByCountryAdv:
    def __init__(self):
        self.fileName_1 = cf.config['FILE_NAME_1']
        self.fileName_2 = cf.config['FILE_NAME_2']
        self.Ind = cf.config['DEBUG_IND']
        self.subdir = str(cf.config['LOG_DIR_NAME'])

    def setDefaultActiveCases(self, row):
        try:
            str_status = str(row['case_status'])

            if str_status == 'Not Reported':
                return 'Active'
            else:
                return str_status
        except:
            return 'Active'

    def setDefaultExposure(self, row):
        try:
            str_exposure = str(row['exposure'])

            if str_exposure == 'Not Reported':
                return 'Community'
            else:
                return str_exposure
        except:
            return 'Community'

    def setGender(self, row):
        try:
            str_gender = str(row['gender'])

            if str_gender == 'Not Reported':
                return 'Other'
            else:
                return str_gender
        except:
            return 'Other'

    def setSurviveStatus(self, row):
        try:
            # 0 - Deceased
            # 1 - Alive
            str_active = str(row['ActiveCases'])

            if str_active == 'Deceased':
                return 0
            else:
                return 1
        except:
            return 1

    def getAgeFromGroup(self, row):
        try:
            # We'll take the middle of the Age group
            # If an age range falls below 20, we'll
            # consider this as 10.
            # Similarly, an age group between 20 & 30
            # should be reflected as 25.
            # Anything 80 or above will be considered as
            # 85

            str_age_group = str(row['AgeGroup'])

            if str_age_group == '<20':
                return 10
            elif str_age_group == '20-29':
                return 25
            elif str_age_group == '30-39':
                return 35
            elif str_age_group == '40-49':
                return 45
            elif str_age_group == '50-59':
                return 55
            elif str_age_group == '60-69':
                return 65
            elif str_age_group == '70-79':
                return 75
            else:
                return 85
        except:
            return 100

    def predictResult(self):
        try:
            
            # Initiating Logging Instances
            clog = log.clsL()

            # Important variables
            var = datetime.datetime.now().strftime(".%H.%M.%S")
            print('Target File Extension will contain the following:: ', var)
            Ind = self.Ind
            subdir = self.subdir

            #######################################
            #                                     #
            # Using Logistic Regression to        #
            # Identify the following scenarios -  #
            #                                     #
            # Age wise Infection Vs Deaths        #
            #                                     #
            #######################################
            inputFileName_2 = self.fileName_2

            # Reading from Input File
            df_2 = p.read_csv(inputFileName_2)

            # Fetching only relevant columns
            # .copy() avoids pandas' SettingWithCopyWarning on the column assignments below
            df_2_Mod = df_2[['date_reported','age_group','gender','exposure','case_status']].copy()
            df_2_Mod['State'] = df_2['province_abbr']

            print()
            print('Projecting 2nd file sample rows: ')
            print(df_2_Mod.head())

            print()
            x_row_1 = df_2_Mod.shape[0]
            x_col_1 = df_2_Mod.shape[1]

            print('Total Number of Rows: ', x_row_1)
            print('Total Number of columns: ', x_col_1)

            #########################################################################################
            # Few Assumptions                                                                       #
            #########################################################################################
            # By default, if there is no data on exposure - We'll treat that as community spreading #
            # By default, if there is no data on case_status - We'll consider this as active        #
            # By default, if there is no data on gender - We'll put that under a separate Gender    #
            # category marked as the "Other". This includes someone who doesn't want to identify    #
            # his/her gender or wants to be part of LGBT community in a generic term.               #
            #                                                                                       #
            # We'll transform our data accordingly based on the above logic.                        #
            #########################################################################################
            df_2_Mod['ActiveCases'] = df_2_Mod.apply(lambda row: self.setDefaultActiveCases(row), axis=1)
            df_2_Mod['ExposureStatus'] = df_2_Mod.apply(lambda row: self.setDefaultExposure(row), axis=1)
            df_2_Mod['Gender'] = df_2_Mod.apply(lambda row: self.setGender(row), axis=1)

            # Filtering all other records where we don't get any relevant information
            # Fetching Data for
            # .copy() keeps the in-place drops & renames below from acting on a slice of df_2_Mod
            df_3 = df_2_Mod[(df_2_Mod['age_group'] != 'Not Reported')].copy()

            # Dropping unwanted columns
            df_3.drop(columns=['exposure'], inplace=True)
            df_3.drop(columns=['case_status'], inplace=True)
            df_3.drop(columns=['date_reported'], inplace=True)
            df_3.drop(columns=['gender'], inplace=True)

            # Renaming one existing column
            df_3.rename(columns={"age_group": "AgeGroup"}, inplace=True)

            # Creating important feature
            # 0 - Deceased
            # 1 - Alive
            df_3['Survived'] = df_3.apply(lambda row: self.setSurviveStatus(row), axis=1)

            clog.logr('2.df_3' + var + '.csv', Ind, df_3, subdir)

            print()
            print('Projecting Filter sample rows: ')
            print(df_3.head())

            print()
            x_row_2 = df_3.shape[0]
            x_col_2 = df_3.shape[1]

            print('Total Number of Rows: ', x_row_2)
            print('Total Number of columns: ', x_col_2)

            # Let's do some basic checkings
            sns.set_style('whitegrid')
            #sns.countplot(x='Survived', hue='Gender', data=df_3, palette='RdBu_r')

            # Fixing Gender Column
            # This will check & indicate yellow for missing entries
            #sns.heatmap(df_3.isnull(), yticklabels=False, cbar=False, cmap='viridis')

            #sex = p.get_dummies(df_3['Gender'], drop_first=True)
            sex = p.get_dummies(df_3['Gender'])
            df_4 = p.concat([df_3, sex], axis=1)

            print('After New addition of columns: ')
            print(df_4.head())

            clog.logr('3.df_4' + var + '.csv', Ind, df_4, subdir)

            # Dropping unwanted columns for our Machine Learning
            df_4.drop(columns=['Gender'], inplace=True)
            df_4.drop(columns=['ActiveCases'], inplace=True)
            df_4.drop(columns=['Male','Other','Transgender'], inplace=True)

            clog.logr('4.df_4_Mod' + var + '.csv', Ind, df_4, subdir)

            # Fixing Spread Columns
            spread = p.get_dummies(df_4['ExposureStatus'], drop_first=True)
            df_5 = p.concat([df_4, spread], axis=1)

            print('After Spread columns:')
            print(df_5.head())

            clog.logr('5.df_5' + var + '.csv', Ind, df_5, subdir)

            # Dropping unwanted columns for our Machine Learning
            df_5.drop(columns=['ExposureStatus'], inplace=True)

            clog.logr('6.df_5_Mod' + var + '.csv', Ind, df_5, subdir)

            # Fixing Age Columns
            df_5['Age'] = df_5.apply(lambda row: self.getAgeFromGroup(row), axis=1)
            df_5.drop(columns=["AgeGroup"], inplace=True)

            clog.logr('7.df_6' + var + '.csv', Ind, df_5, subdir)

            # Fixing Dummy Columns Name
            # Renaming one existing column Travel-Related to TravelRelated
            df_5.rename(columns={"Travel-Related": "TravelRelated"}, inplace=True)

            clog.logr('8.df_7' + var + '.csv', Ind, df_5, subdir)

            # Removing state for temporary basis
            df_5.drop(columns=['State'], inplace=True)
            # df_5.drop(columns=['State','Other','Transgender','Pending','TravelRelated','Male'], inplace=True)

            # Casting this entire dataframe into Integer
            # df_5_temp.apply(p.to_numeric)

            print('Info::')
            print(df_5.info())
            print("*" * 60)
            print(df_5.describe())
            print("*" * 60)

            clog.logr('9.df_8' + var + '.csv', Ind, df_5, subdir)

            print('Intermediate Sample Dataframe for Age::')
            print(df_5.head())

            # Plotting it to Graph
            sns.jointplot(x="Age", y='Survived', data=df_5)
            sns.jointplot(x="Age", y='Survived', data=df_5, kind='kde', color='red')
            plt.xlabel("Age")
            plt.ylabel("Data Point (0 - Died   Vs    1 - Alive)")

            # Another check with Age Group
            sns.countplot(x='Survived', hue='Age', data=df_5, palette='RdBu_r')
            plt.xlabel("Survived(0 - Died   Vs    1 - Alive)")
            plt.ylabel("Total No Of Patient")

            df_6 = df_5.drop(columns=['Survived'])

            clog.logr('10.df_9' + var + '.csv', Ind, df_6, subdir)

            # Train & Split Data
            x_1 = df_6
            y_1 = df_5['Survived']

            # Now Train-Test Split of your source data
            from sklearn.model_selection import train_test_split

            # test_size => % of allocated data for your test cases
            # random_state => A specific set of random split on your data
            X_train_1, X_test_1, Y_train_1, Y_test_1 = train_test_split(x_1, y_1, test_size=0.3, random_state=101)

            # Importing Model
            from sklearn.linear_model import LogisticRegression

            logmodel = LogisticRegression()
            logmodel.fit(X_train_1, Y_train_1)

            # Adding Predictions to it
            predictions_1 = logmodel.predict(X_test_1)

            from sklearn.metrics import classification_report

            print('Classification Report:: ')
            print(classification_report(Y_test_1, predictions_1))

            from sklearn.metrics import confusion_matrix

            print('Confusion Matrix:: ')
            print(confusion_matrix(Y_test_1, predictions_1))

            # This is required when you are trying to plot from a conventional
            # front end & not using a Jupyter notebook.
            plt.show()

            return 0

        except Exception as e:
            x = str(e)
            print('Error : ', x)

            return 1

Key snippets from the above script –

df_2_Mod['ActiveCases'] = df_2_Mod.apply(lambda row: self.setDefaultActiveCases(row), axis=1)
df_2_Mod['ExposureStatus'] = df_2_Mod.apply(lambda row: self.setDefaultExposure(row), axis=1)
df_2_Mod['Gender'] = df_2_Mod.apply(lambda row: self.setGender(row), axis=1)

# Filtering all other records where we don't get any relevant information
# Fetching Data for
df_3 = df_2_Mod[(df_2_Mod['age_group'] != 'Not Reported')].copy()

# Dropping unwanted columns
df_3.drop(columns=['exposure'], inplace=True)
df_3.drop(columns=['case_status'], inplace=True)
df_3.drop(columns=['date_reported'], inplace=True)
df_3.drop(columns=['gender'], inplace=True)

# Renaming one existing column
df_3.rename(columns={"age_group": "AgeGroup"}, inplace=True)

# Creating important feature
# 0 - Deceased
# 1 - Alive
df_3['Survived'] = df_3.apply(lambda row: self.setSurviveStatus(row), axis=1)

The above lines cover the critical transformations: they apply the default-value rules listed earlier, filter out the records with no reported age group, drop the raw columns & derive the Survived target.

Let’s look at our sample data at this point –

6. 4_4_mod

Let’s look into the following part –

# Fixing Spread Columns
spread = p.get_dummies(df_4['ExposureStatus'], drop_first=True)
df_5 = p.concat([df_4, spread], axis=1)

The above lines will transform the data into this –

7. 5_5_Mod

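As you can see, we’ve transformed the row values into columns with binary values. This kind of transformation (one-hot encoding) is beneficial because the logistic regression model needs numeric inputs rather than text categories.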
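Here is a minimal sketch of that transformation on a tiny made-up dataframe. The exposure values below are hypothetical samples & are not taken from the source file.

```python
import pandas as pd

# Made-up exposure values, for illustration only
df = pd.DataFrame({"ExposureStatus": ["Close Contact", "Community",
                                      "Travel-Related", "Community"]})

# One indicator column per category. drop_first=True drops the first
# category in sorted order, since it is implied when all the others are 0.
spread = pd.get_dummies(df["ExposureStatus"], drop_first=True)
encoded = pd.concat([df, spread], axis=1)

print(encoded)
```

Dropping one category avoids carrying a redundant column, since the remaining indicator columns already determine it.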

# Plotting it to Graph
sns.jointplot(x="Age", y='Survived', data=df_5)
sns.jointplot(x="Age", y='Survived', data=df_5, kind='kde', color='red')
plt.xlabel("Age")
plt.ylabel("Data Point (0 - Died   Vs    1 - Alive)")

# Another check with Age Group
sns.countplot(x='Survived', hue='Age', data=df_5, palette='RdBu_r')
plt.xlabel("Survived(0 - Died   Vs    1 - Alive)")
plt.ylabel("Total No Of Patient")

The above lines plot Age against the Survived flag as joint plots & then count the surviving & deceased patients for each age.

x_1 = df_6
y_1 = df_5['Survived']

In the above snippet, we’ve assigned the features & target variable for our final logistic regression model.

# Now Train-Test Split of your source data
from sklearn.model_selection import train_test_split

# test_size => % of allocated data for your test cases
# random_state => A specific set of random split on your data
X_train_1, X_test_1, Y_train_1, Y_test_1 = train_test_split(x_1, y_1, test_size=0.3, random_state=101)

# Importing Model
from sklearn.linear_model import LogisticRegression

logmodel = LogisticRegression()
logmodel.fit(X_train_1, Y_train_1)

In the above snippet, we’re splitting the primary data into train & test sets, with 30% held out for testing. Once we have the split, the application instantiates the logistic regression model & finally fits it on the training data.
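To get a feel for what a fitted logistic regression computes, here is a minimal sketch with a single Age feature. The coefficients INTERCEPT & AGE_COEF are made-up numbers, not values fitted from the Canadian data, & the real model in this post uses more features than age alone.

```python
import math


def predict_survival_probability(age, intercept, age_coef):
    """Logistic (sigmoid) of a linear score: P(Survived = 1 | age)."""
    score = intercept + age_coef * age
    return 1.0 / (1.0 + math.exp(-score))


# Hypothetical coefficients, for illustration only
INTERCEPT, AGE_COEF = 6.0, -0.05

for age in (25, 55, 85):
    prob = predict_survival_probability(age, INTERCEPT, AGE_COEF)
    # For a binary problem, the predicted class is 1 when the probability exceeds 0.5
    label = 1 if prob > 0.5 else 0
    print(age, round(prob, 3), label)
```

With a negative age coefficient, the predicted survival probability falls as age rises; fitting simply means finding the coefficients that best match the training rows.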

# Adding Predictions to it
predictions_1 = logmodel.predict(X_test_1)

from sklearn.metrics import classification_report

print('Classification Report:: ')
print(classification_report(Y_test_1, predictions_1))

The above lines finally use the fitted model to predict on our test data & print the classification report against the actual outcomes.
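If you want to see where the numbers in the classification report & confusion matrix come from, the sketch below computes them by hand. The labels y_test & y_hat are made up for this example, & confusion_counts is a hypothetical helper, not part of scikit-learn.

```python
def confusion_counts(y_true, y_pred, positive=1):
    """Count true/false positives & negatives for a binary target."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == positive and p == positive)
    fp = sum(1 for t, p in pairs if t != positive and p == positive)
    fn = sum(1 for t, p in pairs if t == positive and p != positive)
    tn = sum(1 for t, p in pairs if t != positive and p != positive)
    return tp, fp, fn, tn


# Made-up labels: 0 - Deceased, 1 - Alive
y_test = [1, 1, 1, 0, 0, 1, 0, 1]
y_hat = [1, 1, 0, 0, 1, 1, 0, 1]

tp, fp, fn, tn = confusion_counts(y_test, y_hat)

precision = tp / (tp + fp)   # of the predicted survivors, how many survived
recall = tp / (tp + fn)      # of the real survivors, how many we caught
accuracy = (tp + tn) / len(y_test)

# scikit-learn lays the matrix out as [[tn, fp], [fn, tp]] for labels 0 & 1
print([[tn, fp], [fn, tp]])
print(precision, recall, accuracy)
```

When one class dominates, as survivors do here, precision & recall for the smaller class say much more than the overall accuracy.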

Let’s see how it runs –

5.1.Run_Windows
5.2. Run_Windows

And, here is the log directory –

4. Logs

For better understanding, I’m placing both diagrams together in one place & the final outcome is as follows –

1. MergeReport

So, from the above picture, we can see that the most vulnerable patients are those who are 80+. The next two categories that also suffered are 70+ & 60+.

Also, we’ve checked Female vs. Male in the following code –

sns.countplot(x='Survived', hue='Female', data=df_5, palette='RdBu_r')
plt.xlabel("Survived(0 - Died   Vs    1 - Alive)")
plt.ylabel("Female Vs Male (Including Other Genders)")

And, the analysis is represented through this –

8. Female_Male

In this case, you have to consider that the Male part includes all the other genders apart from the actual Male. Hence, I believe deaths among females would be higher compared to people who identified themselves as male.

So, finally, we’ve done it.

During this challenging time, I would request you to follow strict health guidelines & stay healthy.

N.B.: All the data that are used here can be found in the public domain. We use this solely for educational purposes. You can find the details here.

Creating a Cross-platform GUI based application using native Python using PyQt5

Hi Guys!

Today, we’ll be discussing one more graphical package in Python, known as PyQt. To design the GUI faster, we’ll be exploring another tool called Qt Designer, which is available for multiple OS platforms.

Please find the QT Designer here.

This is similar to any other GUI based IDE like Microsoft Visual Studio, where you can quickly generate your GUI template.

The majority of internet posts talk about using the PyQt5 or PyQt4 packages. But, when it comes to using the .ui file inside your Python code, they either demonstrate fundamental options without any events, or they convert the .ui file into a .py file & then use that. This is not very useful for many developers who are trying it for the first time. Hence, my main goal is to use the .ui file inside my Python script as it is, use all the components out of it & assign various working events.

In this post, we’ll discuss only one script & then we’ll showcase the output in the form of a video (no audio). You can verify the output for both MAC & Windows.

Before we start, let us check the directory structure between Windows & MAC –

2. MAC & Win Directory Structure

Let us explore what the GUI should look like –

3. GUI Design

So, as you can see, this tool is like any other GUI-based tool; basically, you can create anything by simple drag & drop.

Before we start discussing our code, here is the sample basicAdv.ui file for your reference.

You need to install the following framework –

pip install PyQt5

1. GUIPyQt5.py (This script contains all the GUI details & it will invoke the instance along with the logic.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 12-Mar-2020              ####
#### Modified On 12-Mar-2020              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from PyQt5 import QtWidgets, uic, QtGui, QtCore
from PyQt5.QtWidgets import *
import sys

class Ui(QtWidgets.QMainWindow):
    def __init__(self):
        # Instantiating the main class
        super(Ui, self).__init__()

        # Loading the Graphical Design without
        # converting it to any kind of Python code
        uic.loadUi('basicAdv.ui', self)

        # Adding all the essential buttons
        self.prtBtn = self.findChild(QtWidgets.QPushButton, 'prtBtn') # Find the button
        self.prtBtn.clicked.connect(self.printButtonClick) # Remember to pass the definition/method, not the return value!

        self.clrBtn = self.findChild(QtWidgets.QPushButton, 'clrBtn')  # Find the button
        self.clrBtn.clicked.connect(self.clearButtonClick)  # Remember to pass the definition/method, not the return value!

        self.addBtn = self.findChild(QtWidgets.QPushButton, 'addBtn')  # Find the button
        self.addBtn.clicked.connect(self.addItem)  # Remember to pass the definition/method, not the return value!

        self.selectImgBtn = self.findChild(QtWidgets.QPushButton, 'selectImgBtn')  # Find the button
        self.selectImgBtn.clicked.connect(self.setImage)  # Remember to pass the definition/method, not the return value!

        self.cnfBtn = self.findChild(QtWidgets.QPushButton, 'cnfBtn')  # Find the button
        self.cnfBtn.clicked.connect(self.showDialog)  # Remember to pass the definition/method, not the return value!

        # Adding other static input/output elements
        self.input = self.findChild(QtWidgets.QLineEdit, 'input')
        self.qlabel = self.findChild(QtWidgets.QLabel, 'qlabel')
        self.lineEdit = self.findChild(QtWidgets.QLineEdit, 'lineEdit')
        self.listWidget = self.findChild(QtWidgets.QListWidget, 'listWidget')
        self.imageLbl = self.findChild(QtWidgets.QLabel, 'imageLbl')

        # Adding Combobox
        self.combo = self.findChild(QtWidgets.QComboBox, 'sComboBox')  # Find the ComboBox

        # Adding static element to it
        self.combo.addItem("Sourav Ganguly")
        self.combo.addItem("Kapil Dev")
        self.combo.addItem("Sunil Gavaskar")
        self.combo.addItem("M. S. Dhoni")

        # Click Event
        self.combo.activated[str].connect(self.onChanged)  # Remember to pass the definition/method, not the return value!

        # Adding list Box
        self.listwidget2 = self.findChild(QtWidgets.QListWidget, 'listwidget2')  # Find the List

        # Adding static element to it
        self.listwidget2.insertItem(0, "Aamir Khan")
        self.listwidget2.insertItem(1, "Shahruk Khan")
        self.listwidget2.insertItem(2, "Salman Khan")
        self.listwidget2.insertItem(3, "Hrittik Roshon")
        self.listwidget2.insertItem(4, "Amitabh Bachhan")

        # Click Event
        self.listwidget2.clicked.connect(self.showIndividualElement)

        # Adding Group Box
        self.groupBox = self.findChild(QtWidgets.QGroupBox, 'groupBox')  # Find the group box
        self.groupBox.setCheckable(True)

        # Adding Individual Radio Button
        self.rdButton1 = self.findChild(QtWidgets.QRadioButton, 'rdButton1')  # Find the button
        self.rdButton1.setChecked(True)
        self.rdButton1.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton1))  # Remember to pass the definition/method, not the return value!

        self.rdButton2 = self.findChild(QtWidgets.QRadioButton, 'rdButton2')  # Find the button
        self.rdButton2.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton2))  # Remember to pass the definition/method, not the return value!

        self.rdButton3 = self.findChild(QtWidgets.QRadioButton, 'rdButton3')  # Find the button
        self.rdButton3.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton3))  # Remember to pass the definition/method, not the return value!

        self.rdButton4 = self.findChild(QtWidgets.QRadioButton, 'rdButton4')  # Find the button
        self.rdButton4.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton4))  # Remember to pass the definition/method, not the return value!

        self.show()

    def printRadioButtonClick(self, radioOption):

        if radioOption.text() == 'China':
            if radioOption.isChecked() == True:
                print(radioOption.text() + ' is selected')
            else:
                print(radioOption.text() + ' is deselected')

        if radioOption.text() == 'India':
            if radioOption.isChecked() == True:
                print(radioOption.text() + ' is selected')
            else:
                print(radioOption.text() + ' is deselected')

        if radioOption.text() == 'Japan':
            if radioOption.isChecked() == True:
                print(radioOption.text() + ' is selected')
            else:
                print(radioOption.text() + ' is deselected')

        if radioOption.text() == 'France':
            if radioOption.isChecked() == True:
                print(radioOption.text() + ' is selected')
            else:
                print(radioOption.text() + ' is deselected')

    def printButtonClick(self):
        # This is executed when the button is pressed
        print('Input text:' + self.input.text())

    def clearButtonClick(self):
        # This is executed when the button is pressed
        self.input.clear()

    def onChanged(self, text):
        self.qlabel.setText(text)
        self.qlabel.adjustSize()
        self.lineEdit.clear()  # Clear the text

    def addItem(self):
        value = self.lineEdit.text() # Get the value of the lineEdit
        self.lineEdit.clear() # Clear the text
        self.listWidget.addItem(value) # Add the value we got to the list

    def setImage(self):
        fileName, _ = QtWidgets.QFileDialog.getOpenFileName(None, "Select Image", "", "Image Files (*.png *.jpg *.jpeg *.bmp);;All Files (*)") # Ask for file
        if fileName: # If the user gives a file
            pixmap = QtGui.QPixmap(fileName) # Setup pixmap with the provided image
            pixmap = pixmap.scaled(self.imageLbl.width(), self.imageLbl.height(), QtCore.Qt.KeepAspectRatio) # Scale pixmap
            self.imageLbl.setPixmap(pixmap) # Set the pixmap onto the label
            self.imageLbl.setAlignment(QtCore.Qt.AlignCenter) # Align the label to center

    def showDialog(self):
        msgBox = QMessageBox()
        msgBox.setIcon(QMessageBox.Information)
        msgBox.setText("Message box pop up window")
        msgBox.setWindowTitle("MessageBox Example")
        msgBox.setStandardButtons(QMessageBox.Ok | QMessageBox.Cancel)
        msgBox.buttonClicked.connect(self.msgButtonClick)

        returnValue = msgBox.exec()
        if returnValue == QMessageBox.Ok:
            print('OK clicked')

    def msgButtonClick(self, i):
        print("Button clicked is:", i.text())

    def showIndividualElement(self, qmodelindex):
        item = self.listwidget2.currentItem()
        print(item.text())

if __name__ == "__main__":

    import sys
    app = QtWidgets.QApplication(sys.argv)
    window = Ui()
    window.show()
    sys.exit(app.exec_())

Let us explore a few key lines from this script. The rest are almost identical.

# Loading the Graphical Design without
# converting it to any kind of Python code
uic.loadUi('basicAdv.ui', self)

Loading the GUI created using Qt Designer into the Python environment.

# Adding all the essential buttons
self.prtBtn = self.findChild(QtWidgets.QPushButton, 'prtBtn') # Find the button
self.prtBtn.clicked.connect(self.printButtonClick) # Remember to pass the definition/method, not the return value!

In this case, we’re dynamically binding the component from the GUI by using the findChild method & then on the next line, we’re connecting its clicked signal to the appropriate handler. In this case, it is – self.printButtonClick.

The printButtonClick mentioned above is a method that contains the following snippet –

def printButtonClick(self):
    # This is executed when the button is pressed
    print('Input text:' + self.input.text())

As you can see, this event will capture the text from the input textbox & print it on our terminal.
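The comment in the code about passing the method & not its return value deserves a closer look. The sketch below uses a tiny Signal class, which is a hypothetical stand-in written only for this illustration & is not part of PyQt5, to show the difference between the two calls.

```python
class Signal:
    """Tiny stand-in for a Qt signal (hypothetical, not part of PyQt5)."""

    def __init__(self):
        self._slots = []

    def connect(self, slot):
        if not callable(slot):
            raise TypeError("connect() needs a callable, not its return value")
        self._slots.append(slot)

    def emit(self, *args):
        # Call every connected handler, in the order they were connected
        for slot in self._slots:
            slot(*args)


clicked = Signal()
log = []


def print_button_click():
    log.append("Input text: hello")


clicked.connect(print_button_click)   # Correct: pass the function itself
clicked.emit()                        # Simulates a button press

try:
    # Wrong: this runs the handler immediately & passes its result (None)
    clicked.connect(print_button_click())
except TypeError as err:
    log.append(str(err))

print(log)
```

In the wrong version, the handler runs once at connection time & never again on a click, which is exactly the mistake the comment warns about.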

Here is the snippet for the widgets that serve only as input/output & generally don’t have an event of their own. We still need to bind them to our Python application.

# Adding other static input/output elements
self.input = self.findChild(QtWidgets.QLineEdit, 'input')
self.qlabel = self.findChild(QtWidgets.QLabel, 'qlabel')
self.lineEdit = self.findChild(QtWidgets.QLineEdit, 'lineEdit')
self.listWidget = self.findChild(QtWidgets.QListWidget, 'listWidget')

This application has a list box & hence, we’ve added some static values while loading the application, as can be seen here –

# Adding list Box
self.listwidget2 = self.findChild(QtWidgets.QListWidget, 'listwidget2')  # Find the List

# Adding static element to it
self.listwidget2.insertItem(0, "Aamir Khan")
self.listwidget2.insertItem(1, "Shahruk Khan")
self.listwidget2.insertItem(2, "Salman Khan")
self.listwidget2.insertItem(3, "Hrittik Roshon")
self.listwidget2.insertItem(4, "Amitabh Bachhan")

Once the user selects a specific value from this list, the app will execute the following event –

# Click Event
self.listwidget2.clicked.connect(self.showIndividualElement)

Again, to explore the method, you need to view the given logic –

def showIndividualElement(self, qmodelindex):
    item = self.listwidget2.currentItem()
    print(item.text())

The Group Box, along with the radio buttons, works slightly differently from our list box.

For each radio button, we’ll have a dedicated text value that represents a different country in this context.

And, our application will bind all the radio buttons & connect all four options to one standard method, as shown below –

# Adding Individual Radio Button
self.rdButton1 = self.findChild(QtWidgets.QRadioButton, 'rdButton1')  # Find the button
self.rdButton1.setChecked(True)
self.rdButton1.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton1))  # Remember to pass the definition/method, not the return value!

self.rdButton2 = self.findChild(QtWidgets.QRadioButton, 'rdButton2')  # Find the button
self.rdButton2.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton2))  # Remember to pass the definition/method, not the return value!

self.rdButton3 = self.findChild(QtWidgets.QRadioButton, 'rdButton3')  # Find the button
self.rdButton3.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton3))  # Remember to pass the definition/method, not the return value!

self.rdButton4 = self.findChild(QtWidgets.QRadioButton, 'rdButton4')  # Find the button
self.rdButton4.toggled.connect(lambda: self.printRadioButtonClick(self.rdButton4))  # Remember to pass the definition/method, not the return value!

Also, note that, by default, rdButton1 is set to True, i.e., it will be selected when the form loads initially.

Let’s explore the printRadioButtonClick event.

def printRadioButtonClick(self, radioOption):

    if radioOption.text() == 'China':
        if radioOption.isChecked() == True:
            print(radioOption.text() + ' is selected')
        else:
            print(radioOption.text() + ' is deselected')

    if radioOption.text() == 'India':
        if radioOption.isChecked() == True:
            print(radioOption.text() + ' is selected')
        else:
            print(radioOption.text() + ' is deselected')

    if radioOption.text() == 'Japan':
        if radioOption.isChecked() == True:
            print(radioOption.text() + ' is selected')
        else:
            print(radioOption.text() + ' is deselected')

    if radioOption.text() == 'France':
        if radioOption.isChecked() == True:
            print(radioOption.text() + ' is selected')
        else:
            print(radioOption.text() + ' is deselected')

This method receives the radio button that was toggled & fetches the text out of it. It then matches that text against the country names here & prints whether that option is selected or deselected.

Finally, the image handling is slightly different.
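Since all four branches of the method do the same thing, the logic can be condensed. Here is a minimal sketch, assuming a FakeRadioButton class (a hypothetical stand-in that only mimics the text & isChecked methods used above) so that it can run without a GUI; describe_radio_button is also a name made up for this example.

```python
class FakeRadioButton:
    """Hypothetical stand-in exposing the two methods the handler relies on."""

    def __init__(self, text, checked=False):
        self._text = text
        self._checked = checked

    def text(self):
        return self._text

    def isChecked(self):
        return self._checked


KNOWN_COUNTRIES = {"China", "India", "Japan", "France"}


def describe_radio_button(radio_option):
    """Build the message the original handler prints, or None for unknown text."""
    if radio_option.text() not in KNOWN_COUNTRIES:
        return None
    state = "selected" if radio_option.isChecked() else "deselected"
    return f"{radio_option.text()} is {state}"


print(describe_radio_button(FakeRadioButton("India", checked=True)))
print(describe_radio_button(FakeRadioButton("China", checked=False)))
```

Both messages matter because the toggled signal fires for the button that loses the selection as well as for the one that gains it.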

Initially, our application will load the component from the .ui file & bind them with the Python environment –

self.imageLbl = self.findChild(QtWidgets.QLabel, 'imageLbl')

The image load option will only work when the user clicks the button, which triggers the following set of actions –

self.selectImgBtn = self.findChild(QtWidgets.QPushButton, 'selectImgBtn')  # Find the button
self.selectImgBtn.clicked.connect(self.setImage)  # Remember to pass the definition/method, not the return value!

Let’s explore the setImage method –

def setImage(self):
    fileName, _ = QtWidgets.QFileDialog.getOpenFileName(None, "Select Image", "", "Image Files (*.png *.jpg *.jpeg *.bmp);;All Files (*)") # Ask for file
    if fileName: # If the user gives a file
        pixmap = QtGui.QPixmap(fileName) # Setup pixmap with the provided image
        pixmap = pixmap.scaled(self.imageLbl.width(), self.imageLbl.height(), QtCore.Qt.KeepAspectRatio) # Scale pixmap
        self.imageLbl.setPixmap(pixmap) # Set the pixmap onto the label
        self.imageLbl.setAlignment(QtCore.Qt.AlignCenter) # Align the label to center

This opens the native file-selection dialog of the respective O/S, from which the user can choose an image.

Last but not least is the use of MsgBox, which can be extremely useful in GUI-based programming.

This msgbox doesn’t exist in the form. However, we’re creating it on the click event of the “Confirm Button” as shown below –

self.cnfBtn = self.findChild(QtWidgets.QPushButton, 'cnfBtn')  # Find the button
self.cnfBtn.clicked.connect(self.showDialog)  # Remember to pass the definition/method, not the return value!

This will trigger the showDialog method –

def showDialog(self):
    msgBox = QMessageBox()
    msgBox.setIcon(QMessageBox.Information)
    msgBox.setText("Message box pop up window")
    msgBox.setWindowTitle("MessageBox Example")
    msgBox.setStandardButtons(QMessageBox.Ok | QMessageBox.Cancel)
    msgBox.buttonClicked.connect(self.msgButtonClick)

    returnValue = msgBox.exec()
    if returnValue == QMessageBox.Ok:
        print('OK clicked')

And, based on the option you choose (“OK”/“Cancel”), it will print the captured message in your console.

Let’s explore the videos of output from Windows O/S –

Let’s explore the video output from MAC VM –

For more information on this package – please check the following link.

So, as you can see, we’ve finally achieved it. We’ve demonstrated cross-platform GUI applications using native Python. And, here we didn’t even have to convert the .ui design file into a Python script.

Please share your feedback.

I’ll be posting another new post in the coming days. Till then, Happy Avenging! 😀

Note: All the data posted here are representational data & available over the internet & for educational purpose only.

Building Python-based best-route apps for Indian Railways

Hi Guys!

Today, I’ll present a way to get the best Indian Railways train route between a given source & destination using a third-party API.

This approach is particularly beneficial if you want to integrate this logic into an Azure Function, a Lambda Function, or any other serverless function.

Before we dig into the details, let us explore what kind of cloud-based architecture we can use to implement this.

Architecture

Fig: 1 (Cloud Architecture)

In this case, I’ve considered Azure as the implementation platform.

Let’s discuss how the events will take place. At first, a user searches for the best routes between two fixed stations. The user has to provide the source & destination stations. After the initial authentication is validated, the request goes through the Azure Firewall. As part of the API service, it will check for similar queries & if one is there, it will fetch the result from the cache & send it back to the user through their mobile application. However, for a first-time query, it will retrieve the information from the DB & keep a copy in the cache. This part is also managed through a load balancer for high availability. Periodically, the system will push the data from the cache to the DB with the updated information.
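The cache lookup described above follows what is commonly called the cache-aside pattern. Here is a minimal sketch of it, assuming an in-memory dictionary as the cache & a hypothetical fetch_routes_from_db function standing in for the database call; neither name comes from the actual implementation.

```python
# A sketch only: the dict stands in for a real cache service, and
# fetch_routes_from_db is a hypothetical placeholder for the DB lookup.
route_cache = {}
db_calls = []


def fetch_routes_from_db(source, destination):
    # Placeholder: record the call & return a dummy payload
    db_calls.append((source, destination))
    return {'source': source, 'destination': destination, 'routes': []}


def get_best_routes(source, destination):
    key = (source, destination)
    if key in route_cache:
        # Similar query seen before: serve it from the cache
        return route_cache[key]
    # First-time query: read from the DB & keep a copy in the cache
    result = fetch_routes_from_db(source, destination)
    route_cache[key] = result
    return result


first = get_best_routes('NDLS', 'HWH')   # goes to the DB
second = get_best_routes('NDLS', 'HWH')  # served from the cache
print(len(db_calls))
```

In a real deployment, the dictionary would be replaced by a managed cache, & the periodic cache-to-DB push mentioned above would run as a separate job.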

Let’s see the program directory structure –

ProgramDir

Let’s discuss our code –

1. clsConfig.py (This script contains all the parameters for the main Indian Railway API, such as the URL, host, API key & the log paths.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 12-Oct-2019              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys for      ####
#### azure cosmos db. Application will    ####
#### process these information & perform  ####
#### various CRUD operation on Cosmos DB. ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'URL': "https://trains.p.rapidapi.com/",
        'RAPID_API_HOST': "trains.p.rapidapi.com",
        'RAPID_API_KEY': "hrfjjdfjfjfjfjxxxxxjffjjfjfjfjfjfjfjf",
        'RAPID_API_TYPE': "application/json",
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'APP_DESC_1': 'Indian Railway Train Schedule Search',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'COL_LIST': ['name','train_num','train_from','train_to','classes','departTime','arriveTime','Mon','Tue','Wed','Thu','Fri','Sat','Sun']
    }

As of now, I’ve replaced the API Key with the dummy value.
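If you would rather not keep the key in the script at all, one option is to read it from an environment variable. The sketch below assumes a variable named RAPID_API_KEY (a name chosen here purely for illustration) & a hypothetical build_config helper. It also shows os.path.join as an alternative to choosing the separator by hand.

```python
import os


# Assumption: the key is exported as RAPID_API_KEY before the app starts.
# The variable name & this helper are illustrative, not part of clsConfig.
def build_config(base_path):
    return {
        'RAPID_API_KEY': os.environ.get('RAPID_API_KEY', ''),
        # os.path.join picks the right separator for the current O/S;
        # the trailing '' keeps the closing separator, as in the original
        'LOG_PATH': os.path.join(base_path, 'log', ''),
        'ARCH_DIR': os.path.join(base_path, 'arch', ''),
    }


config = build_config(os.getcwd())
print(config['LOG_PATH'])
```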

2. clsIndianRailway.py (This script will invoke the main Indian Railway API & try to get the response between two railway stations. Hence, the name comes into the picture.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 20-Dec-2019              ####
#### Modified On 20-Dec-2019              ####
####                                      ####
#### Objective: Main scripts to invoke    ####
#### Indian Railway API.                  ####
##############################################

import requests
import logging
import json
from clsConfig import clsConfig as cf

class clsIndianRailway:
    def __init__(self):
        self.url = cf.config['URL']
        self.rapidapi_host = cf.config['RAPID_API_HOST']
        self.rapidapi_key = cf.config['RAPID_API_KEY']
        self.type = cf.config['RAPID_API_TYPE']

    def searchQry(self, rawQry):
        try:
            url = self.url
            rapidapi_host = self.rapidapi_host
            rapidapi_key = self.rapidapi_key
            type = self.type

            Ipayload = "{\"search\":\"" + rawQry + "\"}"

            jpayload = json.dumps(Ipayload)
            payload = json.loads(jpayload)

            print('Input JSON: ', str(payload))

            headers = {
                'x-rapidapi-host': rapidapi_host,
                'x-rapidapi-key': rapidapi_key,
                'content-type': type,
                'accept': type
                }

            response = requests.request("POST", url, data=payload, headers=headers)

            ResJson  = response.text

            jdata = json.dumps(ResJson)
            ResJson = json.loads(jdata)

            return ResJson

        except Exception as e:
            ResJson = ''
            x = str(e)
            print(x)

            logging.info(x)
            ResJson = {'errorDetails': x}

            return ResJson

Let’s explain the critical snippet from the code.

url = self.url
rapidapi_host = self.rapidapi_host
rapidapi_key = self.rapidapi_key
type = self.type

Ipayload = "{\"search\":\"" + rawQry + "\"}"

jpayload = json.dumps(Ipayload)
payload = json.loads(jpayload)

The first four lines receive the parameter values. Our application needs to frame the search query, which is done in the Ipayload variable. After that, our app passes it through json.dumps & json.loads; this round trip leaves the payload as a JSON-formatted string, which is what gets posted.
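Building JSON by string concatenation breaks as soon as the search text contains a double quote or a backslash. A hedged alternative is to let json.dumps do the escaping; build_payload below is a hypothetical helper & not part of clsIndianRailway.

```python
import json


def build_payload(raw_qry):
    # json.dumps escapes quotes & backslashes inside the search text
    return json.dumps({'search': raw_qry})


payload = build_payload('Rajdhani')
print(payload)
```

The resulting string carries the same key & value as the hand-built one (json.dumps only adds a space after the colon), so it could be passed to the request in the same way.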

headers = {
    'x-rapidapi-host': rapidapi_host,
    'x-rapidapi-key': rapidapi_key,
    'content-type': type,
    'accept': type
    }

response = requests.request("POST", url, data=payload, headers=headers)

Now, the application will prepare the headers, send the request & receive the response. Finally, this script will send that response back to the main calling application after extracting the response text & converting it back to JSON, as follows –

response = requests.request("POST", url, data=payload, headers=headers)

ResJson  = response.text

jdata = json.dumps(ResJson)
ResJson = json.loads(jdata)

return ResJson

3. callIndianRailwayAPI.py (This is the main script which invokes the main Indian Railway API & tries to get the response between two railway stations. Hence, the name comes into the picture.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 20-Dec-2019              ####
#### Modified On 20-Dec-2019              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from clsConfig import clsConfig as cf
import pandas as p
import clsL as cl
import logging
import datetime
import json
import clsIndianRailway as ct
import re
import numpy as np

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def getArriveTimeOnly(row):
    try:
        # Splitting on '+' to fetch the time part only

        lkp_arriveTime = str(row['arriveTime'])

        str_arr_time, remain = lkp_arriveTime.split('+')

        return str_arr_time

    except Exception as e:
        x = str(e)
        str_arr_time = ''

        return str_arr_time

def getArriveDateDiff(row):
    try:
        # Splitting on '+' to fetch the day-offset part

        lkp_arriveTime = str(row['arriveTime'])

        first_half, str_date_diff_init = lkp_arriveTime.split('+')

        # Replacing the text part from it & only capturing the integer part
        str_date_diff = int(re.sub(r"[a-z]","",str_date_diff_init, flags=re.I))

        return str_date_diff

    except Exception as e:
        x = str(e)
        str_date_diff = 0

        return str_date_diff

def getArriveTimeDiff(row):
    try:
        # Stripping punctuation to fetch the time part as an integer (HHMM)

        lkp_arriveTimeM = str(row['arriveTimeM'])

        str_time_diff_init = int(re.sub(r'[^\w\s]', '', lkp_arriveTimeM))

        # Subtracting from 2400, so that an earlier arrival yields a larger value
        str_time_diff = (2400 - str_time_diff_init)

        return str_time_diff

    except Exception as e:
        x = str(e)
        str_time_diff = 0

        return str_time_diff

def main():
    try:
        dfSrc = p.DataFrame()
        df_ret = p.DataFrame()
        ret_2 = ''
        debug_ind = 'Y'
        col_list = cf.config['COL_LIST']

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidatedIR.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        # Query using parameters
        rawQry = str(input('Please enter the name of the train service that you want to find out (Either by Name or by Number): '))

        x1 = ct.clsIndianRailway()
        ret_2 = x1.searchQry(rawQry)

        # Capturing the JSON Payload
        res = json.loads(ret_2)

        # Converting dictionary to Pandas Dataframe
        # df_ret = p.read_json(ret_2, orient='records')

        df_ret = p.io.json.json_normalize(res)
        df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

        # Resetting the column orders as per JSON
        # df_ret = df_ret[list(res[0].keys())]
        column_order = col_list
        df_mod_ret = df_ret.reindex(column_order, axis=1)

        # Sorting the source data for better viewing
        df_mod_resp = df_mod_ret.sort_values(by=['train_from','train_to','train_num'])

        l.logr('1.IndianRailway_' + var + '.csv', debug_ind, df_mod_resp, 'log')

        # Fetching Data for Delhi To Howrah
        df_del_how = df_mod_resp[(df_mod_resp['train_from'] == 'NDLS') & (df_mod_resp['train_to'] == 'HWH')]

        l.logr('2.IndianRailway_Delhi2Howrah_' + var + '.csv', debug_ind, df_del_how, 'log')

        # Splitting Arrive time into two separate fields for better calculation
        df_del_how['arriveTimeM'] = df_del_how.apply(lambda row: getArriveTimeOnly(row), axis=1)
        df_del_how['arriveTimeDayDiff'] = df_del_how.apply(lambda row: getArriveDateDiff(row), axis=1)
        df_del_how['arriveTimeDiff'] = df_del_how.apply(lambda row: getArriveTimeDiff(row), axis=1)

        l.logr('3.IndianRailway_Del2How_Mod_' + var + '.csv', debug_ind, df_del_how, 'log')

        # To fetch the best route which saves time
        lstTimeDayDiff = df_del_how['arriveTimeDayDiff'].values.tolist()
        min_lstTimeDayDiff = int(min(lstTimeDayDiff))

        df_min_timedaydiff = df_del_how[(df_del_how['arriveTimeDayDiff'] == min_lstTimeDayDiff)]

        l.logr('4.IndianRailway_Del2How_TimeCalc_' + var + '.csv', debug_ind, df_min_timedaydiff, 'log')

        # Now application will check the maximum arrivetimediff, this will bring the record
        # which arrives early at Howrah station
        lstTimeDiff = df_min_timedaydiff['arriveTimeDiff'].values.tolist()
        max_lstTimeDiff = int(max(lstTimeDiff))

        df_best_route = df_min_timedaydiff[(df_min_timedaydiff['arriveTimeDiff'] == max_lstTimeDiff)]

        # Dropping unwanted columns
        df_best_route.drop(columns=['arriveTimeM'], inplace=True)
        df_best_route.drop(columns=['arriveTimeDayDiff'], inplace=True)
        df_best_route.drop(columns=['arriveTimeDiff'], inplace=True)

        l.logr('5.IndianRailway_Del2How_BestRoute_' + var + '.csv', debug_ind, df_best_route, 'log')

        print("-" * 60)

        print('Realtime Indian Railway Data:: ')
        logging.info('Realtime Indian Railway Data:: ')
        print(df_mod_resp)
        print()
        print('Best Route from Delhi -> Howrah:: ')
        print(df_best_route)
        print()

        # Checking execution status
        ret_val_2 = df_best_route.shape[0]

        if ret_val_2 == 0:
            print("Indian Railway hasn't returned any rows. Please check your queries!")
            logging.info("Indian Railway hasn't returned any rows. Please check your queries!")
            print("*" * 157)
            logging.info(tmpR0)
        else:
            print("Rows fetched successfully!")
            logging.info("Rows fetched successfully!")
            print("*" * 157)
            logging.info(tmpR0)

        print('Finding Story points..')
        print("*" * 157)
        logging.info('Finding Story points..')
        logging.info(tmpR0)


        tmpR10 = 'End Time: ' + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError:
        print("No relevant data to proceed!")
        logging.info("No relevant data to proceed!")

    except Exception as e:
        print("Top level Error: args:{0}, message: {1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

Key snippet to explore –

# Query using parameters
rawQry = str(input('Please enter the name of the train service that you want to find out (Either by Name or by Number): '))

In this case, we’ve made it interactive. However, in an actual scenario, you would receive these values from your mobile application.

x1 = ct.clsIndianRailway()
ret_2 = x1.searchQry(rawQry)

# Capturing the JSON Payload
res = json.loads(ret_2)

The above lines invoke the API & capture the JSON response.

# Converting dictionary to Pandas Dataframe
df_ret = p.io.json.json_normalize(res)
df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

# Resetting the column orders as per JSON
column_order = col_list
df_mod_ret = df_ret.reindex(column_order, axis=1)

# Sorting the source data for better viewing
df_mod_resp = df_mod_ret.sort_values(by=['train_from','train_to','train_num'])

In the above lines, our application will flatten the JSON into a pandas dataframe, reorder the columns as per the configured column list & then sort the result.

The result will look like this –

SerializeJson2PandasDF

This is exceptionally critical, as this will allow you to achieve your target. Without flattening the data, you won’t get to your goal.
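To make the flattening step concrete, here is a small sketch in plain Python of what it does conceptually: nested keys are walked & only the last part of each dotted name is kept, mirroring the x.split(".")[-1] mapping above. The sample record is made up, & its nesting is an assumption about the response shape, not the actual API payload.

```python
def flatten(record, parent_key=''):
    """Flatten nested dicts into a single level with dotted keys."""
    flat = {}
    for key, value in record.items():
        full_key = parent_key + '.' + key if parent_key else key
        if isinstance(value, dict):
            flat.update(flatten(value, full_key))
        else:
            flat[full_key] = value
    return flat


# Made-up record; the real response shape may differ
sample = {
    'name': 'Sample Express',
    'data': {'arriveTime': '10:05 +1 night', 'days': {'Mon': 1}},
}

flat = flatten(sample)
# Keep only the last segment of each dotted name, as the column mapping does
short = {key.split('.')[-1]: value for key, value in flat.items()}
print(short)
```

Once every record is a single-level dictionary like this, each key maps cleanly to one dataframe column, which is what makes the later filtering & sorting possible.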

# Fetching Data for Delhi To Howrah
df_del_how = df_mod_resp[(df_mod_resp['train_from'] == 'NDLS') & (df_mod_resp['train_to'] == 'HWH')]

As the comment suggests, our application will pick up only those records between New Delhi & Howrah. Thus, we’ve used our filter to eliminate additional records. And, the data will look like this –

FilteredRecords

Now, we need to identify which of the two records takes the minimum time. For that, we’ll be doing some calculations to find the earliest arrival.

# Splitting Arrive time into two separate fields for better calculation
df_del_how['arriveTimeM'] = df_del_how.apply(lambda row: getArriveTimeOnly(row), axis=1)
df_del_how['arriveTimeDayDiff'] = df_del_how.apply(lambda row: getArriveDateDiff(row), axis=1)
df_del_how['arriveTimeDiff'] = df_del_how.apply(lambda row: getArriveTimeDiff(row), axis=1)

To do that, we’ll be generating a few derived columns (shown above), which we’ll be using to fetch the shortest duration. And, the data should look like this –

CalculatedFields

These are the fields that we’re using for our calculation. First, we’re splitting arriveTime into two separate columns, i.e., arriveTimeM & arriveTimeDayDiff. arriveTimeDiff, however, is a calculated field derived from arriveTimeM.

So, our logic to find the best routes –

  • arriveTimeDayDiff = Take the minimum across the records. If there are multiple candidates with that minimum, we’ll pick all of them. In this case, we’ll get two records.
  • arriveTimeDiff = (24:00 – <Train’s Arrival Time>), then take the maximum of the value, i.e., the earliest arrival.

Note that, in this case, we haven’t considered the departure time. You can add that logic to improve & correct your prediction.

The above steps can be seen in the following snippet –

# To fetch the best route which saves time
lstTimeDayDiff = df_del_how['arriveTimeDayDiff'].values.tolist()
min_lstTimeDayDiff = int(min(lstTimeDayDiff))

df_min_timedaydiff = df_del_how[(df_del_how['arriveTimeDayDiff'] == min_lstTimeDayDiff)]

l.logr('4.IndianRailway_Del2How_TimeCalc_' + var + '.csv', debug_ind, df_min_timedaydiff, 'log')

# Now application will check the maximum arrivetimediff, this will bring the record
# which arrives early at Howrah station
lstTimeDiff = df_min_timedaydiff['arriveTimeDiff'].values.tolist()
max_lstTimeDiff = int(max(lstTimeDiff))

df_best_route = df_min_timedaydiff[(df_min_timedaydiff['arriveTimeDiff'] == max_lstTimeDiff)]
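The same two-step selection can be tried on a handful of made-up rows without pandas. This is only a sketch: the 'HH:MM +N night' format is an assumption inferred from how the helper functions split the value, & the train numbers are placeholders.

```python
import re

# Made-up rows; the arriveTime format is an assumption, not a documented one
trains = [
    {'train_num': 'A', 'arriveTime': '10:05 +1 night'},
    {'train_num': 'B', 'arriveTime': '06:55 +1 night'},
    {'train_num': 'C', 'arriveTime': '05:30 +2 nights'},
]


def day_diff(arrive_time):
    # Part after '+', with the letters removed
    return int(re.sub(r'[a-z]', '', arrive_time.split('+')[1], flags=re.I))


def time_diff(arrive_time):
    # 2400 minus the HHMM value: an earlier arrival gives a larger number
    hhmm = int(re.sub(r'[^\w\s]', '', arrive_time.split('+')[0]))
    return 2400 - hhmm


# Step 1: keep the rows with the fewest overnight days
min_days = min(day_diff(t['arriveTime']) for t in trains)
candidates = [t for t in trains if day_diff(t['arriveTime']) == min_days]

# Step 2: among those, keep the earliest arrival (largest time_diff)
max_diff = max(time_diff(t['arriveTime']) for t in candidates)
best = [t for t in candidates if time_diff(t['arriveTime']) == max_diff]
print(best)
```

Here, train C arrives at the earliest clock time but a day later, so it is dropped in the first step; of the remaining two, B arrives earlier & is picked.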

Let’s see how it runs –

Output

As you can see, for the source NDLS (New Delhi), we’ve three records marked in the GREEN square box. However, for the destination HWH (Howrah), we’ve only two records marked in the RED square box. And, as part of our calculation, we’ll pick the record marked with the BLUE square box.

Let’s see how the log directory generates all the files –

Log_Dir

Let’s see the final output in our csv file –

BestRoute

So, finally, we’ve achieved it. 😀

Let me know how you like this post. Please share your suggestions & comments.

I’ll be back with another installment from the Python verse.

Till then – Happy Avenging!

Note: All the data posted here are representational data & available over the internet & for educational purpose only.

Building GUI application using Python

Hi Guys!

Today, we’ll be exploring how to create a GUI application using Python.

We’ll be using the briefcase package, one of the popular utilities from the Python-verse.

The advantage of this package is that you can create an application for Windows, MAC, Linux & Android using the same piece of code.

Let’s explore!

Step – 1:

We’ll be installing briefcase package –

1. Installing Packages - 1

Step – 2:

Install the toga package –

2. Installing Packages - 2.JPG

Step – 3:

Install the pycairo package –

3. Installing Packages - 3

On some O/S, you might not be able to install pycairo directly. In that case, you need to download a wheel package from a third-party site & install it.

4. Installing Packages - 4

Step – 4:

Finally, the last package –

5. Installing Packages - 5
  • For Windows, you need to install the Wix toolset.
  • For MAC, there is no additional tool you need to install.
  • For iOS, You need to have X-Code & a developer account.
  • For Linux, You need to install GTK 3.10 or later.
  • For Android, you need to install the Android studio.
  • For web-app, you need to use the Django framework.

Let’s create the virtual environment –

python -m venv --copies .env

.env\Scripts\activate.bat

Let’s create the default app –

There will be a series of inputs that you need to provide to create the default application –

6. Creating First Step

Please find the RED highlighted options.

Let’s find the run commands for different environments  –

python setup.py windows -s

python setup.py macos -s

python setup.py linux -s

python setup.py ios -s

python setup.py android -s

Let’s run that in Windows –

7. Creating First App

Now, we’ll modify our code & add some text fields & calculation logic to this app.

Let’s review the central directory structure –

11. Main Directory Structures

Now, explore the SDPythonApp directory & we’ll find the following structure –

10. Directory Structure

Let’s discuss our code –

1. app.py (This script will contain the main logic of the GUI app & will be invoked as the main application.)

###############################################
#### Written By: SATYAKI DE                ####
#### Written On: 24-Nov-2019               ####
####                                       ####
#### Objective: This script will create a  ####
#### GUI application with sample function. ####
####                                       ####
###############################################

"""
First IOS App made from Python
"""
import toga
from toga.style import Pack
from toga.style.pack import COLUMN, ROW, LEFT, RIGHT


class Sdapp(toga.App):

    def two_digit_decimal(self, n1):
        n = round(n1, 2)
        return n

    def calculate(self, widget):
        try:
            # Dummy Tax Calculation
            val = (float(self.f_input.value) * 4.5) * 2.7 / 100.0
            self.c_input.value = self.two_digit_decimal(val)
        except Exception:
            self.c_input.value = 'Please provide numeric values!'

    def startup(self):
        # Create a main window with a name matching the app
        self.main_window = toga.MainWindow(title=self.name)

        # Create a main content box
        f_box = toga.Box()
        c_box = toga.Box()
        box = toga.Box()

        self.c_input = toga.TextInput(readonly=True)
        self.f_input = toga.TextInput()

        self.c_label = toga.Label('$', style=Pack(text_align=LEFT))
        self.f_label = toga.Label('Salary', style=Pack(text_align=LEFT))
        self.join_label = toga.Label('Tax Amount', style=Pack(text_align=RIGHT))

        # Button Activity
        button = toga.Button('Generate Tax', on_press=self.calculate)

        f_box.add(self.f_input)
        f_box.add(self.f_label)

        c_box.add(self.join_label)
        c_box.add(self.c_input)
        c_box.add(self.c_label)

        box.add(f_box)
        box.add(c_box)
        box.add(button)

        box.style.update(direction=COLUMN, padding_top=10)
        f_box.style.update(direction=ROW, padding=5)
        c_box.style.update(direction=ROW, padding=5)

        self.c_input.style.update(flex=1)
        self.f_input.style.update(flex=1, padding_left=160)
        self.c_label.style.update(width=100, padding_left=10)
        self.f_label.style.update(width=100, padding_left=10)
        self.join_label.style.update(width=150, padding_right=10)

        button.style.update(padding=15, flex=1)

        # Add the content on the main window
        self.main_window.content = box

        # Show the main window
        self.main_window.show()


def main():
    return Sdapp('SDApp', 'com.firstapp.SDPythonApp')

Let’s discuss the key lines –

self.c_label = toga.Label('$', style=Pack(text_align=LEFT))

This line, along with the similar label & text-input declarations around it, is widget boilerplate in the main application.

# Button Activity
button = toga.Button('Generate Tax', on_press=self.calculate)

The above lines will trigger the event when someone clicks the button & it will invoke the function named calculate.

def calculate(self, widget):
    try:
        # Dummy Tax Calculation
        val = (float(self.f_input.value) * 4.5) * 2.7 / 100.0
        self.c_input.value = self.two_digit_decimal(val)
    except Exception:
        self.c_input.value = 'Please provide numeric values!'

In the above function, we’ve prepared a dummy calculation logic for TAX calculation. And, finally, we’ll be rounding the result to two digits after the decimal point by invoking the two_digit_decimal function.

def two_digit_decimal(self, n1):
    n = round(n1, 2)
    return n

This function will round the value to two decimal places.
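To try the calculation outside the GUI, the same logic can be written as a plain function. This is a minimal sketch: calculate_tax is a hypothetical name, & it catches only ValueError & TypeError where the original catches every Exception.

```python
def calculate_tax(salary_text):
    """Return the dummy tax for the input, or a message if it is not numeric."""
    try:
        # Same dummy formula as in the calculate method
        val = (float(salary_text) * 4.5) * 2.7 / 100.0
        return round(val, 2)
    except (ValueError, TypeError):
        return 'Please provide numeric values!'


print(calculate_tax('1234'))
print(calculate_tax('abc'))
```

A function like this keeps the arithmetic separate from the widgets, so the calculate method would only need to pass self.f_input.value in & write the result to self.c_input.value.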

Let’s run our application –

8.1. Before Advanced App

Let’s provide an input, say 1234, as shown in the above figure & click the Generate Tax button marked in RED. This will bring up the following screen.

8. Advanced App

Let’s explore if someone provides invalid input –

9. Exception Cases

As expected, this will show a proper warning to the application user.

So, we’ve done it. We’ve built our first Python-based GUI application across multiple platforms.

Please share your review.

So, we’ll come out with another new post in the coming days!

N.B.: This is demonstrated for RnD/study purposes. All the data posted here are representational data & available over the internet & for educational purpose only.

Publishing new Python Library for JSON & NoSQL

Hi Guys!

As discussed, please find the link to the PyPI package of the new enhanced JSON library for Python. This is particularly useful as I’ve accommodated the following features into it.

  1. distinct
  2. nvl
  3. partition_by
  4. regex_like
  5. regex_replace
  6. regex_substr

All these functions can be used over a JSON payload through Python. I’ll discuss this in detail in my next blog post.

However, I would like to suggest that this library will be handy for NoSQL databases like Cosmos DB. Now, you can quickly implement many of these features, such as distinct, partitioning & regular expressions, with less effort.

Please find the library URL.

Let me know your feedback on the same.

N.B.: I’ve tested this library on both Windows 10 & Ubuntu 18. And, the Python versions that I’ve used are Python 3.6 & Python 3.7.

Till then!

Happy Avenging!

Combining the NoSQL(Cosmos DB) & traditional Azure RDBMS in Azure (Time stone solo from Python verse)

Hi Guys!

Today, our main objective is to extend our last post & blend two different kinds of data using Python.

Please refer to the earlier post if you haven’t gone through it – “Building Azure cosmos application”.

What is the Objective?

In this post, our objective is to combine a traditional RDBMS from the cloud with Azure’s NoSQL, which, in this case, is Cosmos DB, & to present some kind of blended information, which can be aggregated further.

Examining Source Data.

No SQL Data from Cosmos:

Let’s check the NoSQL data created in our last post one more time.

CosmosData

In total, we’ve created 6 records in our last post.

As you can see in the red-marked areas, one can check the total number of records created from Items. You can also filter out a specific record using the Edit Filter button highlighted with the blue box, where you need to provide the “WHERE CLAUSE”.

Azure SQL DB:

Let’s create some data in Azure SQL DB.

But, before that, you need to create a SQL DB in the Azure cloud. You can refer to the official Microsoft link on creating a DB in Azure here.

I won’t discuss the detailed steps of creating DB here.

From Azure portal, it looks like –

Azure SQL DB Main Screen

Let’s see what the data looks like in Azure DB. For our case, we’ll be using the hrMaster DB.

Let’s create the table & some sample data aligned with our cosmos data.

Azure SQL DB

We will join both the datasets based on subscriberId & then extract our required columns in our final output.

CombinedData
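Conceptually, the join on subscriberId works as in the plain-Python sketch below. The rows are made up, only the column names are borrowed from the queries in the config script, & the choice of an inner join is an assumption for illustration.

```python
# Made-up rows; only the column names follow the queries used in this post
cosmos_rows = [
    {'subscriberId': 'S001', 'orderNo': 'O-1', 'orderQty': 2},
    {'subscriberId': 'S002', 'orderNo': 'O-2', 'orderQty': 5},
]
sql_rows = [
    {'subscriberId': 'S001', 'state': 'XX', 'country': 'YY', 'customerType': 'Gold'},
]

# Index the RDBMS rows by the join key
sql_by_id = {row['subscriberId']: row for row in sql_rows}

# Inner join on subscriberId, keeping only the required columns
combined = [
    {
        'subscriberId': c['subscriberId'],
        'orderNo': c['orderNo'],
        'orderQty': c['orderQty'],
        'customerType': sql_by_id[c['subscriberId']]['customerType'],
    }
    for c in cosmos_rows
    if c['subscriberId'] in sql_by_id
]
print(combined)
```

Only S001 exists on both sides, so it is the only row that survives the join.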

Good. Now, we’re ready for python scripts.

Python Scripts:

In this installment, we’ll be reusing the following python scripts, which are already discussed in my earlier post –

  • clsL.py
  • clsColMgmt.py
  • clsCosmosDBDet.py

So, I’m not going to discuss these scripts.

Before we discuss our scripts, let’s look at the directory structures –

Win_Vs_MAC

Here is the detailed directory structure for Windows & MAC O/S.

1. clsConfig.py (This script contains all the configuration parameters, such as the Azure SQL DB & Cosmos DB connection details, file paths & queries, that the rest of the application relies on.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 25-May-2019              ####
#### Updated On: 02-Jun-2019              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys for      ####
#### azure cosmos db. Application will    ####
#### process these information & perform  ####
#### various CRUD operation on Cosmos DB. ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))
    db_name = 'rnd-de01-usw2-vfa-cdb'
    db_link = 'dbs/' + db_name
    CONTAINER1 = "RealtimeEmail"
    CONTAINER2 = "RealtimeTwitterFeedback"
    CONTAINER3 = "RealtimeHR"

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'SERVER': 'xxxx-xxx.database.windows.net',
        'DATABASE_1': 'SalesForceMaster',
        'DATABASE_2': 'hrMaster',
        'DATABASE_3': 'statMaster',
        'USERNAME': 'admin_poc_dev',
        'PASSWORD': 'xxxxx',
        'DRIVER': '{ODBC Driver 17 for SQL Server}',
        'ENV': 'pocdev-saty',
        'ENCRYPT_FLAG': "yes",
        'TRUST_FLAG': "no",
        'TIMEOUT_LIMIT': "30",
        'PROCSTAT': "'Y'",
        'APP_ID': 1,
        'EMAIL_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcEmail.json',
        'TWITTER_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcTwitter.json',
        'HR_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcHR.json',
        'COSMOSDB_ENDPOINT': 'https://rnd-de01-usw2-vfa-cdb.documents.azure.com:443/',
        'CONFIG_TABLE': 'ETL_CONFIG_TAB',
        'COSMOS_PRIMARYKEY': "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXIsI00AxKXXXXXgg==",
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'COSMOSDB': db_name,
        'COSMOS_CONTAINER1': CONTAINER1,
        'COSMOS_CONTAINER2': CONTAINER2,
        'COSMOS_CONTAINER3': CONTAINER3,
        'CONFIG_ORIG': 'Config_orig.csv',
        'ENCRYPT_CSV': 'Encrypt_Config.csv',
        'DECRYPT_CSV': 'Decrypt_Config.csv',
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'APP_DESC_1': 'Feedback Communication',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SQL_QRY_1': "SELECT c.subscriberId, c.sender, c.orderNo, c.orderDate, c.items.orderQty  FROM RealtimeEmail c",
        'SQL_QRY_2': "SELECT c.twitterId, c.Twit, c.DateCreated, c.Country FROM RealtimeTwitterFeedback c WHERE c.twitterId=@CrVal",
        'DB_QRY': "SELECT * FROM c",
        'AZURE_SQL_1': "SELECT DISTINCT subscriberId, state, country, annualIncome, customerType FROM dbo.onboardCustomer",
        'COLLECTION_QRY': "SELECT * FROM r",
        'database_link': db_link,
        'collection_link_1': db_link + '/colls/' + CONTAINER1,
        'collection_link_2': db_link + '/colls/' + CONTAINER2,
        'collection_link_3': db_link + '/colls/' + CONTAINER3,
        'options': {
            'offerThroughput': 1000,
            'enableCrossPartitionQuery': True,
            'maxItemCount': 2
        }
    }

Here, we’ve added a few more entries compared to the last time, which hold the detailed configuration for Azure SQL DB.

'SERVER': 'xxxx-xxx.database.windows.net',
'DATABASE_1': 'SalesForceMaster',
'DATABASE_2': 'hrMaster',
'DATABASE_3': 'statMaster',
'USERNAME': 'admin_poc_dev',
'PASSWORD': 'xxxxx',
'DRIVER': '{ODBC Driver 17 for SQL Server}',
'ENV': 'pocdev-saty',
'ENCRYPT_FLAG': "yes",
'TRUST_FLAG': "no",
'TIMEOUT_LIMIT': "30",
'PROCSTAT': "'Y'",

Here, you need to supply your DB credentials accordingly.
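
If you would rather not keep the credentials inside clsConfig.py, one option is to read them from environment variables. The following is only a minimal sketch. The variable names AZURE_SQL_USER & AZURE_SQL_PASSWORD and the helper load_db_secrets are my own assumptions & are not part of the original scripts.

```python
import os

# Hypothetical environment variable names; choose whatever suits your setup.
def load_db_secrets(env=os.environ):
    """Return the sensitive Azure SQL settings from a mapping (the environment by default)."""
    return {
        'USERNAME': env.get('AZURE_SQL_USER', ''),
        'PASSWORD': env.get('AZURE_SQL_PASSWORD', ''),
    }

# Passing an explicit mapping here so that the sketch runs anywhere
secrets = load_db_secrets({'AZURE_SQL_USER': 'admin_poc_dev',
                           'AZURE_SQL_PASSWORD': 'xxxxx'})
print(secrets['USERNAME'])
```

The returned dictionary could then be merged into the config entries shown above before the connection string is built.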

2. clsDBLookup.py (This script will look into the Azure SQL DB & fetch data from the traditional RDBMS of the Azure environment.)

#####################################################
#### Written By: SATYAKI DE                      ####
#### Written On: 25-May-2019                     ####
####                                             ####
#### Objective: This script will check &         ####
#### test the connection with the Azure          ####
#### SQL DB & it will fetch all the records      ####
#### residing in a table of the same DB.         ####
#####################################################

import pyodbc as py
import pandas as p
from clsConfig import clsConfig as cdc

class clsDBLookup(object):
    def __init__(self, lkpTableName = ''):
        self.server = cdc.config['SERVER']
        self.database = cdc.config['DATABASE_1']
        self.database1 = cdc.config['DATABASE_2']
        self.database2 = cdc.config['DATABASE_3']
        self.username = cdc.config['USERNAME']
        self.password = cdc.config['PASSWORD']
        self.driver = cdc.config['DRIVER']
        self.env = cdc.config['ENV']
        self.encrypt_flg = cdc.config['ENCRYPT_FLAG']
        self.trust_flg = cdc.config['TRUST_FLAG']
        self.timeout_limit = cdc.config['TIMEOUT_LIMIT']
        self.lkpTableName = cdc.config['CONFIG_TABLE']
        self.ProcStat = cdc.config['PROCSTAT']
        self.AppId = cdc.config['APP_ID']

    def LookUpData(self):
        try:
            # Assigning all the required values
            server = self.server
            database = self.database1
            username = self.username
            password = self.password
            driver = self.driver
            env = self.env
            encrypt_flg = self.encrypt_flg
            trust_flg = self.trust_flg
            timout_limit = self.timeout_limit
            lkpTableName = self.lkpTableName
            ProcStat = self.ProcStat
            AppId = self.AppId

            # Creating secure connection
            str_conn = 'Driver=' + driver + ';Server=tcp:' + server + ',1433;' \
                       'Database=' + database + ';Uid=' + username + '@' + env + ';' \
                       'Pwd=' + password + ';Encrypt=' + encrypt_flg + ';' \
                       'TrustServerCertificate=' + trust_flg + ';Connection Timeout=' + timout_limit + ';'

            db_con_azure = py.connect(str_conn)

            # AppId is an integer in clsConfig, so it is cast to a string before concatenation
            query = " SELECT [ruleId] as ruleId, [ruleName] as ruleName, [ruleSQL] as ruleSQL, " \
                    " [ruleFlag] as ruleFlag, [appId] as appId, [DBType] as DBType, " \
                    " [DBName] as DBName FROM [dbo].[" + lkpTableName + "] WHERE ruleFlag = " + ProcStat + " " \
                    " and appId = " + str(AppId) + " ORDER BY ruleId "

            df = p.read_sql(query, db_con_azure)

            # Closing the connection
            db_con_azure.close()

            return df
        except Exception as e:
            x = str(e)
            print(x)
            df = p.DataFrame()

            return df

    def azure_sqldb_read(self, sql):
        try:
            # Assigning all the required values
            server = self.server
            database = self.database1
            username = self.username
            password = self.password
            driver = self.driver
            env = self.env
            encrypt_flg = self.encrypt_flg
            trust_flg = self.trust_flg
            timout_limit = self.timeout_limit
            lkpTableName = self.lkpTableName
            ProcStat = self.ProcStat
            AppId = self.AppId

            # Creating secure connection
            str_conn = 'Driver=' + driver + ';Server=tcp:' + server + ',1433;' \
                       'Database=' + database + ';Uid=' + username + '@' + env + ';' \
                       'Pwd=' + password + ';Encrypt=' + encrypt_flg + ';' \
                       'TrustServerCertificate=' + trust_flg + ';Connection Timeout=' + timout_limit + ';'

            # print("Connection Details:: ", str_conn)
            db_con_azure = py.connect(str_conn)

            query = sql

            df = p.read_sql(query, db_con_azure)

            # Closing the connection
            db_con_azure.close()

            return df
        except Exception as e:
            x = str(e)
            print(x)
            df = p.DataFrame()

            return df

Major lines to discuss –

azure_sqldb_read(self, sql):

This method receives the source SQL supplied from the configuration script.

db_con_azure = py.connect(str_conn)

query = sql

df = p.read_sql(query, db_con_azure)

After creating a successful connection, our application will run the SQL, fetch the data, store it in a pandas dataframe & return the output to the primary calling function.
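
The LookUpData method builds its filter by concatenating strings. A common alternative is to pass the values as query parameters through the params argument of pandas read_sql. The sketch below only illustrates the idea. It uses sqlite3 as a stand-in for pyodbc so that it runs without an Azure SQL DB (both drivers accept the "?" placeholder), and the table rows are made-up sample values.

```python
import sqlite3
import pandas as pd

# sqlite3 stands in for pyodbc here; the rows are made-up sample data
con = sqlite3.connect(':memory:')
con.execute("CREATE TABLE onboardCustomer (subscriberId TEXT, state TEXT, country TEXT)")
con.executemany("INSERT INTO onboardCustomer VALUES (?, ?, ?)",
                [('S001', 'NY', 'USA'), ('S002', 'WB', 'India')])

# The value is supplied separately instead of being glued into the SQL text
sql = "SELECT subscriberId, state FROM onboardCustomer WHERE country = ?"
df = pd.read_sql(sql, con, params=('USA',))

con.close()
print(df)
```

With this approach, the driver handles quoting of the supplied values, so the query text itself stays constant.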

3. callCosmosAPI.py (This is the main script, which will call all the methods to blend the data. Hence, the name comes into the picture.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 25-May-2019              ####
#### Modified On 02-Jun-2019              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

import clsColMgmt as cm
import clsCosmosDBDet as cmdb
from clsConfig import clsConfig as cf
import pandas as p
import clsLog as cl
import logging
import datetime
import json
import clsDBLookup as dbcon

# Disabling Warnings
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

def getDate(row):
    try:
        d1 = row['orderDate']
        d1_str = str(d1)
        d1_dt_part, sec = d1_str.split('.')
        dt_part1 = d1_dt_part.replace('T', ' ')

        return dt_part1
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = ''

        return dt_part1

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        df_ret = p.DataFrame()
        df_ret_2 = p.DataFrame()
        df_ret_2_Mod = p.DataFrame()

        debug_ind = 'Y'

        # Initiating Log Class
        l = cl.clsLog()

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidated.log', level=logging.INFO)

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']

        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)

        print("*" * 157)
        print("Testing COSMOS DB Connection!")
        print("*" * 157)

        # Checking Cosmos DB Azure
        y = cmdb.clsCosmosDBDet()
        ret_val = y.test_db_con()

        if ret_val == 0:
            print()
            print("Cosmos DB Connection Successful!")
            print("*" * 157)
        else:
            print()
            print("Cosmos DB Connection Failure!")
            print("*" * 157)
            raise Exception

        print("*" * 157)

        # Accessing from Azure SQL DB
        x1 = dbcon.clsDBLookup()
        act_df = x1.azure_sqldb_read(cf.config['AZURE_SQL_1'])

        print("Azure SQL DB::")
        print(act_df)
        print()

        print("-" * 157)

        # Calling the function 1
        print("RealtimeEmail::")

        # Fetching First collection data to dataframe
        print("Fetching Cosmos Collection Data!")

        sql_qry_1 = cf.config['SQL_QRY_1']
        msg = "Documents generated based on unique key"
        collection_flg = 1

        x = cm.clsColMgmt()
        df_ret = x.fetch_data(sql_qry_1, msg, collection_flg)

        l.logr('1.EmailFeedback_' + var + '.csv', debug_ind, df_ret, 'log')
        print('RealtimeEmail Data::')
        print(df_ret)
        print()

        # Checking execution status
        ret_val = int(df_ret.shape[0])

        if ret_val == 0:
            print("Cosmos DB hasn't returned any rows. Please check your queries!")
            print("*" * 157)
        else:
            print("Successfully fetched!")
            print("*" * 157)

        # Calling the 2nd Collection
        print("RealtimeTwitterFeedback::")

        # Fetching Second collection data to dataframe
        print("Fetching Cosmos Collection Data!")

        # Query using parameters
        sql_qry_2 = cf.config['SQL_QRY_2']
        msg_2 = "Documents generated based on RealtimeTwitterFeedback feed!"
        collection_flg = 2

        val = 'crazyGo'
        param_det = [{"name": "@CrVal", "value": val}]
        add_param = 2

        x1 = cm.clsColMgmt()
        df_ret_2 = x1.fetch_data(sql_qry_2, msg_2, collection_flg, add_param, param_det)

        l.logr('2.TwitterFeedback_' + var + '.csv', debug_ind, df_ret_2, 'log')
        print('Realtime Twitter Data:: ')
        print(df_ret_2)
        print()

        # Checking execution status
        ret_val_2 = int(df_ret_2.shape[0])

        if ret_val_2 == 0:
            print("Cosmos DB hasn't returned any rows. Please check your queries!")
            print("*" * 157)
        else:
            print("Successfully fetched!")
            print("*" * 157)

        # Merging NoSQL Data (Cosmos DB) with Relational DB (Azure SQL DB)
        df_Fin_temp = p.merge(df_ret, act_df, on='subscriberId', how='inner')

        df_fin = df_Fin_temp[['orderDate', 'orderNo', 'sender', 'state', 'country', 'customerType']]

        print("Initial Combined Data (From Cosmos & Azure SQL DB) :: ")
        print(df_fin)

        l.logr('3.InitCombine_' + var + '.csv', debug_ind, df_fin, 'log')

        # Transforming the orderDate as per standard format
        df_fin['orderDateM'] = df_fin.apply(lambda row: getDate(row), axis=1)

        # Dropping the old column & renaming the new column to old column
        df_fin.drop(columns=['orderDate'], inplace=True)
        df_fin.rename(columns={'orderDateM': 'orderDate'}, inplace=True)

        print("*" * 157)
        print()
        print("Final Combined & Transformed result:: ")
        print(df_fin)

        l.logr('4.Final_Combine_' + var + '.csv', debug_ind, df_fin, 'log')
        print("*" * 157)

    except ValueError:
        print("No relevant data to proceed!")

    except Exception as e:
        # Python 3 exceptions have no .message attribute, hence str(e)
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

The key lines from this script –

def getDate(row):
    try:
        d1 = row['orderDate']
        d1_str = str(d1)
        d1_dt_part, sec = d1_str.split('.')
        dt_part1 = d1_dt_part.replace('T', ' ')

        return dt_part1
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = ''

        return dt_part1

This function converts the NoSQL date value into a more familiar format.

NoSQL Date:

NoSQL_Date

Transformed Date:

Transformed Date

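
For reference, the same transformation can also be sketched with the vectorized string methods of pandas instead of a row-wise apply. This is an alternative, not the approach used in the script, and the sample timestamps below are made up. One behavioural difference to keep in mind: getDate returns an empty string when the value has no fractional part, whereas this version keeps such a value as it is.

```python
import pandas as pd

# Made-up sample values in an ISO-like format with fractional seconds
df = pd.DataFrame({'orderDate': ['2019-05-25T10:15:30.1234567Z',
                                 '2019-06-02T08:00:00.0000000Z']})

# Keep the part before the fractional seconds, then swap 'T' for a space
df['orderDate'] = (df['orderDate'].astype(str)
                   .str.split('.').str[0]
                   .str.replace('T', ' ', regex=False))

print(df['orderDate'].tolist())
```
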
# Accessing from Azure SQL DB
x1 = dbcon.clsDBLookup()
act_df = x1.azure_sqldb_read(cf.config['AZURE_SQL_1'])

print("Azure SQL DB::")
print(act_df)
print()

The above lines call the Azure SQL DB method to retrieve the RDBMS data into our dataframe.

# Merging NoSQL Data (Cosmos DB) with Relational DB (Azure SQL DB)
df_Fin_temp = p.merge(df_ret, act_df, on='subscriberId', how='inner')

df_fin = df_Fin_temp[['orderDate', 'orderNo', 'sender', 'state', 'country', 'customerType']]

In the above lines, we’re joining the data retrieved from two different kinds of databases to prepare our initial combined dataframe. Also, we’ve picked only the desired columns, which will be useful for us.
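
To make the effect of the inner join concrete, here is a small self-contained sketch with made-up rows. Only the subscriberId values that exist on both sides survive the merge. The dataframe names cosmos_df & sql_df are placeholders for df_ret & act_df.

```python
import pandas as pd

# Made-up rows standing in for the Cosmos DB & the Azure SQL DB results
cosmos_df = pd.DataFrame({'subscriberId': ['S001', 'S002', 'S003'],
                          'orderNo': ['O-1', 'O-2', 'O-3']})
sql_df = pd.DataFrame({'subscriberId': ['S001', 'S003', 'S004'],
                       'state': ['NY', 'CA', 'TX']})

# Inner join: S002 & S004 have no partner on the other side, so they drop out
combined = pd.merge(cosmos_df, sql_df, on='subscriberId', how='inner')

# .copy() gives an independent dataframe that is safe to modify later
result = combined[['orderNo', 'state']].copy()
print(result)
```
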

# Transforming the orderDate as per standard format
df_fin['orderDateM'] = df_fin.apply(lambda row: getDate(row), axis=1)

# Dropping the old column & renaming the new column to old column
df_fin.drop(columns=['orderDate'], inplace=True)
df_fin.rename(columns={'orderDateM': 'orderDate'}, inplace=True)

In the above lines, we’re transforming our date field by calling the getDate method, as shown in one of the previous images.

Let’s see the directory structure of our program –

Win_Vs_MAC

Let’s see how it looks when it runs –

Windows:

Win_Run_1
Win_Run_2

MAC:

MAC_Run_1
MAC_Run_2

So, finally, we’ve successfully blended the data & made a more meaningful data projection.

The following Python packages are required to run this application –

pip install azure

pip install azure-cosmos

pip install pandas

pip install requests

pip install pyodbc

This application has been tested on Python 3.7.1 & Python 3.7.2. As per Microsoft, their officially supported version is Python 3.5.

I hope you’ll like this effort.

Wait for the next installment. Till then, Happy Avenging. 😀

[Note: All the sample data are available/prepared in the public domain for research & study.]

The advanced concept of Pandas & Numpy with an aggregate & lookup of file logging (A crossover of Space Stone & Soul Stone from the Python verse)

Today, we’ll be implementing advanced concepts of Pandas & Numpy & see how one can aggregate data & produce meaningful insights for your business, which can make an impact on your overall profit.

First, let us understand the complexity of the problem & what we’re looking to achieve here. For that, you need to view the source data & the lookup data & decide how you want to process them.

Source Data:

sourcedata-e1554702920904-1

The above picture is a sample data-set from a bank (data available on a U.S. public forum), which captures the customers’ current account balances. Let’s look at the sample data of the look-up files –

First File:

LookUp_1_Actual

Second File:

LookUp_2

So, one can clearly see that the bank is trying to get a number of stories based on the existing data.

Challenges:

The first lookup file stores its data in such a way that the columns of our source file appear as rows. Hence, you need to somehow reshape the source data to match the lookup file to get the other relevant information & then join that with the second lookup file to bring in all the data points for your storyline.
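
One way to turn columns into rows in pandas is melt. The sketch below only illustrates the reshaping idea on made-up data. The column names CustomerId, ColumnName & ColumnValue are assumptions for this example, and the actual scripts may reshape the data differently.

```python
import pandas as pd

# Made-up source rows; each attribute is a separate column
src = pd.DataFrame({'CustomerId': [1, 2],
                    'Gender': ['Male', 'Female'],
                    'Job': ['Engineer', 'Doctor']})

# After melt, every (customer, attribute) pair becomes its own row,
# which is the shape a row-oriented lookup file can be joined against
long_df = src.melt(id_vars=['CustomerId'],
                   var_name='ColumnName',
                   value_name='ColumnValue')
print(long_df)
```
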

Look-Up Configuration:

In order to match the look-up data with our source data, we’ll be adding two new columns, which will help the application pick the correct row out of the entries provided in look-up file 1.

LookUp_1

As you can see from the above picture, two new columns, i.e. Category & Stat, have been added in this context. Here, Category contains metadata information. If a column has a significant number of unique values, then we’re marking it as ‘D’ in the category. In this case, the bank doesn’t offer any scheme based on the customer’s name. Hence, these fields are marked with ‘I’. For the Gender column, there are only a few unique values, i.e. either ‘Male’ or ‘Female’. As a result, we provided two corresponding entries. Remember, DateJoined is a key column here. Even though we marked its category as ‘I’, which denotes that no transformation is required, ‘K’ will denote that it is the driving column, apart from the surrogate key [PKEY] that we’ll be generating during our application transformation process. I’ll discuss that in the respective snippet discussion.

Our Goal:

Based on the source data, we need to find the following stories & publish them in an Excel sheet separately.

  1. Country & Gender-wise Bank’s contribution.
  2. Country & Job-wise Bank’s contribution.
  3. Country & Age range-wise Saving trends & Bank’s contribution.

A little note on Bank’s Contribution:

Let us explain what exactly is meant by Bank’s contribution. Sometimes, a bank wants to encourage an individual client to save, based on all the available factors. So, let’s assume that the bank contributes $1 for every $150 a person saves. Again, this $1 may vary based on the age range & gender to promote a specific group. Also, when someone opens a savings account, the bank by default contributes a sum of $100 at account opening, for a short period of time, as part of its promotion strategy. You will get these details from the first lookup file. The second lookup file contains the age range category based on the Group that is available in the first lookup file.
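
The arithmetic can be sketched as follows. This is only an illustration under my own assumptions: that the bank counts full blocks of $150 only & that the $100 opening contribution always applies. The function name bank_contribution & its parameters are hypothetical.

```python
def bank_contribution(savings, rate_per_150=1.0, opening_bonus=100.0):
    """Estimate the contribution: a fixed amount per full $150 saved plus the opening bonus."""
    # Assumption: only complete $150 blocks earn a contribution
    full_blocks = int(savings // 150)
    return full_blocks * rate_per_150 + opening_bonus

# $1,500 saved -> 10 full blocks * $1 + $100 opening contribution
print(bank_contribution(1500))

# A promoted group might get a higher rate, e.g. $2 per block
print(bank_contribution(300, rate_per_150=2.0))
```
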

Python Scripts:

In this installment, we’ll be reusing the following Python scripts, which were already discussed in my earlier post –

  • clsFindFile.py
  • clsL.py

So, I’m not going to discuss these scripts. 

1. clsParam.py (This script holds the configuration parameters of the application, i.e. the directory paths, the look-up file names & the debug indicator.)

###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 04-Apr-2019       ########
###########################################

import os
import platform as pl

class clsParam(object):
    os_det = pl.system()
    dir_sep = ''

    if os_det == "Windows":
        dir_sep = "\\"
    else:
        dir_sep = '/'

    config = {
        'MAX_RETRY' : 5,
        'PATH' : os.path.dirname(os.path.realpath(__file__)) + dir_sep,
        'SRC_DIR' : os.path.dirname(os.path.realpath(__file__)) + dir_sep + 'src_files' + dir_sep,
        'FIN_DIR': os.path.dirname(os.path.realpath(__file__)) + dir_sep + 'finished' + dir_sep,
        'LKP_DIR': os.path.dirname(os.path.realpath(__file__)) + dir_sep + 'lkp_files' + dir_sep,
        'LOG_DIR': os.path.dirname(os.path.realpath(__file__)) + dir_sep + 'log' + dir_sep,
        'LKP_FILE': 'DataLookUp',
        'LKP_CATG_FILE': 'CategoryLookUp',
        'LKP_FILE_DIR_NM': 'lkp_files',
        'SRC_FILE_DIR_NM': 'src_files',
        'FIN_FILE_DIR_NM': 'finished',
        'LOG_FILE_DIR_NM': 'log',
        'DEBUG_IND': 'Y'
    }
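
As a side note, the same directory entries can be built without checking the operating system, since os.path.join already picks the right separator. The following is a minimal sketch. The helper build_dirs is hypothetical & is not part of clsParam.

```python
import os

def build_dirs(base_dir):
    """Build the directory entries with os.path.join instead of a manual separator."""
    names = {'SRC_DIR': 'src_files', 'FIN_DIR': 'finished',
             'LKP_DIR': 'lkp_files', 'LOG_DIR': 'log'}
    # Joining with a trailing '' keeps the trailing separator used in the config above
    return {key: os.path.join(base_dir, name, '') for key, name in names.items()}

# The current working directory is used here only to make the sketch runnable
dirs = build_dirs(os.getcwd())
print(dirs['SRC_DIR'])
```
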

 

2. clsLookUpDataRead.py (This script reads the look-up files & generates the combined look-up result, as we have two different look-up files. Hence, the name comes into the picture.)

###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 04-Apr-2019       ########
###########################################

import pandas as p
import clsFindFile as c
import clsL as log
from clsParam import clsParam as cf
import datetime

# Disabling Warnings
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn

class clsLookUpDataRead(object):

    def __init__(self, lkpFilename):
        self.lkpFilename = lkpFilename

        self.lkpCatgFilename = cf.config['LKP_CATG_FILE']
        self.path = cf.config['PATH']
        self.subdir = str(cf.config['LOG_FILE_DIR_NM'])

        # To disable logging info
        self.Ind = cf.config['DEBUG_IND']
        self.var = datetime.datetime.now().strftime(".%H.%M.%S")

    def getNaN2Null(self, row):
        try:
            str_val = ''
            str_val = str(row['Group']).replace('nan', '').replace('NaN','')

            return str_val
        except:
            str_val = ''

            return str_val

    def ReadTable(self):
        # Assigning Logging Info
        lkpF = []
        lkpF_2 = []
        var = self.var
        Ind = self.Ind
        subdir = self.subdir

        # Initiating Logging Instances
        clog = log.clsL()

        try:

            # Assigning Lookup file name
            lkpFilename = self.lkpFilename

            # Fetching the actual look-up file name
            f = c.clsFindFile(lkpFilename, str(cf.config['LKP_FILE_DIR_NM']))
            lkp_file_list = list(f.find_file())

            # Ideally look-up will be only one file
            # Later it will be converted to table
            for i in range(len(lkp_file_list)):
                lkpF = lkp_file_list[i]

            # Fetching the content of the look-up file
            df_lkpF = p.read_csv(lkpF, index_col=False)

            # Fetching Category LookUp File
            LkpCatgFileName = self.lkpCatgFilename

            f1 = c.clsFindFile(LkpCatgFileName, str(cf.config['LKP_FILE_DIR_NM']))
            lkp_file_list_2 = list(f1.find_file())

            # Ideally look-up will be only one file
            # Later it will be converted to table
            for j in range(len(lkp_file_list_2)):
                lkpF_2 = lkp_file_list_2[j]

            # Fetching the content of the look-up file
            df_lkpF_2 = p.read_csv(lkpF_2, index_col=False)

            # Changing both the column data type as same type
            df_lkpF['Group_1'] = df_lkpF['Group'].astype(str)
            df_lkpF_2['Group_1'] = df_lkpF_2['Group'].astype(str)

            # Dropping the old column
            df_lkpF.drop(['Group'], axis=1, inplace=True)
            df_lkpF_2.drop(['Group'], axis=1, inplace=True)

            # Renaming the changed data type column with the old column name
            df_lkpF.rename(columns={'Group_1':'Group'}, inplace=True)
            df_lkpF_2.rename(columns={'Group_1': 'Group'}, inplace=True)

            # Merging two lookup dataframes to form Final Consolidated Dataframe
            df_Lkp_Merge = p.merge(
                                    df_lkpF[['TableName', 'ColumnOrder', 'ColumnName', 'MappedColumnName',
                                             'Category', 'Stat', 'Group', 'BankContribution']],
                                    df_lkpF_2[['StartAgeRange', 'EndAgeRange', 'Group']],
                                    on=['Group'], how='left')

            # Converting NaN to Nul or empty string
            df_Lkp_Merge['GroupNew'] = df_Lkp_Merge.apply(lambda row: self.getNaN2Null(row), axis=1)

            # Dropping the old column & renaming the new column
            df_Lkp_Merge.drop(['Group'], axis=1, inplace=True)
            df_Lkp_Merge.rename(columns={'GroupNew': 'Group'}, inplace=True)

            clog.logr('1.df_Lkp_Merge' + var + '.csv', Ind, df_Lkp_Merge, subdir)

            return df_Lkp_Merge

        except(FileNotFoundError, IOError) as s:
            y = str(s)
            print(y)

            # Declaring Empty Dataframe
            df_error = p.DataFrame()

            return df_error
        except Exception as e:
            x = str(e)
            print(x)

            # Declaring Empty Dataframe
            df_error = p.DataFrame()

            return df_error

 

Key lines from this script –

# Fetching the actual look-up file name
f = c.clsFindFile(lkpFilename, str(cf.config['LKP_FILE_DIR_NM']))
lkp_file_list = list(f.find_file())

# Ideally look-up will be only one file
# Later it will be converted to table
for i in range(len(lkp_file_list)):
    lkpF = lkp_file_list[i]

# Fetching the content of the look-up file
df_lkpF = p.read_csv(lkpF, index_col=False)

Here, the application will try to find the lookup file based on the file name pattern & directory path & then load the data into the dataframe.
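
Since clsFindFile is not discussed in this post, here is a rough sketch of how a pattern-based file search could look using glob. It is only an assumption about the idea & not the actual clsFindFile code. The helper find_files & the file names are made up.

```python
import glob
import os
import tempfile

def find_files(name_pattern, directory):
    """Return the sorted paths of the CSV files in directory whose names start with name_pattern."""
    return sorted(glob.glob(os.path.join(directory, name_pattern + '*.csv')))

# Demo inside a throwaway directory with two made-up lookup files
with tempfile.TemporaryDirectory() as tmp:
    open(os.path.join(tmp, 'DataLookUp_20190404.csv'), 'w').close()
    open(os.path.join(tmp, 'CategoryLookUp_20190404.csv'), 'w').close()
    matches = [os.path.basename(f) for f in find_files('DataLookUp', tmp)]

print(matches)
```

Note that the loop in the snippet above keeps only the last matching file, which is equivalent to taking lkp_file_list[-1] when the list is not empty.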

# Fetching Category LookUp File
LkpCatgFileName = self.lkpCatgFilename

f1 = c.clsFindFile(LkpCatgFileName, str(cf.config['LKP_FILE_DIR_NM']))
lkp_file_list_2 = list(f1.find_file())

# Ideally look-up will be only one file
# Later it will be converted to table
for j in range(len(lkp_file_list_2)):
    lkpF_2 = lkp_file_list_2[j]

# Fetching the content of the look-up file
df_lkpF_2 = p.read_csv(lkpF_2, index_col=False)

In this step, the second lookup file will be loaded into the second dataframe.

# Changing both the column data type as same type
df_lkpF['Group_1'] = df_lkpF['Group'].astype(str)
df_lkpF_2['Group_1'] = df_lkpF_2['Group'].astype(str)

# Dropping the old column
df_lkpF.drop(['Group'], axis=1, inplace=True)
df_lkpF_2.drop(['Group'], axis=1, inplace=True)

# Renaming the changed data type column with the old column name
df_lkpF.rename(columns={'Group_1':'Group'}, inplace=True)
df_lkpF_2.rename(columns={'Group_1': 'Group'}, inplace=True)

It is always better to cast the columns that will be used as part of the joining key to the same datatype. The above snippet does exactly that.
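
Here is a tiny sketch of why this matters, using made-up data. One side holds the key as an integer & the other as text, so both are cast to strings before the merge. Keep in mind that a float key such as 1.0 becomes '1.0' & would not match '1', so the cast should produce identical text on both sides.

```python
import pandas as pd

# Made-up lookups: the key is an integer on one side & text on the other
left = pd.DataFrame({'Group': [1, 2], 'BankContribution': [1.0, 1.5]})
right = pd.DataFrame({'Group': ['1', '2'], 'StartAgeRange': [18, 31]})

# Casting both keys to the same type so that the values can match
left['Group'] = left['Group'].astype(str)
right['Group'] = right['Group'].astype(str)

merged = pd.merge(left, right, on='Group', how='left')
print(merged)
```
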

# Merging two lookup dataframes to form Final Consolidated Dataframe
df_Lkp_Merge = p.merge(
    df_lkpF[['TableName', 'ColumnOrder', 'ColumnName', 'MappedColumnName',
             'Category', 'Stat', 'Group', 'BankContribution']],
    df_lkpF_2[['StartAgeRange', 'EndAgeRange', 'Group']],
    on=['Group'], how='left')

In this step, the first lookup file will be left-joined with the second lookup file based on the Group column.

# Converting NaN to Nul or empty string
df_Lkp_Merge['GroupNew'] = df_Lkp_Merge.apply(lambda row: self.getNaN2Null(row), axis=1)

# Dropping the old column & renaming the new column
df_Lkp_Merge.drop(['Group'], axis=1, inplace=True)
df_Lkp_Merge.rename(columns={'GroupNew': 'Group'}, inplace=True)

Once the merge is done, the ‘NaN’ values in the key column need to be converted to empty strings for better data processing.
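
One thing to watch with a plain substring replace is that it also touches values that merely contain the letters ‘nan’. A possible alternative, sketched below on made-up values, is to replace only the cells that are exactly ‘nan’ or ‘NaN’. This is a suggestion & not what getNaN2Null does.

```python
import pandas as pd

# Made-up values: a real group, a stringified NaN & a value containing 'nan'
df = pd.DataFrame({'Group': ['G1', 'nan', 'Finance']})

# A substring replace would damage 'Finance'
print('Finance'.replace('nan', ''))

# Replacing whole cell values only leaves 'Finance' untouched
df['Group'] = df['Group'].replace({'nan': '', 'NaN': ''})
print(df['Group'].tolist())
```
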

3. clsPivotLookUp.py (This script contains the main logic to process & merge the data between the source & lookup files, create the grouped data & then produce the data points & capture them in Excel. Hence, the name comes into the picture.)

###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 04-Apr-2019       ########
###########################################

import pandas as p
import numpy as np
import clsFindFile as c
import clsL as log
import datetime
from clsParam import clsParam as cf
from pandas import ExcelWriter

# Disabling Warnings
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn

class clsPivotLookUp(object):

    def __init__(self, srcFilename, tgtFileName, df_lkpF):
        self.srcFilename = srcFilename
        self.tgtFileName = tgtFileName
        self.df_lkpF = df_lkpF
        self.lkpCatgFilename = cf.config['LKP_CATG_FILE']

        self.path = cf.config['PATH']
        self.subdir = str(cf.config['LOG_FILE_DIR_NM'])
        self.subdir_2 = str(cf.config['FIN_FILE_DIR_NM'])
        # To disable logging info
        self.Ind = cf.config['DEBUG_IND']
        self.report_path = cf.config['FIN_DIR']

    def dfs_tabs(self, df_list, sheet_list, file_name):
        try:
            cnt = 0
            number_rows = 0

            writer = p.ExcelWriter(file_name, engine='xlsxwriter')

            for dataframe, sheet in zip(df_list, sheet_list):
                number_rows = int(dataframe.shape[0])
                number_cols = int(dataframe.shape[1])

                if cnt == 0:
                    dataframe.to_excel(writer, sheet_name=sheet, startrow=7, startcol=5)
                else:
                    dataframe.to_excel(writer, sheet_name=sheet, startrow=5, startcol=0)

                # Get the xlsxwriter workbook & worksheet objects
                workbook = writer.book
                worksheet = writer.sheets[sheet]
                worksheet.set_zoom(90)

                if cnt == 0:
                    worksheet.set_column('A:E', 4)
                    worksheet.set_column('F:F', 20)
                    worksheet.set_column('G:G', 10)
                    worksheet.set_column('H:J', 20)

                    # Insert an Image
                    worksheet.insert_image('E1', 'Logo.png', {'x_scale':0.6, 'y_scale':0.8})

                    # Add a number format for cells with money.
                    money_fmt = workbook.add_format({'num_format': '$#,##0', 'border': 1})
                    worksheet.set_column('H:H', 20, money_fmt)

                    # Define our range for color formatting
                    color_range = "F9:F{}".format(number_rows * 2 + 1)

                    # Add a format. Red fill with the dark red text
                    red_format = workbook.add_format({'bg_color':'#FEC7CE', 'font_color':'#0E0E08', 'border':1})

                    # Add a format. Green fill with the dark green text
                    green_format = workbook.add_format({'bg_color': '#D0FCA4', 'font_color': '#0E0E08', 'border': 1})

                    # Add a format. Cyan fill with the dark green text
                    mid_format = workbook.add_format({'bg_color': '#6FC2D8', 'font_color': '#0E0E08', 'border': 1})

                    # Add a format. Other fill with the dark green text
                    oth_format = workbook.add_format({'bg_color': '#AFC2D8', 'font_color': '#0E0E08', 'border': 1})

                    worksheet.conditional_format(color_range, {'type':'cell',
                                                               'criteria':'equal to',
                                                               'value':'"England"',
                                                               'format': green_format})

                    worksheet.conditional_format(color_range, {'type': 'cell',
                                                               'criteria': 'equal to',
                                                               'value': '"Northern Ireland"',
                                                               'format': mid_format})

                    worksheet.conditional_format(color_range, {'type': 'cell',
                                                               'criteria': 'equal to',
                                                               'value': '"Scotland"',
                                                               'format': red_format})

                    worksheet.conditional_format(color_range, {'type': 'cell',
                                                               'criteria': 'equal to',
                                                               'value': '"Wales"',
                                                               'format': oth_format})
                else:
                    first_row = 5
                    first_col = 0
                    last_row = first_row + (number_rows * 2)
                    last_col = number_cols - 1

                    if cnt == 1:
                        worksheet.set_column('A:D', 20)
                    else:
                        worksheet.set_column('A:E', 20)
                        worksheet.set_column('F:F', 20)


                    # Add a number format for cells with money.
                    # money_fmt = workbook.add_format({'num_format': '$#,##0', 'bold': True, 'border':1})
                    money_fmt = workbook.add_format({'num_format': '$#,##0', 'border': 1})

                    # Amount columns
                    if cnt == 1:
                        worksheet.set_row(6, 0, money_fmt)
                        worksheet.set_column('C:C', 20, money_fmt)
                    else:
                        worksheet.set_row(6, 0, money_fmt)
                        worksheet.set_column('D:F', 20, money_fmt)

                    # Insert an Image
                    worksheet.insert_image('B1', 'Logo.png', {'x_scale': 0.5, 'y_scale': 0.5})

                    # Add a format. Red fill with the dark red text
                    red_format = workbook.add_format({'bg_color': '#FEC7CE', 'font_color': '#0E0E08'})

                    # Add a format. Green fill with the dark green text
                    green_format = workbook.add_format({'bg_color': '#D0FCA4', 'font_color': '#0E0E08'})

                    # Add a format. Cyan fill with the dark green text
                    mid_format = workbook.add_format({'bg_color': '#6FC2D8', 'font_color': '#0E0E08'})

                    # Add a format. Other fill with the dark green text
                    oth_format = workbook.add_format({'bg_color': '#AFC2D8', 'font_color': '#0E0E08'})

                    # Fill colour based on formula
                    worksheet.conditional_format(first_row,
                                                 first_col,
                                                 last_row,
                                                 last_col,
                                                 {'type': 'formula',
                                                  'criteria': '=INDIRECT("A"&ROW())="England"',
                                                  'format': green_format})

                    worksheet.conditional_format(first_row,
                                                 first_col,
                                                 last_row,
                                                 last_col,
                                                 {'type': 'formula',
                                                  'criteria': '=INDIRECT("A"&ROW())="Northern Ireland"',
                                                  'format': mid_format})

                    worksheet.conditional_format(first_row,
                                                 first_col,
                                                 last_row,
                                                 last_col,
                                                 {'type': 'formula',
                                                  'criteria': '=INDIRECT("A"&ROW())="Scotland"',
                                                  'format': red_format})

                    worksheet.conditional_format(first_row,
                                                 first_col,
                                                 last_row,
                                                 last_col,
                                                 {'type': 'formula',
                                                  'criteria': '=INDIRECT("A"&ROW())="Wales"',
                                                  'format': oth_format})

                cnt += 1

            # close() also saves the workbook; writer.save() was removed in pandas 2.0
            writer.close()

            return 0
        except Exception as e:
            x = str(e)
            print(x)

            return 1

    def getIntVal(self, row):
        try:
            int_val = 0
            int_val = int(row['MCategory'])

            return int_val
        except:
            int_val = 0

            return int_val

    def getSavingsAmount(self, row):
        try:
            savings = 0.0
            savings = float(row['Balance']) - float(row['BankContribution'])

            return savings
        except:
            savings = 0

            return savings

    def getNaN2Zero_StartAgeRange(self, row):
        try:
            int_AgeRange = 0
            str_StartAgeRange = ''

            str_StartAgeRange = str(row['StartAgeRange']).replace('nan','').replace('NaN','')

            if (len(str_StartAgeRange) > 0):
                int_AgeRange = int(float(str_StartAgeRange))
            else:
                int_AgeRange = 0

            return int_AgeRange
        except:
            int_AgeRange = 0

            return int_AgeRange

    def getNaN2Zero_EndAgeRange(self, row):
        try:
            int_AgeRange = 0
            str_EndAgeRange = ''

            str_EndAgeRange = str(row['EndAgeRange']).replace('nan','').replace('NaN','')

            if (len(str_EndAgeRange) > 0):
                int_AgeRange = int(float(str_EndAgeRange))
            else:
                int_AgeRange = 0

            return int_AgeRange
        except:
            int_AgeRange = 0

            return int_AgeRange


    def parse_and_write_csv(self):

        # Assigning Logging Info
        Ind = self.Ind
        subdir = self.subdir
        subdir_2 = self.subdir_2
        lkpF = []
        lkpF_2 = []
        report_path = self.report_path

        #Initiating Logging Instances
        clog = log.clsL()

        if Ind == 'Y':
            print('Logging Enabled....')
        else:
            print('Logging Not Enabled....')

        # Assigning Source File Basic Name
        srcFileInit = self.srcFilename
        tgtFileName = self.tgtFileName
        df_lkpF = self.df_lkpF

        try:

            # Fetching the actual source file name
            d = c.clsFindFile(self.srcFilename, str(cf.config['SRC_FILE_DIR_NM']))
            src_file_list = d.find_file()

            # Ideally look-up will be only one file
            # Later it will be converted to table
            for i in range(len(src_file_list)):

                # Handling Multiple source files
                var = datetime.datetime.now().strftime(".%H.%M.%S")
                print('Target File Extension will contain the following:: ', var)

                srcF = src_file_list[i]

                # Reading Source File
                df = p.read_csv(srcF, index_col=False)

                # Adding a new surrogate key to the existing records
                df = df.assign(PKEY=[1 + i for i in range(len(df))])[['PKEY'] + df.columns.tolist()]

                clog.logr('2.DF_Assign' + var + '.csv', Ind, df, subdir)

                # Fetching only relevant rows from the Look-up Files
                # based on Filters with 'I' or No Token
                # 'K' for Key columns with No Token
                # 'D' for Single column Token
                df_lkpFile = df_lkpF[(df_lkpF['TableName'] == srcFileInit) &
                                     ((df_lkpF['Category'] == 'I') | (df_lkpF['Category'] == 'K'))]

                # Fetching the unique records from Look-up table
                id_list1 = list(df_lkpFile['ColumnName'].drop_duplicates())
                id_list2 = ['PKEY']

                id_list = id_list2 + id_list1

                # Pivoting part of the source file data to be join for merge
                df_melt = df.melt(id_vars=id_list, var_name='ColumnName')

                # Changing the generated column Value to Category for upcoming Merge
                # df_melt = df_tmp_melt.rename_by_col_index(idx_np,'Category')
                # df_melt.rename(columns={'value': 'Category'}, inplace=True)
                df_melt.rename(columns={'value': 'MCategory'}, inplace=True)

                #df_melt.to_csv(path+'1.DF_Melt.csv')
                clog.logr('3.DF_Melt' + var + '.csv', Ind, df_melt, subdir)

                # Now fetching look-up file one more time
                # filtering with the only Table Name
                # For merge with our temporary df_melt
                # to get the relevant lookup
                # information

                df_lkpFinFile = df_lkpF[(df_lkpF['TableName'] == srcFileInit) &
                                        ((df_lkpF['Category'] == 'D') | (df_lkpF['Category'] == 'Male') |
                                        (df_lkpF['Category'] == 'K') | (df_lkpF['Category'] == 'Female'))]

                clog.logr('4.DF_Finlkp' + var + '.csv', Ind, df_lkpFinFile, subdir)

                # Merging two files based on Keys
                # df_fin = df_melt.merge(df_lkpFinFile, on=['ColumnName', 'Category'], how='left')
                df_fin = df_melt.merge(df_lkpFinFile, on=['ColumnName'], how='left')

                clog.logr('5.DF_FIN_Basic_Merge' + var + '.csv', Ind, df_fin, subdir)

                df_fin2 = df_fin[((df_fin['MCategory'] == 'I') & (df_fin['Category'] == df_fin['MCategory'])) |
                                 ((df_fin['MCategory'] == 'Male') & (df_fin['Category'] == df_fin['MCategory'])) |
                                 ((df_fin['MCategory'] == 'Female') & (df_fin['Category'] == df_fin['MCategory'])) |
                                 (df_fin['MCategory'] == 'NaN') |
                                 (df_fin['MCategory'] == 'D') |
                                 (
                                     (df_fin['MCategory'] != 'I') & (df_fin['MCategory'] != 'Male') &
                                     (df_fin['MCategory'] != 'Female') & (df_fin['MCategory'] != 'D') &
                                     (df_fin['MCategory'] != 'NaN')
                                 )]

                clog.logr('6.Merge_After_Filter' + var + '.csv', Ind, df_fin2, subdir)

                # Identifying Integer Column for next step
                df_fin2['Catg'] = df_fin2.apply(lambda row: self.getIntVal(row), axis=1)
                df_fin2['StAge'] = df_fin2.apply(lambda row: self.getNaN2Zero_StartAgeRange(row), axis=1)
                df_fin2['EnAge'] = df_fin2.apply(lambda row: self.getNaN2Zero_EndAgeRange(row), axis=1)

                # Dropping the old Columns
                df_fin2.drop(['Category'], axis=1, inplace=True)
                df_fin2.drop(['StartAgeRange'], axis=1, inplace=True)
                df_fin2.drop(['EndAgeRange'], axis=1, inplace=True)

                # Renaming the new columns
                df_fin2.rename(columns={'Catg': 'Category'}, inplace=True)
                df_fin2.rename(columns={'StAge': 'StartAgeRange'}, inplace=True)
                df_fin2.rename(columns={'EnAge': 'EndAgeRange'}, inplace=True)

                clog.logr('7.Catg' + var + '.csv', Ind, df_fin2, subdir)

                # Handling special cases when Category from source & lookup file won't match
                # alternative way to implement left outer join due to specific data scenarios
                df_fin2['Flag'] = np.where(((df_fin2.StartAgeRange == 0) | (df_fin2.EndAgeRange == 0)) |
                                           (((df_fin2.StartAgeRange > 0) & (df_fin2.EndAgeRange > 0)) &
                                            ((df_fin2.Category >= df_fin2.StartAgeRange)
                                              & (df_fin2.Category <= df_fin2.EndAgeRange))), 'Y', 'N')

                clog.logr('8.After_Special_Filter' + var + '.csv', Ind, df_fin2, subdir)

                # Keeping only the data where Flag is set to Y
                newDF = df_fin2[(df_fin2['Flag'] == 'Y')]

                clog.logr('9.Flag_Filter' + var + '.csv', Ind, newDF, subdir)

                # Need to drop column called ColumnName
                newDF.drop(['TableName'], axis=1, inplace=True)
                newDF.drop(['ColumnOrder'], axis=1, inplace=True)
                newDF.drop(['ColumnName'], axis=1, inplace=True)
                newDF.drop(['Category'], axis=1, inplace=True)
                newDF.drop(['Flag'], axis=1, inplace=True)
                newDF.drop(['Group'], axis=1, inplace=True)

                # Need to rename MappedColumnName to ColumnName
                newDF.rename(columns={'MappedColumnName': 'ColumnName'}, inplace=True)

                clog.logr('10.newDF' + var + '.csv', Ind, newDF, subdir)

                df_short = newDF[['PKEY', 'BankContribution', 'StartAgeRange', 'EndAgeRange']]

                clog.logr('11.df_short' + var + '.csv', Ind, df_short, subdir)

                # Aggregating information
                grouped = df_short.groupby(['PKEY'])
                dfGroup = grouped.aggregate(np.sum)

                clog.logr('12.dfGroup' + var + '.csv', Ind, dfGroup, subdir)

                # Let's merge to get everything in row level
                df_rowlvl = df.merge(dfGroup, on=['PKEY'], how='inner')

                clog.logr('13.Rowlvl_Merge' + var + '.csv', Ind, df_rowlvl, subdir)

                # Dropping PKEY & Unnamed columns from the csv
                df_rowlvl.drop(['PKEY'], axis=1, inplace=True)

                clog.logr('14.Final_DF' + var + '.csv', Ind, df_rowlvl, subdir)

                ##############################################################
                #### Country & Gender wise Bank's Contribution           #####
                ##############################################################
                dfCountryGender = df_rowlvl[['Region', 'Gender', 'BankContribution']]

                grouped_CG = dfCountryGender.groupby(['Region', 'Gender'])
                dCountryGen = grouped_CG.aggregate(np.sum)

                print("-" * 60)
                print("Country & Gender wise Bank's Contribution")
                print("-" * 60)
                print(dCountryGen)

                clog.logr('15.dCountryGen' + var + '.csv', Ind, dCountryGen, subdir)

                ###############################################################
                ###### End Of Country & Gender wise Bank's Contribution  ######
                ###############################################################

                ##############################################################
                #### Country & Job wise Bank's Contribution              #####
                ##############################################################

                dfCountryJob = df_rowlvl[['Region', 'Job Classification', 'BankContribution']]

                grouped_CJ = dfCountryJob.groupby(['Region', 'Job Classification'])
                dCountryJob = grouped_CJ.aggregate(np.sum)

                print("-" * 60)
                print("Country & Job wise Bank's Contribution")
                print("-" * 60)
                print(dCountryJob)

                clog.logr('16.dCountryJob' + var + '.csv', Ind, dCountryJob, subdir)

                ###############################################################
                ###### End Of Country & Job wise Bank's Contribution     ######
                ###############################################################

                ##############################################################
                #### Country & Age wise Savings & Bank's Contribution    #####
                ##############################################################

                dfCountryAge = df_rowlvl[['Region', 'StartAgeRange', 'EndAgeRange', 'Balance', 'BankContribution']]
                dfCountryAge['SavingsAmount'] = dfCountryAge.apply(lambda row: self.getSavingsAmount(row), axis=1)

                grouped_CA = dfCountryAge.groupby(['Region', 'StartAgeRange', 'EndAgeRange'])
                dCountryAge = grouped_CA.aggregate(np.sum)

                print("-" * 60)
                print("Country & Age wise Savings & Bank's Contribution")
                print("-" * 60)
                print(dCountryAge)

                clog.logr('17.dCountryAge' + var + '.csv', Ind, dCountryAge, subdir)

                ##############################################################
                #### End Of Country & Age wise Savings & Bank's          #####
                #### Contribution                                        #####
                ##############################################################

                print('Writing to file!!')

                # Avoiding Index column of dataframe while copying to csv
                # df_token.to_csv(tgtFileName, index=False)
                # For Target File Ind should be always Yes/Y
                Ind = 'Y'

                FtgtFileName = tgtFileName + var + '.csv'
                clog.logr(FtgtFileName, Ind, df_rowlvl, subdir_2)

                ##############################################################
                ##### Writing to Excel File with Different Tabular Sheet #####
                ##############################################################
                dfs = [dCountryGen, dCountryJob, dCountryAge]
                sheets = ['Country-Gender-Stats', 'Country-Job-Stats', 'Country-Age-Stats']

                x = self.dfs_tabs(dfs, sheets, report_path+tgtFileName + var + '.xlsx')

                ##############################################################
                #####             End Of Excel Sheet Writing             #####
                ##############################################################

                # Resetting the Filename after every iteration
                # in case multiple source files exist
                FtgtFileName = ""

            return 0

        except Exception as e:
            x = str(e)
            print(x)
            return 9

 

Key snippets from this script –

# Adding a new surrogate key to the existing records
df = df.assign(PKEY=[1 + i for i in range(len(df))])[['PKEY'] + df.columns.tolist()]

This step is crucial: the application creates its own unique key (PKEY) regardless of what the data files contain, & most of the later processing steps rely on it.
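As a small illustration of the surrogate key idea, here is a minimal sketch on a made-up dataframe. The column names & values are assumptions for demonstration only, & `DataFrame.insert` is used as an alternative to the `assign` call shown above.

```python
import pandas as pd

# Hypothetical source rows; names & values are illustrative only
df = pd.DataFrame({"Name": ["A", "B", "C"], "Balance": [100.0, 250.5, 75.0]})

# Insert a 1-based surrogate key as the first column
df_keyed = df.copy()
df_keyed.insert(0, "PKEY", list(range(1, len(df_keyed) + 1)))

print(df_keyed)
```

Either approach gives every record a key that does not depend on the contents of the file.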

df_lkpFile = df_lkpF[(df_lkpF['TableName'] == srcFileInit) &
                     ((df_lkpF['Category'] == 'I') | (df_lkpF['Category'] == 'K'))]

# Fetching the unique records from Look-up table
id_list1 = list(df_lkpFile['ColumnName'].drop_duplicates())
id_list2 = ['PKEY']

id_list = id_list2 + id_list1

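This step builds the list of identifier columns: PKEY plus the columns marked 'I' or 'K' for this source table in the look-up data. These columns stay fixed, while every other column of the source table will be converted from columns to rows & then joined with our look-up table.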

# Pivoting part of the source file data to be join for merge
df_melt = df.melt(id_vars=id_list, var_name='ColumnName')

As mentioned in the above step, the application converts the remaining non-key columns of our source file into rows, keeping the columns in id_list fixed.
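To make the columns-to-rows conversion concrete, here is a minimal sketch on a made-up two-row dataframe. The column names & values are assumptions; only the `melt` & `rename` calls mirror the script.

```python
import pandas as pd

# Hypothetical source data with the surrogate key already added
df = pd.DataFrame({
    "PKEY": [1, 2],
    "Name": ["Ann", "Bob"],
    "Gender": ["Female", "Male"],
    "Age": [34, 51],
})

# Identifier columns stay fixed; every other column becomes a row
id_list = ["PKEY", "Name"]
df_melt = df.melt(id_vars=id_list, var_name="ColumnName")
df_melt = df_melt.rename(columns={"value": "MCategory"})

print(df_melt)
```

Each non-key column (here Gender & Age) produces one row per record, so two records with two non-key columns give four rows.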

df_lkpFinFile = df_lkpF[(df_lkpF['TableName'] == srcFileInit) &
                        ((df_lkpF['Category'] == 'D') | (df_lkpF['Category'] == 'Male') |
                        (df_lkpF['Category'] == 'K') | (df_lkpF['Category'] == 'Female'))]

In this step, the application picks the look-up rows that match the source file name & whose Category is 'D', 'K', 'Male' or 'Female'. These rows will be used for the look-up join.

df_fin = df_melt.merge(df_lkpFinFile, on=['ColumnName'], how='left')

In this step, the application left-joins the transformed source data with our look-up data on ColumnName.
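The following sketch, on made-up data, shows what a left join on ColumnName alone does: a melted row is repeated once per matching look-up row, & rows without a match are kept with NaN. All values & the BankContribution figures are assumptions for illustration.

```python
import pandas as pd

# Hypothetical melted source rows
df_melt = pd.DataFrame({
    "PKEY": [1, 1, 2],
    "ColumnName": ["Gender", "Age", "Unknown"],
    "MCategory": ["Male", "34", "x"],
})

# Hypothetical look-up rows: two entries for Gender, one for Age
df_lkp = pd.DataFrame({
    "ColumnName": ["Gender", "Gender", "Age"],
    "Category": ["Male", "Female", "D"],
    "BankContribution": [10, 12, 5],
})

df_fin = df_melt.merge(df_lkp, on=["ColumnName"], how="left")

print(df_fin)
```

Because the Gender row now appears once for 'Male' & once for 'Female', a follow-up filter that compares Category with MCategory is needed to discard the non-matching copy.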

df_fin2 = df_fin[((df_fin['MCategory'] == 'I') & (df_fin['Category'] == df_fin['MCategory'])) |
                 ((df_fin['MCategory'] == 'Male') & (df_fin['Category'] == df_fin['MCategory'])) |
                 ((df_fin['MCategory'] == 'Female') & (df_fin['Category'] == df_fin['MCategory'])) |
                 (df_fin['MCategory'] == 'NaN') |
                 (df_fin['MCategory'] == 'D') |
                 (
                     (df_fin['MCategory'] != 'I') & (df_fin['MCategory'] != 'Male') &
                     (df_fin['MCategory'] != 'Female') & (df_fin['MCategory'] != 'D') &
                     (df_fin['MCategory'] != 'NaN')
                 )]

This step brings the data, which will look like –

Imp_Step_1

# Identifying Integer Column for next step
df_fin2['Catg'] = df_fin2.apply(lambda row: self.getIntVal(row), axis=1)
df_fin2['StAge'] = df_fin2.apply(lambda row: self.getNaN2Zero_StartAgeRange(row), axis=1)
df_fin2['EnAge'] = df_fin2.apply(lambda row: self.getNaN2Zero_EndAgeRange(row), axis=1)

# Dropping the old Columns
df_fin2.drop(['Category'], axis=1, inplace=True)
df_fin2.drop(['StartAgeRange'], axis=1, inplace=True)
df_fin2.drop(['EndAgeRange'], axis=1, inplace=True)

# Renaming the new columns
df_fin2.rename(columns={'Catg': 'Category'}, inplace=True)
df_fin2.rename(columns={'StAge': 'StartAgeRange'}, inplace=True)
df_fin2.rename(columns={'EnAge': 'EndAgeRange'}, inplace=True)

Now, the application replaces NaN with 0 in these key columns & converts them to integers, which the important upcoming step depends on.

After this step, the new data looks like –

Imp_Step_2
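For readers who prefer a vectorised form, the NaN-to-zero conversion can also be sketched with `pd.to_numeric`. This is an alternative to the row-wise helper functions in the script, not the approach the script itself uses, & the sample values are assumptions.

```python
import pandas as pd

# Hypothetical column mixing numbers, missing values & text
df = pd.DataFrame({"StartAgeRange": [18.0, None, "nan", "45"]})

# Anything that cannot be read as a number becomes NaN, then 0
clean = (
    pd.to_numeric(df["StartAgeRange"], errors="coerce")
    .fillna(0)
    .astype(int)
)

print(clean.tolist())
```

The result is an integer column in which every missing or unreadable value is 0, which is the shape the age range comparison expects.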

So, now, it will be easier to filter this data by comparing the customer's age against the age range in the next step, as follows –

# Handling special cases when Category from source & lookup file won't match
# alternative way to implement left outer join due to specific data scenarios
df_fin2['Flag'] = np.where(((df_fin2.StartAgeRange == 0) | (df_fin2.EndAgeRange == 0)) |
                           (((df_fin2.StartAgeRange > 0) & (df_fin2.EndAgeRange > 0)) &
                            ((df_fin2.Category >= df_fin2.StartAgeRange)
                              & (df_fin2.Category <= df_fin2.EndAgeRange))), 'Y', 'N')

After this, new data looks like –

Imp_Step_3
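The flag logic can be easier to follow when the two conditions are named. Below is a minimal sketch on made-up rows; the values are assumptions, & the condition is the same as in the snippet above, only split into readable parts.

```python
import numpy as np
import pandas as pd

# Hypothetical rows: Category holds the customer's age (0 when not numeric)
df = pd.DataFrame({
    "Category": [34, 34, 34, 0],
    "StartAgeRange": [0, 30, 40, 18],
    "EndAgeRange": [0, 39, 49, 29],
})

# Rows whose look-up entry carries no age range are always kept
no_range = (df["StartAgeRange"] == 0) | (df["EndAgeRange"] == 0)

# Rows with a range are kept only when the age falls inside it
has_range = (df["StartAgeRange"] > 0) & (df["EndAgeRange"] > 0)
in_range = df["Category"].between(df["StartAgeRange"], df["EndAgeRange"])

df["Flag"] = np.where(no_range | (has_range & in_range), "Y", "N")
kept = df[df["Flag"] == "Y"]

print(df)
```

Only the first two rows are flagged 'Y': one has no age range at all, & the other has an age of 34 inside the 30 to 39 range.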

Finally, the application keeps only the records flagged ‘Y’. The data then looks as follows –

Imp_Step_4

Now, the application needs to consolidate Bank Contribution, Start & End Age Range & needs to re-pivot the data to make it a single row per customer. The data should look like this –

Imp_Step_5
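A minimal sketch of this consolidation on made-up data is shown below. The values are assumptions; the idea is that rows without an age range carry 0, so summing per PKEY leaves the single matched range & the total contribution.

```python
import pandas as pd

# Hypothetical original records, one row per customer
df = pd.DataFrame({"PKEY": [1, 2], "Region": ["England", "Wales"]})

# Hypothetical filtered look-up matches, several rows per customer
df_short = pd.DataFrame({
    "PKEY": [1, 1, 2, 2],
    "BankContribution": [10.0, 5.0, 12.0, 3.0],
    "StartAgeRange": [0, 30, 0, 40],
    "EndAgeRange": [0, 39, 0, 49],
})

# Collapse to one row per customer, then attach to the original record
df_group = df_short.groupby("PKEY", as_index=False).sum()
df_rowlvl = df.merge(df_group, on="PKEY", how="inner")

print(df_rowlvl)
```

Note that this sketch keeps PKEY as a regular column via `as_index=False`, whereas the script merges on the group index; both give one row per customer.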

Once this is done, our application is ready for all the aggregated data points.

The three categories of aggregation that follow are largely self-explanatory –

Data Point – 1:

##############################################################
#### Country & Gender wise Bank's Contribution #####
##############################################################
dfCountryGender = df_rowlvl[['Region', 'Gender', 'BankContribution']]

grouped_CG = dfCountryGender.groupby(['Region', 'Gender'])
dCountryGen = grouped_CG.aggregate(np.sum)

print("-" * 60)
print("Country & Gender wise Bank's Contribution")
print("-" * 60)
print(dCountryGen)

clog.logr('15.dCountryGen' + var + '.csv', Ind, dCountryGen, subdir)

###############################################################
###### End Of Country & Gender wise Bank's Contribution ######
###############################################################

Data Point – 2:

##############################################################
#### Country & Job wise Bank's Contribution #####
##############################################################

dfCountryJob = df_rowlvl[['Region', 'Job Classification', 'BankContribution']]

grouped_CJ = dfCountryJob.groupby(['Region', 'Job Classification'])
dCountryJob = grouped_CJ.aggregate(np.sum)

print("-" * 60)
print("Country & Job wise Bank's Contribution")
print("-" * 60)
print(dCountryJob)

clog.logr('16.dCountryJob' + var + '.csv', Ind, dCountryJob, subdir)

###############################################################
###### End Of Country & Job wise Bank's Contribution ######
###############################################################

Data Point – 3:

##############################################################
#### Country & Age wise Savings & Bank's Contribution #####
##############################################################

dfCountryAge = df_rowlvl[['Region', 'StartAgeRange', 'EndAgeRange', 'Balance', 'BankContribution']]
dfCountryAge['SavingsAmount'] = dfCountryAge.apply(lambda row: self.getSavingsAmount(row), axis=1)

grouped_CA = dfCountryAge.groupby(['Region', 'StartAgeRange', 'EndAgeRange'])
dCountryAge = grouped_CA.aggregate(np.sum)

print("-" * 60)
print("Country & Age wise Savings & Bank's Contribution")
print("-" * 60)
print(dCountryAge)

clog.logr('17.dCountryAge' + var + '.csv', Ind, dCountryAge, subdir)

##############################################################
#### End Of Country & Age wise Savings & Bank's #####
#### Contribution #####
##############################################################
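The same kind of aggregation can be sketched on a tiny made-up dataframe. The values are assumptions, & this sketch calls `.sum()` on the group directly & computes the savings column with plain column arithmetic, as an alternative to `aggregate(np.sum)` & the row-wise `apply` used in the script.

```python
import pandas as pd

# Hypothetical row-level data after the look-up processing
df_rowlvl = pd.DataFrame({
    "Region": ["England", "England", "Wales"],
    "Gender": ["Male", "Female", "Male"],
    "Balance": [1000.0, 500.0, 200.0],
    "BankContribution": [15.0, 10.0, 5.0],
})

# Region & Gender wise Bank's Contribution
by_region_gender = (
    df_rowlvl.groupby(["Region", "Gender"])[["BankContribution"]].sum()
)

# Savings = Balance - BankContribution, computed for the whole column at once
savings = df_rowlvl.assign(
    SavingsAmount=df_rowlvl["Balance"] - df_rowlvl["BankContribution"]
)
by_region = savings.groupby("Region")[
    ["Balance", "BankContribution", "SavingsAmount"]
].sum()

print(by_region_gender)
print(by_region)
```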

Finally, these datasets are passed to an Excel generator function, which writes them into different sheets & beautifies the report, as follows –

##############################################################
##### Writing to Excel File with Different Tabular Sheet #####
##############################################################
dfs = [dCountryGen, dCountryJob, dCountryAge]
sheets = ['Country-Gender-Stats', 'Country-Job-Stats', 'Country-Age-Stats']

x = self.dfs_tabs(dfs, sheets, report_path+tgtFileName + var + '.xlsx')

##############################################################
##### End Of Excel Sheet Writing #####
##############################################################

Key snippets from this function –

writer = p.ExcelWriter(file_name, engine='xlsxwriter')

This step will initiate the excel engine.

for dataframe, sheet in zip(df_list, sheet_list):
    number_rows = int(dataframe.shape[0])
    number_cols = int(dataframe.shape[1])

In this step, the application walks through the dataframes & sheet names one pair at a time & writes each result into Excel.

if cnt == 0:
    dataframe.to_excel(writer, sheet_name=sheet, startrow=7, startcol=5)
else:
    dataframe.to_excel(writer, sheet_name=sheet, startrow=5, startcol=0)

Here, the first sheet's data is written starting at startrow=7 & startcol=5, whereas the remaining two sheets start at startrow=5 & startcol=0. Note that these offsets are zero-based, so startrow=7 corresponds to row 8 in Excel.

worksheet.set_column('A:E', 4)
worksheet.set_column('F:F', 20)
worksheet.set_column('G:G', 10)
worksheet.set_column('H:J', 20)

This will set the width of these columns.

# Insert an Image
worksheet.insert_image('E1', 'Logo.png', {'x_scale':0.6, 'y_scale':0.8})

In this case, the application will insert my blog logo at the top of every sheet of this Excel workbook.

# Add a number format for cells with money.
money_fmt = workbook.add_format({'num_format': '$#,##0', 'border': 1})
worksheet.set_column('H:H', 20, money_fmt)

Also, for the column with monetary information, it will generate a specific format.

# Define our range for color formatting
color_range = "F9:F{}".format(number_rows * 2 + 1)

# Add a format. Red fill with the dark red text
red_format = workbook.add_format({'bg_color':'#FEC7CE', 'font_color':'#0E0E08', 'border':1})

# Add a format. Green fill with the dark green text
green_format = workbook.add_format({'bg_color': '#D0FCA4', 'font_color': '#0E0E08', 'border': 1})

# Add a format. Cyan fill with the dark green text
mid_format = workbook.add_format({'bg_color': '#6FC2D8', 'font_color': '#0E0E08', 'border': 1})

# Add a format. Other fill with the dark green text
oth_format = workbook.add_format({'bg_color': '#AFC2D8', 'font_color': '#0E0E08', 'border': 1})

worksheet.conditional_format(color_range, {'type':'cell',
                                           'criteria':'equal to',
                                           'value':'"England"',
                                           'format': green_format})

worksheet.conditional_format(color_range, {'type': 'cell',
                                           'criteria': 'equal to',
                                           'value': '"Northern Ireland"',
                                           'format': mid_format})

worksheet.conditional_format(color_range, {'type': 'cell',
                                           'criteria': 'equal to',
                                           'value': '"Scotland"',
                                           'format': red_format})

worksheet.conditional_format(color_range, {'type': 'cell',
                                           'criteria': 'equal to',
                                           'value': '"Wales"',
                                           'format': oth_format})

In this step, the application color-codes each region cell, so that specific categories stand out visually & support better decision making.
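Since the four rules differ only in the region name & the fill colour, they could also be driven from a small mapping. The sketch below is one possible refactoring, not the script's own code: `apply_region_colours` & `_Recorder` are hypothetical names, & `_Recorder` is only a stand-in so that the example runs without creating a real workbook. With a real report, the `workbook` & `worksheet` objects obtained from the writer would be passed instead.

```python
# Region -> background colour, copied from the formats shown above
REGION_COLOURS = {
    "England": "#D0FCA4",
    "Northern Ireland": "#6FC2D8",
    "Scotland": "#FEC7CE",
    "Wales": "#AFC2D8",
}


def apply_region_colours(workbook, worksheet, cell_range, colours=REGION_COLOURS):
    """Hypothetical helper: add one 'equal to' rule per region."""
    for region, bg_colour in colours.items():
        fmt = workbook.add_format(
            {"bg_color": bg_colour, "font_color": "#0E0E08", "border": 1}
        )
        worksheet.conditional_format(
            cell_range,
            {
                "type": "cell",
                "criteria": "equal to",
                "value": '"{}"'.format(region),
                "format": fmt,
            },
        )
    return len(colours)


class _Recorder:
    """Stand-in for the workbook & worksheet objects, used only for a dry run."""

    def __init__(self):
        self.rules = []

    def add_format(self, properties):
        # A real workbook returns a Format object; here we echo the properties
        return properties

    def conditional_format(self, cell_range, options):
        self.rules.append((cell_range, options))


recorder = _Recorder()
rule_count = apply_region_colours(recorder, recorder, "F9:F21")
print(rule_count, "rules registered")
```

Adding a new region then only needs one more entry in the mapping.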

4. callPivotLookUp.py (This script will call the main pivot script & process the data as per business requirement. Hence, the name comes into the picture.)

#####################################################
### Objective: Purpose of this Library is to call ###
### the parse_and_write_csv method to produce the ###
### tokenized columns based on the look-up file.  ###
###                                               ###
### Arguments are as follows:                     ###
### Source File, Target File & Lookup Files.      ###
###                                               ###
#####################################################

import clsPivotLookUp as ct
from clsParam import clsParam as cf
import sys
import pandas as p
import clsLookUpDataRead as cl

def main():
    print("Calling the custom Package..")

    cnt_lkp = 0

    try:
        #Default Look up table
        Lkp_Filename = cf.config['LKP_FILE']

        # Adding New DB Table for Lookup
        x = cl.clsLookUpDataRead(Lkp_Filename)
        df_lkpF = x.ReadTable()

        cnt_lkp = df_lkpF.shape[0]

        if cnt_lkp > 0:
            df_lkpF_copy = df_lkpF.copy()

            # Getting all the unique file names
            df_list_F1 = list(df_lkpF_copy['TableName'].drop_duplicates())

            # File list which has Tokenization
            df_lkpF_Int = df_lkpF[(df_lkpF['Group'].str.len() >= 1)]
            df_list_F2 = list(df_lkpF_Int['TableName'].drop_duplicates())

            for i in df_list_F1:
                if i in df_list_F2:
                    try:
                        inputFile = i

                        print("*"*30)
                        print("Reading from " + inputFile + ".csv")
                        print("*" * 30)

                        srcFileName = inputFile
                        tarFileName = srcFileName + '_processed'

                        x = ct.clsPivotLookUp(srcFileName, tarFileName, df_lkpF)

                        ret_val = x.parse_and_write_csv()

                        if ret_val == 0:
                            print("Writing to file -> (" + tarFileName + ".csv) Status: ", ret_val)
                        else:
                            if ret_val == 5:
                                print("File IO Error! Please check your directory whether the file exists with data!")
                            else:
                                print("Data Processing Issue!")

                        print("*" * 30)
                        print("Operation done for " + srcFileName + "!")
                        print("*" *30)
                    except Exception as e:
                        x = str(e)
                        srcFileName = inputFile
                        print('Check the status of ' + srcFileName + ' ' + x)
                else:
                    pass
        else:
            print("No Matching Data to process!")
    except Exception as e:
        x = str(e)
        print(x)

        print("No Matching Data to process!")

if __name__ == "__main__":
    main()

 

And, the key snippet from here –

# Getting all the unique file names
df_list_F1 = list(df_lkpF_copy['TableName'].drop_duplicates())

# File list which has Tokenization
df_lkpF_Int = df_lkpF[(df_lkpF['Group'].str.len() >= 1)]
df_list_F2 = list(df_lkpF_Int['TableName'].drop_duplicates())

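This identifies every source file listed in the look-up table & then narrows that list down to the files that have at least one tokenization group, so that only those files are processed, one by one.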
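To make the selection concrete, here is a small sketch on made-up look-up data. The column names TableName & Group follow the snippet above; the rows, the variable names & the extra fillna("") call (added so that missing Group values are treated as empty) are assumptions for illustration.

```python
import pandas as pd

# Hypothetical look-up rows; only the column names come from the script.
df_lkp = pd.DataFrame({
    "TableName": ["customers", "customers", "orders", "products"],
    "Group": ["addr", None, "", "sku"],
})

# All the unique file names, in order of first appearance.
all_files = list(df_lkp["TableName"].drop_duplicates())

# Rows that carry a non-empty Group, i.e. the ones that need tokenization.
has_group = df_lkp["Group"].fillna("").str.len() >= 1
tokenized_files = list(df_lkp.loc[has_group, "TableName"].drop_duplicates())

# Only the files present in both lists are processed.
files_to_process = [name for name in all_files if name in tokenized_files]

print(all_files)
print(tokenized_files)
print(files_to_process)
```

In this sample, orders is skipped because its only Group value is empty.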

x = ct.clsPivotLookUp(srcFileName, tarFileName, df_lkpF)
ret_val = x.parse_and_write_csv()

if ret_val == 0:
    print("Writing to file -> (" + tarFileName + ".csv) Status: ", ret_val)
else:
    if ret_val == 5:
        print("File IO Error! Please check your directory whether the file exists with data!")
    else:
        print("Data Processing Issue!")

This calls the main application class & uses the return value to report success or failure: 0 means the target file was written, 5 signals a file I/O error & any other value indicates a data-processing issue.
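The same status handling can be pulled into a small function, which keeps the loop body short & makes the codes easy to test. This is only a sketch: the function name describe_status is hypothetical, while the codes & messages are the ones used in the snippet above.

```python
def describe_status(ret_val, tar_file_name):
    """Translate a parse_and_write_csv return code into a message."""
    if ret_val == 0:
        # Success: the processed file was written.
        return "Writing to file -> (" + tar_file_name + ".csv) Status: " + str(ret_val)
    if ret_val == 5:
        # The source file is missing or has no data.
        return "File IO Error! Please check your directory whether the file exists with data!"
    # Every other code is treated as a processing failure.
    return "Data Processing Issue!"


# Hypothetical target file name, for illustration only.
for code in (0, 5, 1):
    print(describe_status(code, "sample_processed"))
```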

Let’s check the directory structure on both Windows & Mac.

Windows:

Win_Dir

MAC:

MAC_Dir

Let’s check the run process –

Windows:

Win_Run_1

Win_Run_2

MAC:

MAC_Run_1

MAC_Run_2

Let’s see how it looks in Excel –

Windows:

Win_Sheet_1

Win_Sheet_2

Win_Sheet_3

MAC:

MAC_Sheet_1

MAC_Sheet_2

MAC_Sheet_3

So, finally, we’ve achieved our target. 

Hooray! We’ve done it! 😀

I hope you’ll like this effort. 

Wait for the next installment. Till then, Happy Avenging. 🙂

[Note: All the sample data are available in the public domain for research & study.]