Displaying real-time trade data in a dashboard using Python & third-party API & Streaming

Today, we want to make our use case a little harder & more realistic. We will consume real-time trade data through the FinnHub API & display it in our dashboard using another brilliant API, H2O-Wave, with the help of native Python.

This use case is extremely useful, & we’ll be using the following third-party APIs to achieve it –

  1. FinnHub: For more information, please click the following link.
  2. Ably: For more information, please click the following link.
  3. H2O-Wave: For more information, please click the following link.

I’m not going to discuss these topics in more detail, as I’ve already covered them in separate earlier posts. Please refer to the following thread for detailed information –

creating-a-real-time-dashboard-from-streaming-data-using-python


In this post, we will address a more advanced concept than the previous post mentioned above. Let us first look at how the run looks before we start exploring the details –

Real-time trade dashboard

Let us explore the architecture of this implementation –

Architecture Diagram

This application will talk to the FinnHub websocket & consume real-time trade data from it. The data will be temporarily stored in our Ably channels. The dashboard will pick up each message & display it as soon as there is new data for that trading company.


For this use case, you need to install the following packages –

STEP – 1:

Main Packages

STEP – 2:

Main Packages – Continue

STEP – 3:

Main Packages – Continue

STEP – 4:

Main Packages – End

You can copy the following commands to install the above-mentioned packages –

pip install ably 
pip install h2o-wave
pip install pandas
pip install websocket
pip install websocket-client

Let’s explore the important data-point that you need to capture from the FinnHub portal to consume the real-time trade data –

FinnHub Portal

We have two main scripts. The first script consumes the streaming data into a message queue. The other one extracts the data from the queue, transforms it & publishes it to the real-time dashboard.

1. dashboard_finnhub.py ( This native Python script will consume streaming data & create the live trade dashboard. )


###############################################################
#### Template Written By: H2O Wave ####
#### Enhanced with Streaming Data By: Satyaki De ####
#### Base Version Enhancement On: 20-Dec-2020 ####
#### Modified On 27-Jun-2021 ####
#### ####
#### Objective: This script will consume real-time ####
#### streaming data coming out from a hosted API ####
#### sources (Finnhub) using another popular third-party ####
#### service named Ably. Ably mimics pubsub Streaming ####
#### concept, which might be extremely useful for ####
#### any start-ups. ####
#### ####
#### Note: This is an enhancement of my previous post of ####
#### H2O Wave. In this case, the application will consume ####
#### streaming trade data from a live host & not generated ####
#### out of the mock data. Thus, it is more useful for the ####
#### start-ups. ####
###############################################################
import time
from h2o_wave import site, data, ui
from ably import AblyRest
import pandas as p
import json
import datetime
import logging
import platform as pl
from clsConfig import clsConfig as cf
import clsL as cl
# Disabling Warning
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn
# Lookup functions from
# Azure cloud SQL DB
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# Global Area
## Global Class
# Initiating Log Class
l = cl.clsL()
# Global Variables
# Moving previous day log files to archive directory
log_dir = cf.config['LOG_PATH']
path = cf.config['INIT_PATH']
subdir = cf.config['SUBDIR']
## End Of Global Part
class DaSeries:
    def __init__(self, inputDf):
        self.Df = inputDf
        self.count_row = inputDf.shape[0]
        self.start_pos = 0
        self.end_pos = 0
        self.interval = 1

    def next(self):
        try:
            # Getting Individual Element & convert them to Series
            if ((self.start_pos + self.interval) <= self.count_row):
                self.end_pos = self.start_pos + self.interval
            else:
                self.end_pos = self.start_pos + (self.count_row - self.start_pos)

            split_df = self.Df.iloc[self.start_pos:self.end_pos]

            if ((self.start_pos > self.count_row) | (self.start_pos == self.count_row)):
                pass
            else:
                self.start_pos = self.start_pos + self.interval

            x = float(split_df.iloc[0]['CurrentExchange'])
            dx = float(split_df.iloc[0]['Change'])

            # Emptying the existing dataframe
            split_df = p.DataFrame(None)

            return x, dx
        except:
            x = 0
            dx = 0
            return x, dx

class CategoricalSeries:
    def __init__(self, sourceDf):
        self.series = DaSeries(sourceDf)
        self.i = 0

    def next(self):
        x, dx = self.series.next()
        self.i += 1
        return f'C{self.i}', x, dx
light_theme_colors = '$red $pink $purple $violet $indigo $blue $azure $cyan $teal $mint $green $amber $orange $tangerine'.split()
dark_theme_colors = '$red $pink $blue $azure $cyan $teal $mint $green $lime $yellow $amber $orange $tangerine'.split()
_color_index = 1
colors = dark_theme_colors
def next_color():
    global _color_index
    _color_index += 1
    return colors[_color_index % len(colors)]

_curve_index = 1
curves = 'linear smooth step step-after step-before'.split()

def next_curve():
    global _curve_index
    _curve_index += 1
    return curves[_curve_index % len(curves)]
def calc_p(row):
    try:
        str_calc_s1 = str(row['s_x'])
        str_calc_s2 = str(row['s_y'])

        if str_calc_s1 == str_calc_s2:
            calc_p_val = float(row['p_y'])
        else:
            calc_p_val = float(row['p_x'])

        return calc_p_val
    except:
        return 0.0

def calc_v(row):
    try:
        str_calc_s1 = str(row['s_x'])
        str_calc_s2 = str(row['s_y'])

        if str_calc_s1 == str_calc_s2:
            calc_v_val = float(row['v_y'])
        else:
            calc_v_val = float(row['v_x'])

        return calc_v_val
    except:
        return 0.0
def process_DF(inputDF, inputDFUnq):
    try:
        # Core Business logic
        # The application will show default value to any
        # trade-in stock in case that data doesn't consume
        # from the source.
        df_conv = inputDF
        df_unique_fin = inputDFUnq

        df_conv['max_count'] = df_conv.groupby('default_rank')['default_rank'].transform('count')
        l.logr('3. max_df.csv', 'Y', df_conv, subdir)

        # Sorting the output
        sorted_df = df_conv.sort_values(by=['default_rank','s'], ascending=True)

        # New Column List Orders
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        df_fin = sorted_df.reindex(column_order, axis=1)
        l.logr('4. sorted_df.csv', 'Y', df_fin, subdir)

        # Now splitting the sorted df into two sets
        lkp_max_count = 4
        df_fin_na = df_fin[(df_fin['max_count'] == lkp_max_count)]
        l.logr('5. df_fin_na.csv', 'Y', df_fin_na, subdir)

        df_fin_req = df_fin[(df_fin['max_count'] != lkp_max_count)]
        l.logr('6. df_fin_req.csv', 'Y', df_fin_req, subdir)

        # Now to perform cross join, we will create
        # a key column in both the DataFrames to
        # merge on that key.
        df_unique_fin['key'] = 1
        df_fin_req['key'] = 1

        # Dropping unwanted columns
        df_unique_fin.drop(columns=['t'], axis=1, inplace=True)
        l.logr('7. df_unique_slim.csv', 'Y', df_unique_fin, subdir)

        # Padding with dummy key values
        #merge_df = p.merge(df_unique_fin,df_fin_req,on=['s'],how='left')
        # Keyword form of drop(); the positional axis argument is rejected by newer pandas
        merge_df = p.merge(df_unique_fin,df_fin_req,on=['key']).drop(columns="key")
        l.logr('8. merge_df.csv', 'Y', merge_df, subdir)

        # Sorting the output
        sorted_merge_df = merge_df.sort_values(by=['default_rank_y','s_x'], ascending=True)
        l.logr('9. sorted_merge_df.csv', 'Y', sorted_merge_df, subdir)

        # Calling new derived logic
        sorted_merge_df['derived_p'] = sorted_merge_df.apply(lambda row: calc_p(row), axis=1)
        sorted_merge_df['derived_v'] = sorted_merge_df.apply(lambda row: calc_v(row), axis=1)
        l.logr('10. sorted_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Dropping unwanted columns
        sorted_merge_df.drop(columns=['default_rank_x', 'p_x', 'v_x', 's_y', 'p_y', 'v_y'], axis=1, inplace=True)

        #Renaming the columns
        sorted_merge_df.rename(columns={'s_x':'s'}, inplace=True)
        sorted_merge_df.rename(columns={'default_rank_y':'default_rank'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_p':'p'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_v':'v'}, inplace=True)
        l.logr('11. org_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Aligning columns
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        merge_fin_df = sorted_merge_df.reindex(column_order, axis=1)
        l.logr('12. merge_fin_df.csv', 'Y', merge_fin_df, subdir)

        # Finally, appending these two DataFrame (df_fin_na & merge_fin_df)
        frames = [df_fin_na, merge_fin_df]
        fin_df = p.concat(frames, keys=["s", "default_rank", "max_count"])
        l.logr('13. fin_df.csv', 'Y', fin_df, subdir)

        # Final clearance & organization
        fin_df.drop(columns=['default_rank', 'max_count'], axis=1, inplace=True)
        l.logr('14. Final.csv', 'Y', fin_df, subdir)

        # Adjusting key columns
        fin_df.rename(columns={'s':'Company'}, inplace=True)
        fin_df.rename(columns={'p':'CurrentExchange'}, inplace=True)
        fin_df.rename(columns={'v':'Change'}, inplace=True)
        l.logr('15. TransormedFinal.csv', 'Y', fin_df, subdir)

        return fin_df
    except Exception as e:
        print('$' * 120)
        x = str(e)
        print(x)
        print('$' * 120)

        df = p.DataFrame()
        return df
def create_dashboard(update_freq=0.0):
    page = site['/dashboard_finnhub']

    general_log_path = str(cf.config['LOG_PATH'])
    ably_id = str(cf.config['ABLY_ID'])

    # Enabling Logging Info
    logging.basicConfig(filename=general_log_path + 'Realtime_Stock.log', level=logging.INFO)

    os_det = pl.system()
    if os_det == "Windows":
        src_path = path + '\\' + 'data\\'
    else:
        src_path = path + '/' + 'data/'

    # Fetching the data
    client = AblyRest(ably_id)
    channel = client.channels.get('sd_channel')
    message_page = channel.history()

    # Counter Value
    cnt = 0

    # Declaring Global Data-Frame
    df_conv = p.DataFrame()

    for i in message_page.items:
        print('Last Msg: {}'.format(i.data))
        json_data = json.loads(i.data)

        # Converting JSON to Dataframe
        df = p.json_normalize(json_data)
        df.columns = df.columns.map(lambda x: x.split(".")[-1])

        if cnt == 0:
            df_conv = df
        else:
            d_frames = [df_conv, df]
            df_conv = p.concat(d_frames)

        cnt += 1

    # Resetting the Index Value
    df_conv.reset_index(drop=True, inplace=True)
    print('DF:')
    print(df_conv)

    # Writing to the file
    l.logr('1. DF_modified.csv', 'Y', df_conv, subdir)

    # Dropping unwanted columns
    df_conv.drop(columns=['c'], axis=1, inplace=True)
    df_conv['default_rank'] = df_conv.groupby(['s']).cumcount() + 1
    lkp_rank = 1
    df_unique = df_conv[(df_conv['default_rank'] == lkp_rank)]

    # New Column List Orders
    column_order = ['s', 'default_rank', 'p', 't', 'v']
    df_unique_fin = df_unique.reindex(column_order, axis=1)
    print('Rank DF Unique:')
    print(df_unique_fin)
    l.logr('2. df_unique.csv', 'Y', df_unique_fin, subdir)

    # Capturing transformed values into a DataFrame
    # Depending on your logic, you'll implement that inside
    # the process_DF functions
    fin_df = process_DF(df_conv, df_unique_fin)

    df_unq_fin = df_unique_fin.copy()
    df_unq_fin.rename(columns={'s':'Company'}, inplace=True)
    df_unq_fin.rename(columns={'p':'CurrentExchange'}, inplace=True)
    df_unq_fin.rename(columns={'v':'Change'}, inplace=True)
    df_unq_fin.drop(columns=['default_rank','key'], axis=1, inplace=True)
    l.logr('16. df_unq_fin.csv', 'Y', df_unq_fin, subdir)

    df_unq_finale = df_unq_fin.sort_values(by=['Company'], ascending=True)
    l.logr('17. df_unq_finale.csv', 'Y', df_unq_finale, subdir)

    # Final clearance for better understanding of data
    fin_df.drop(columns=['t'], axis=1, inplace=True)
    l.logr('18. CleanFinal.csv', 'Y', fin_df, subdir)

    count_row = df_unq_finale.shape[0]
    large_lines = []
    start_pos = 0
    end_pos = 0
    interval = 1

    # Converting dataframe to a desired Series
    f = CategoricalSeries(fin_df)

    for j in range(count_row):
        # Getting the series values from above
        cat, val, pc = f.next()

        # Getting Individual Element & convert them to Series
        if ((start_pos + interval) <= count_row):
            end_pos = start_pos + interval
        else:
            end_pos = start_pos + (count_row - start_pos)

        split_df = df_unq_finale.iloc[start_pos:end_pos]

        if ((start_pos > count_row) | (start_pos == count_row)):
            pass
        else:
            start_pos = start_pos + interval

        x_currency = str(split_df.iloc[0]['Company'])

        ####################################################
        ##### Debug Purpose                        #########
        ####################################################
        print('Company: ', x_currency)
        print('J: ', str(j))
        print('Cat: ', cat)
        ####################################################
        #####   End Of Debug                         #######
        ####################################################

        c = page.add(f'e{j+1}', ui.tall_series_stat_card(
            box=f'{j+1} 1 1 2',
            title=x_currency,
            value='=${{intl qux minimum_fraction_digits=2 maximum_fraction_digits=2}}',
            aux_value='={{intl quux style="percent" minimum_fraction_digits=1 maximum_fraction_digits=1}}',
            data=dict(qux=val, quux=pc),
            plot_type='area',
            plot_category='foo',
            plot_value='qux',
            plot_color=next_color(),
            plot_data=data('foo qux', -15),
            plot_zero_value=0,
            plot_curve=next_curve(),
        ))
        large_lines.append((f, c))

    page.save()

    while update_freq > 0:
        time.sleep(update_freq)

        for f, c in large_lines:
            cat, val, pc = f.next()

            print('Update Cat: ', cat)
            print('Update Val: ', val)
            print('Update pc: ', pc)
            print('*' * 160)

            c.data.qux = val
            c.data.quux = pc / 100
            c.plot_data[-1] = [cat, val]

        page.save()
if __name__ == "__main__":
    try:
        # Main Calling script
        create_dashboard(update_freq=0.25)
    except Exception as e:
        x = str(e)
        print(x)

Let’s explore the key snippets from the above script –

def process_DF(inputDF, inputDFUnq):
    try:
        # Core Business logic
        # The application will show default value to any
        # trade-in stock in case that data doesn't consume
        # from the source.
        
        # Getting block count
        #df_conv['block_count'] = df_conv.groupby(['default_rank']).cumcount()
        #l.logr('3. block_df.csv', 'Y', df_conv, subdir)

        # Getting block count
        #df_conv['max_count'] = df_conv.groupby(['default_rank']).size()
        #df_conv_fin = df_conv.groupby(['default_rank']).agg(['count'])
        #df_conv_fin = df_conv.value_counts(['default_rank']).reset_index(name='max_count')
        #df_conv_fin = df_conv.value_counts(['default_rank'])
        df_conv = inputDF
        df_unique_fin = inputDFUnq

        df_conv['max_count'] = df_conv.groupby('default_rank')['default_rank'].transform('count')
        l.logr('3. max_df.csv', 'Y', df_conv, subdir)


        # Sorting the output
        sorted_df = df_conv.sort_values(by=['default_rank','s'], ascending=True)

        # New Column List Orders
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        df_fin = sorted_df.reindex(column_order, axis=1)

        l.logr('4. sorted_df.csv', 'Y', df_fin, subdir)

        # Now splitting the sorted df into two sets
        lkp_max_count = 4
        df_fin_na = df_fin[(df_fin['max_count'] == lkp_max_count)]

        l.logr('5. df_fin_na.csv', 'Y', df_fin_na, subdir)

        df_fin_req = df_fin[(df_fin['max_count'] != lkp_max_count)]
        l.logr('6. df_fin_req.csv', 'Y', df_fin_req, subdir)

        # Now to perform cross join, we will create
        # a key column in both the DataFrames to
        # merge on that key.
        df_unique_fin['key'] = 1
        df_fin_req['key'] = 1

        # Dropping unwanted columns
        df_unique_fin.drop(columns=['t'], axis=1, inplace=True)
        l.logr('7. df_unique_slim.csv', 'Y', df_unique_fin, subdir)

        # Padding with dummy key values
        #merge_df = p.merge(df_unique_fin,df_fin_req,on=['s'],how='left')
        merge_df = p.merge(df_unique_fin,df_fin_req,on=['key']).drop(columns="key")

        l.logr('8. merge_df.csv', 'Y', merge_df, subdir)

        # Sorting the output
        sorted_merge_df = merge_df.sort_values(by=['default_rank_y','s_x'], ascending=True)

        l.logr('9. sorted_merge_df.csv', 'Y', sorted_merge_df, subdir)

        # Calling new derived logic
        sorted_merge_df['derived_p'] = sorted_merge_df.apply(lambda row: calc_p(row), axis=1)
        sorted_merge_df['derived_v'] = sorted_merge_df.apply(lambda row: calc_v(row), axis=1)

        l.logr('10. sorted_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Dropping unwanted columns
        sorted_merge_df.drop(columns=['default_rank_x', 'p_x', 'v_x', 's_y', 'p_y', 'v_y'], axis=1, inplace=True)

        #Renaming the columns
        sorted_merge_df.rename(columns={'s_x':'s'}, inplace=True)
        sorted_merge_df.rename(columns={'default_rank_y':'default_rank'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_p':'p'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_v':'v'}, inplace=True)

        l.logr('11. org_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Aligning columns
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        merge_fin_df = sorted_merge_df.reindex(column_order, axis=1)

        l.logr('12. merge_fin_df.csv', 'Y', merge_fin_df, subdir)

        # Finally, appending these two DataFrame (df_fin_na & merge_fin_df)
        frames = [df_fin_na, merge_fin_df]
        fin_df = p.concat(frames, keys=["s", "default_rank", "max_count"])

        l.logr('13. fin_df.csv', 'Y', fin_df, subdir)

        # Final clearance & organization
        fin_df.drop(columns=['default_rank', 'max_count'], axis=1, inplace=True)

        l.logr('14. Final.csv', 'Y', fin_df, subdir)

        # Adjusting key columns
        fin_df.rename(columns={'s':'Company'}, inplace=True)
        fin_df.rename(columns={'p':'CurrentExchange'}, inplace=True)
        fin_df.rename(columns={'v':'Change'}, inplace=True)

        l.logr('15. TransormedFinal.csv', 'Y', fin_df, subdir)

        return fin_df
    except Exception as e:
        print('$' * 120)

        x = str(e)
        print(x)

        print('$' * 120)

        df = p.DataFrame()

        return df

The above function checks whether the queue is delivering trade data for all the companies. In our use case, we’re testing with four companies, & they are as follows –

a. AAPL
b. AMZN
c. BINANCE:BTCUSDT
d. IC MARKETS:1

Each message batch is expected to contain data for all four of these companies. If any company’s data is missing, this transformation adds a dummy record for that company, filled with default trade values, so that every batch has a uniform number of entries.
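The padding idea can be sketched in plain Python, without the pandas cross join. This is only a minimal illustration & not the logic of process_DF itself; the names DEFAULTS & pad_batch are made up for this sketch, & the sample prices are invented.

```python
# Default (placeholder) trade values, mirroring the dummy 0.01 records used in this post.
DEFAULTS = {
    "AAPL": {"p": 0.01, "v": 0.01},
    "AMZN": {"p": 0.01, "v": 0.01},
    "BINANCE:BTCUSDT": {"p": 0.01, "v": 0.01},
    "IC MARKETS:1": {"p": 0.01, "v": 0.01},
}

def pad_batch(batch, defaults=DEFAULTS):
    """Return exactly one record per known symbol, using defaults for missing ones."""
    latest = {rec["s"]: rec for rec in batch}  # the last record per symbol wins
    padded = []
    for symbol in sorted(defaults):
        rec = latest.get(symbol)
        if rec is None:
            # This company sent nothing in the batch, so fall back to the dummy values.
            rec = {"s": symbol, **defaults[symbol]}
        padded.append({"s": symbol, "p": float(rec["p"]), "v": float(rec["v"])})
    return padded

# Hypothetical batch where only two of the four companies reported a trade.
batch = [
    {"s": "AAPL", "p": 133.1, "v": 10},
    {"s": "BINANCE:BTCUSDT", "p": 32000.5, "v": 0.002},
]
padded = pad_batch(batch)
for rec in padded:
    print(rec)
```

Every batch now has four entries, which is what the dashboard cards rely on.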

def calc_p(row):
    try:
        str_calc_s1 = str(row['s_x'])
        str_calc_s2 = str(row['s_y'])

        if str_calc_s1 == str_calc_s2:
            calc_p_val = float(row['p_y'])
        else:
            calc_p_val = float(row['p_x'])

        return calc_p_val
    except:
        return 0.0

def calc_v(row):
    try:
        str_calc_s1 = str(row['s_x'])
        str_calc_s2 = str(row['s_y'])

        if str_calc_s1 == str_calc_s2:
            calc_v_val = float(row['v_y'])
        else:
            calc_v_val = float(row['v_x'])

        return calc_v_val
    except:
        return 0.0

The above snippet will capture the default values for those missing records.

    client = AblyRest(ably_id)
    channel = client.channels.get('sd_channel')

    message_page = channel.history()

In the above snippet, the application will consume the streaming data from the Ably queue.

    for i in message_page.items:
        print('Last Msg: {}'.format(i.data))
        json_data = json.loads(i.data)

        # Converting JSON to Dataframe
        df = p.json_normalize(json_data)
        df.columns = df.columns.map(lambda x: x.split(".")[-1])

        if cnt == 0:
            df_conv = df
        else:
            d_frames = [df_conv, df]
            df_conv = p.concat(d_frames)

        cnt += 1

The above snippet will convert the streaming messages to a more meaningful pandas data-frame, which we can use for a wide variety of analytics.
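To make the shape of the data clearer, here is a small standard-library sketch of the same flattening step, followed by the per-company ranking that the full script does with groupby(['s']).cumcount() + 1. The payloads are invented samples in the FinnHub trade format (c, p, s, t, v), & messages_to_rows is a hypothetical helper name.

```python
import json
from collections import Counter

def messages_to_rows(payloads):
    """Flatten a list of JSON message payloads into one row per trade."""
    rows = []
    for payload in payloads:
        for trade in json.loads(payload):
            # Keep only the fields the dashboard uses; 'c' is dropped later anyway.
            rows.append({key: trade.get(key) for key in ("s", "p", "t", "v")})
    return rows

# Invented sample payloads; each message holds a JSON array of trades.
payloads = [
    '[{"c": null, "p": 133.1, "s": "AAPL", "t": 1624715406407, "v": 10}]',
    '[{"c": null, "p": 3401.5, "s": "AMZN", "t": 1624715406408, "v": 2},'
    ' {"c": null, "p": 133.2, "s": "AAPL", "t": 1624715406409, "v": 5}]',
]
rows = messages_to_rows(payloads)

# Rank each trade within its company (1 = first seen), like cumcount() + 1.
seen = Counter()
for row in rows:
    seen[row["s"]] += 1
    row["default_rank"] = seen[row["s"]]

for row in rows:
    print(row)
```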

    # Converting dataframe to a desired Series
    f = CategoricalSeries(fin_df)

    for j in range(count_row):
        # Getting the series values from above
        cat, val, pc = f.next()

        # Getting Individual Element & convert them to Series
        if ((start_pos + interval) <= count_row):
            end_pos = start_pos + interval
        else:
            end_pos = start_pos + (count_row - start_pos)

        split_df = df_unq_finale.iloc[start_pos:end_pos]

        if ((start_pos > count_row) | (start_pos == count_row)):
            pass
        else:
            start_pos = start_pos + interval

        x_currency = str(split_df.iloc[0]['Company'])

        ####################################################
        ##### Debug Purpose                        #########
        ####################################################
        print('Company: ', x_currency)
        print('J: ', str(j))
        print('Cat: ', cat)
        ####################################################
        #####   End Of Debug                         #######
        ####################################################

        c = page.add(f'e{j+1}', ui.tall_series_stat_card(
            box=f'{j+1} 1 1 2',
            title=x_currency,
            value='=${{intl qux minimum_fraction_digits=2 maximum_fraction_digits=2}}',
            aux_value='={{intl quux style="percent" minimum_fraction_digits=1 maximum_fraction_digits=1}}',
            data=dict(qux=val, quux=pc),
            plot_type='area',
            plot_category='foo',
            plot_value='qux',
            plot_color=next_color(),
            plot_data=data('foo qux', -15),
            plot_zero_value=0,
            plot_curve=next_curve(),
        ))
        large_lines.append((f, c))

    page.save()

    while update_freq > 0:

        time.sleep(update_freq)

        for f, c in large_lines:
            cat, val, pc = f.next()

            print('Update Cat: ', cat)
            print('Update Val: ', val)
            print('Update pc: ', pc)
            print('*' * 160)

            c.data.qux = val
            c.data.quux = pc / 100
            c.plot_data[-1] = [cat, val]

        page.save()

The above snippet feeds the data into the H2O-Wave framework, which presents it through an interactive, GUI-based dashboard.
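One detail worth a closer look is data('foo qux', -15) together with c.plot_data[-1] = [cat, val]. As far as I understand the H2O-Wave data-buffer convention, a negative size asks for a cyclic buffer that keeps only the most recent rows, & writing to index -1 appends a new row. The sketch below imitates that behaviour with a standard-library deque. It is an analogy, not Wave’s implementation, & the names WINDOW & plot_rows are made up for this illustration.

```python
from collections import deque

WINDOW = 15  # same window size as the plot buffer in the dashboard card

# A deque with maxlen silently drops the oldest row once it is full,
# which is how a rolling chart window behaves.
plot_rows = deque(maxlen=WINDOW)

for i in range(1, 21):
    # Each update appends one (category, value) pair, like one dashboard refresh.
    plot_rows.append((f"C{i}", float(i)))

print(len(plot_rows))   # number of rows currently held
print(plot_rows[0])     # oldest row still in the window
print(plot_rows[-1])    # most recent row
```

After twenty updates only the latest fifteen points remain, so the chart keeps scrolling without growing in memory.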


2. publish_ably_mod.py ( This native Python script will publish the streaming data into the Ably message queue. )


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 26-Jun-2021 ####
#### ####
#### Objective: This script will consume real-time ####
#### streaming data coming out from a hosted API ####
#### sources (Finnhub) using another popular third-party ####
#### service named Ably. Ably mimics pubsub Streaming ####
#### concept, which might be extremely useful for ####
#### any start-ups. ####
#### ####
###############################################################
from ably import AblyRest
import logging
import json
# generate random floating point values
from random import seed
from random import random
# seed random number generator
import websocket
import json
from clsConfig import clsConfig as cf
seed(1)
# Global Section
logger = logging.getLogger('ably')
logger.addHandler(logging.StreamHandler())
ably_id = str(cf.config['ABLY_ID'])
ably = AblyRest(ably_id)
channel = ably.channels.get('sd_channel')
# End Of Global Section
def on_message(ws, message):
    print("*" * 60)
    res = json.loads(message)
    jsBody = res["data"]
    jdata_dyn = json.dumps(jsBody)
    print(jdata_dyn)

    # JSON data
    # This is the default data for all the identified category
    # we've prepared. You can extract this dynamically. Or, By
    # default you can set their base trade details.
    json_data = [{
        "c": "null",
        "p": 0.01,
        "s": "AAPL",
        "t": 1624715406407,
        "v": 0.01
    },{
        "c": "null",
        "p": 0.01,
        "s": "AMZN",
        "t": 1624715406408,
        "v": 0.01
    },{
        "c": "null",
        "p": 0.01,
        "s": "BINANCE:BTCUSDT",
        "t": 1624715406409,
        "v": 0.01
    },{
        "c": "null",
        "p": 0.01,
        "s": "IC MARKETS:1",
        "t": 1624715406410,
        "v": 0.01
    }]
    jdata = json.dumps(json_data)

    # Publish a message to the sd_channel channel
    channel.publish('event', jdata)

    # Publish rest of the messages to the sd_channel channel
    channel.publish('event', jdata_dyn)

    jsBody = []
    jdata_dyn = ''

def on_error(ws, error):
    print(error)

def on_close(ws):
    print("### closed ###")

def on_open(ws):
    # Invoking Individual Company Trade Queries
    ws.send('{"type":"subscribe","symbol":"AAPL"}')
    ws.send('{"type":"subscribe","symbol":"AMZN"}')
    ws.send('{"type":"subscribe","symbol":"BINANCE:BTCUSDT"}')
    ws.send('{"type":"subscribe","symbol":"IC MARKETS:1"}')

if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("wss://ws.finnhub.io?token=jfhfyr8474rpv6av0",
                              on_message = on_message,
                              on_error = on_error,
                              on_close = on_close)
    ws.on_open = on_open
    ws.run_forever()

The key snippet from the above script –

    json_data = [{
        "c": "null",
        "p": 0.01,
        "s": "AAPL",
        "t": 1624715406407,
        "v": 0.01
    },{
        "c": "null",
        "p": 0.01,
        "s": "AMZN",
        "t": 1624715406408,
        "v": 0.01
    },{
        "c": "null",
        "p": 0.01,
        "s": "BINANCE:BTCUSDT",
        "t": 1624715406409,
        "v": 0.01
    },
        {
        "c": "null",
        "p": 0.01,
        "s": "IC MARKETS:1",
        "t": 1624715406410,
        "v": 0.01
        }]

As we already discussed, we’ll pass a default set of data for all the candidate companies.
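If the list of companies grows, the default records do not have to be typed by hand. The following is a small sketch that builds the same payload from a list of symbols; default_trades & SYMBOLS are hypothetical names, & the 0.01 placeholders & the timestamp are taken from the snippet above.

```python
import json
import time

SYMBOLS = ["AAPL", "AMZN", "BINANCE:BTCUSDT", "IC MARKETS:1"]

def default_trades(symbols, price=0.01, volume=0.01, now_ms=None):
    """Build one placeholder trade record per symbol, with increasing timestamps."""
    base = int(time.time() * 1000) if now_ms is None else now_ms
    return [
        {"c": "null", "p": price, "s": symbol, "t": base + offset, "v": volume}
        for offset, symbol in enumerate(symbols)
    ]

# A fixed timestamp is passed here only to reproduce the values shown in this post.
jdata = json.dumps(default_trades(SYMBOLS, now_ms=1624715406407))
print(jdata)
```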

    # Publish a message to the sd_channel channel
    channel.publish('event', jdata)

    # Publish rest of the messages to the sd_channel channel
    channel.publish('event', jdata_dyn)

Publish the messages to the created channel.

def on_open(ws):
    # Invoking Individual Company Trade Queries
    ws.send('{"type":"subscribe","symbol":"AAPL"}')
    ws.send('{"type":"subscribe","symbol":"AMZN"}')
    ws.send('{"type":"subscribe","symbol":"BINANCE:BTCUSDT"}')
    ws.send('{"type":"subscribe","symbol":"IC MARKETS:1"}')

if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("wss://ws.finnhub.io?token=hdhdjdj9494ld934v6av0",
                              on_message = on_message,
                              on_error = on_error,
                              on_close = on_close)

Send the company-specific subscription requests to FinnHub through the websocket app.
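The four ws.send calls follow one pattern, so they can also be generated from a list. Here is a hedged sketch of that idea: subscribe_messages & FakeSocket are made-up names, & FakeSocket only stands in for the real websocket app so that the example runs offline.

```python
import json

SYMBOLS = ["AAPL", "AMZN", "BINANCE:BTCUSDT", "IC MARKETS:1"]

def subscribe_messages(symbols):
    """Build one subscribe request per symbol, in the format used above."""
    return [json.dumps({"type": "subscribe", "symbol": symbol}) for symbol in symbols]

def on_open(ws):
    # Invoking individual company trade queries, one message per symbol
    for message in subscribe_messages(SYMBOLS):
        ws.send(message)

class FakeSocket:
    """Stand-in for the websocket app; it only records what would be sent."""
    def __init__(self):
        self.sent = []

    def send(self, message):
        self.sent.append(message)

sock = FakeSocket()
on_open(sock)
for message in sock.sent:
    print(message)
```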

3. clsConfig.py ( This file contains the configuration details. )


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning. Application will ####
#### process these information & perform ####
#### various analysis on Linear-Regression. ####
################################################
import os
import platform as pl
class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'FILE_NAME': Curr_Path + sep + 'Data' + sep + 'TradeIn.csv',
        'SRC_PATH': Curr_Path + sep + 'Data' + sep,
        'APP_DESC_1': 'H2O Wave Integration with FinHubb!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SUBDIR' : 'data',
        'ABLY_ID': 'WWP309489.93jfkT:32kkdhdJjdued79e'
    }
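As an optional variation, the same configuration can be built with os.path.join, which picks the right separator on its own, & the Ably key can be read from the environment instead of being stored in the file. This is only a sketch under my own assumptions: build_config is a hypothetical helper, & ABLY_ID as an environment variable name is not something the original script uses.

```python
import os

def build_config(base_dir, environ=os.environ):
    """Build a slimmed-down config dictionary without manual separator handling."""
    return {
        # A trailing empty component makes os.path.join end the path with a separator.
        "LOG_PATH": os.path.join(base_dir, "log", ""),
        "SRC_PATH": os.path.join(base_dir, "Data", ""),
        "INIT_PATH": base_dir,
        "SUBDIR": "data",
        # Assumption: the key is supplied through an environment variable named ABLY_ID.
        "ABLY_ID": environ.get("ABLY_ID", ""),
    }

# A plain dict is passed as the environment here so the example runs anywhere.
config = build_config(os.getcwd(), {"ABLY_ID": "demo-key"})
print(config["LOG_PATH"])
```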



Let’s explore the directory structure –

MAC Directory

Let’s run the application –

Step 1:

Starting of Wave Server

Step 2:

Triggering message consumption job

Step 3:

Triggering the main application

You can monitor the message consumption from your Ably portal as follows –

Message Consumption

If you want to know more detail, then you need to scroll down the page, where you will get this additional information –

Message spike during consumption

And the final output in the interactive dashboard will look like the screenshot below –

Interactive Real-time Dashboard

So, we’ve done it.

You will get the complete codebase in the following Github link.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only.

Managing mesh-APIs a few best practices including circuit-breaker & more.

Hi Guys!

Today, we will be discussing some use cases where better solutions can make our entire microservice/API integration painless.

As we drive towards more & more real-time applications such as mobile apps, IoT, and bots, we are engaging more microservice/API components in our technology stack. Moreover, we have witnessed on many occasions that a full-grown microservice environment leads to unforeseen scenarios that were not predicted or captured correctly.

To understand what a mesh-API architecture looks like, let us take one sample case. We will then see what kind of problems we might face & how we can address them.

Let us consider Airline online check-in as an example here.

The following steps take place irrespective of the airline ->

  • Filling in primary information such as First Name, Last Name, Address, Contact details, Passport & Booking No.
  • Choosing a seat.
  • Providing the meal preference (this is optional, as a domestic flight may or may not offer meals, depending on the distance & budget).
  • Finally, generating the Boarding-Pass.

If we decoupled our application here, we could create four inter-connected microservices to cater to our requirements. And this is shown in the given diagram –

Normal Microservice operations

From the above diagram, we can see that the customer invokes a couple of related but independent services to finish the process of generating a Boarding-Pass. As per the diagram, the initial request reaches the main Onboarding API, which results in cascading service calls & their successful responses. In developer’s terms, we call this a “Happy Scenario!”

Nevertheless, developers do not always get this kind of rosy scenario. Sometimes, we encounter a scenario like this –

Cascading failure in microservice framework

Suppose our Boarding-pass generation service encounters a network problem or some other fatal issue. That leads to a chain of failures in its parent services, & the result will be disastrous.

How can we handle such scenarios? There are a few guidelines that we can explore.

  1. Time-Out trigger while calling any service/s
  2. Building Asynchronous service
  3. Implementation of Circuit-Breaker

We can also use a standard workflow manager or a custom-built one to improve that further (we will discuss this later in this blog).

We understand that our parent service should not wait for long when there is a significant delay from the child service. Otherwise, it will have a cascading effect & bring down the entire array of services along with it.
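To make the first two guidelines concrete, here is a minimal sketch that combines an asynchronous call with a time-out, using only the standard asyncio module. The child service is simulated with a sleep, & the names child_service & call_with_timeout are made up for this illustration; they are not part of the services discussed in this post.

```python
import asyncio

async def child_service(delay_s):
    """Simulated child microservice that takes delay_s seconds to answer."""
    await asyncio.sleep(delay_s)
    return {"status": "OK"}

async def call_with_timeout(delay_s, timeout_s):
    """Call the child service, but give up once timeout_s seconds have passed."""
    try:
        return await asyncio.wait_for(child_service(delay_s), timeout=timeout_s)
    except asyncio.TimeoutError:
        # The parent answers quickly instead of hanging along with the child.
        return {"status": "TIMEOUT"}

fast = asyncio.run(call_with_timeout(0.01, 0.5))   # child answers in time
slow = asyncio.run(call_with_timeout(0.5, 0.05))   # child is too slow
print(fast, slow)
```

The parent gets a clear answer in both cases, so a slow child can no longer block the whole chain.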

However, the use of a Circuit-Breaker is a more advanced & better way to handle such scenarios. To keep this post small, we are not going to share the entire codebase here. We will mainly examine two base scripts.
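For readers who have not met the pattern before, the following is a minimal, generic circuit-breaker sketch. It is not the implementation used in the services of this post: the class, its parameters (max_failures, reset_after_s) & the injected clock are my own assumptions, chosen to keep the example short & testable.

```python
import time

class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""

class CircuitBreaker:
    def __init__(self, max_failures=3, reset_after_s=30.0, clock=time.monotonic):
        self.max_failures = max_failures
        self.reset_after_s = reset_after_s
        self.clock = clock
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.reset_after_s:
                raise CircuitOpenError("circuit is open; skipping call")
            # Cool-down elapsed: half-open state, one trial call is allowed.
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.opened_at is not None or self.failures >= self.max_failures:
                self.opened_at = self.clock()  # open (or re-open) the circuit
            raise
        # Success closes the circuit & clears the failure count.
        self.failures = 0
        self.opened_at = None
        return result

def flaky():
    raise ConnectionError("boarding-pass service is down")

fake_now = [0.0]  # controllable clock so the demo needs no real waiting
breaker = CircuitBreaker(max_failures=2, reset_after_s=10.0, clock=lambda: fake_now[0])
outcomes = []
for _ in range(3):
    try:
        breaker.call(flaky)
    except CircuitOpenError:
        outcomes.append("rejected")
    except ConnectionError:
        outcomes.append("failed")

fake_now[0] = 11.0  # pretend the cool-down has passed
recovered = breaker.call(lambda: "boarding pass issued")
print(outcomes, recovered)
```

After two failures the breaker stops calling the broken service at all, which protects the parent services, & it lets a trial call through once the cool-down is over.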

Please find the current directory structure –

Current directory structure

Let us first consider implementing this without the Circuit-Breaker. We have created the following functions & deployed a custom Python library for this demo. However, we will not discuss this library today. We will release it on PyPI & share its details & functionality in a later post. This library is a lightweight Python-based workflow manager, which continuously monitors any failed service & triggers it again once the connection is re-established.

Use of Python-based Workflow Manager

From the above diagram, we can see that if we force a “time-out” while calling our services, we can use this lightweight framework to invoke that service later, whenever it wakes up or re-establishes a connection.

Please find the WhatsApp delivery through our microservice framework using this lightweight workflow framework.

Sending WhatsApp message using Python-based Workflow Manager

We have used the Visual Studio Code editor for the entire development of these Azure Functions. The screen should look something like this –

Development Editor

We’ll discuss the following two services –

  1. getSeatSelection
  2. getBoardingPass

We will also discuss the first script twice – once with the standard time-out approach & once with the circuit-breaker.

  1. getSeatSelection ( This microservice receives the inputs from its parent microservice & confirms the preferred seat number selected by the passenger. )
##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 05-Oct-2020              ####
#### Modified On 05-Oct-2020              ####
####                                      ####
#### Objective: Scripts to choose the     ####
#### seat for the passenger.              ####
##############################################

import logging
import json
import time
import os
import ssl
import requests

import azure.functions as func

def callService(inputJson, urlPost, retryNo, eventFlag, maxRetryNo, maxTimeOut):
    # Invoking getBoardingPass API
    try:
        # Bypassing SSL Authentication
        try:
            _create_unverified_https_context = ssl._create_unverified_context
        except AttributeError:
            # Legacy python that doesn't verify HTTPS certificates by default
            pass
        else:
            # Handle target environment that doesn't support HTTPS verification
            ssl._create_default_https_context = _create_unverified_https_context

        # Target URL of the child service
        url = urlPost

        json_data = inputJson
        headers = {'content-type': "application/json", 'cache-control': "no-cache"}

        # Failed case retry
        retries = retryNo
        success = False

        return_description = 'N/A'
        statusVal = 'Failed'

        try:
            while not success:
                try:
                    # Getting response from web service
                    try:
                        strCheckMsg = 'Sending Payload - ' + str(retries)
                        logging.info(strCheckMsg)
                        
                        response = requests.request("POST", url, data=json_data, headers=headers, verify=False, timeout=maxTimeOut)

                        resp_dict = json.loads(response.text)

                        statusVal = resp_dict['status']
                        return_description = resp_dict['description']

                        logging.info(statusVal)

                    except Exception as e:
                        x = str(e)
                        logging.info(x)
                        success = False

                    if (eventFlag == 'Boarding-Pass'):
                        str_R = "Boarding-Pass Json Response:: " + str(response.text)
                    else:
                        str_R = "Invalid flag options "
                    
                    logging.info((str_R))

                    if len(response.text) > 80:
                        if str(response.status_code)[:1] == '2':
                            success = True
                        else:
                            wait = retries * 2
                            str_R1 = "retries Fail! Waiting " + str(wait) + " seconds and retrying!"
                            logging.info(str_R1)
                            time.sleep(wait)
                            retries += 1

                        # Checking maximum retries
                        if retries >= maxRetryNo:
                            success = True
                            raise  Exception
                    else:
                        if str(response.status_code)[:1] == '2':
                            success = True
                        else:
                            # Checking maximum entries
                            if retries >= maxRetryNo:
                                success = True
                                raise  Exception

                            retries += 1
                except:
                    strVal = 'Retrying - ' + str(retries)
                    logging.info(strVal)
                    # Checking maximum entries
                    if retries >= maxRetryNo:
                        success = True
                        raise  Exception

                    retries += 1

            selection_flag = 'Y'

            # Forming return JSON
            jsonRet = {
                "eventFlag": eventFlag,
                "status":statusVal,
                "description":return_description,
                "computeFlag":selection_flag
            }

            xval = json.dumps(jsonRet)

            return xval

        except (ConnectionError, TimeoutError, InterruptedError) as e:
            str_R8 = "Response from Server: " + str(response)
            logging.info(str_R8)

            str_R9 = "Sending payload to Webservice!"
            logging.info(str_R9)
            selection_flag = 'N'

            # Forming return JSON
            jsonRet = {
                "eventFlag": eventFlag,
                "status":"Lost",
                "description":"Timed-Out or Connection Error or any I/O issue preventing the process. We're working on this!",
                "computeFlag":selection_flag
            }

            xval = json.dumps(jsonRet)

            return xval

        except ValueError as e:
            x = str(e)
            logging.info(x)

            selection_flag = 'N'

            # Forming return JSON
            jsonRet = {
                "eventFlag": eventFlag,
                "status":"Failed",
                "description":"Please check the compatibility of the internal parameters with their data types!",
                "computeFlag":selection_flag
            }

            xval = json.dumps(jsonRet)

            return xval

        except Exception as e:
            x = str(e)
            logging.info(x)

            selection_flag = 'N'

            # Forming return JSON
            jsonRet = {
                "eventFlag": eventFlag,
                "status":"Accepted",
                "description":"Maximum retries to the Boarding-Pass service is now reached. We're working offline to get that for you. Please retry after 4 hours if you haven't received it, or you can directly call the customer care number!",
                "computeFlag":selection_flag
            }

            xval = json.dumps(jsonRet)

            return xval


    except Exception as e:
        x = str(e)
        logging.info(x)

        selection_flag = 'N'

        # Forming return JSON
        jsonRet = {
            "eventFlag": eventFlag,
            "status":"Failed",
            "description":"DevOps Engineer is looking into this. Please try after some time!",
            "computeFlag":selection_flag
        }

        xval = json.dumps(jsonRet)

        return xval

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Invoking Seat-Selection generation service.')

    # Getting System Variable from local settings file
    strVal = 'Parameter Received: ' + os.environ['maxRetry']
    
    logging.info(strVal)

    # Capturing parameters from local settings
    max_retries = int(os.environ['maxRetry'])
    c_url = os.environ['cUrlBP']

    # Printing input Payload
    str_val = 'Input Payload:: ' + str(req.get_json())
    logging.info(str_val)

    strMainCheck = str(req.get_json())

    # variable
    x_status = 'Success'

    if (strMainCheck != ''):

        # Capturing individual elements

        sourceLeg = req.params.get('sourceLeg')
        destinationLeg = req.params.get('destinationLeg')
        boardingClass = req.params.get('boardingClass')
        preferredSeatNo = req.params.get('preferredSeatNo')
        travelPassport = req.params.get('travelPassport')
        bookingNo = req.params.get('bookingNo')
        travelerEmail = req.params.get('travelerEmail')
        travelerMobile = req.params.get('travelerMobile')

        # Checking Individual Elements

        if not sourceLeg:
            try:
                req_body = req.get_json()
            except ValueError:
                pass
            else:
                sourceLeg = req_body.get('sourceLeg')

        if not destinationLeg:
            try:
                req_body = req.get_json()
            except ValueError:
                pass
            else:
                destinationLeg = req_body.get('destinationLeg')

        if not boardingClass:
            try:
                req_body = req.get_json()
            except ValueError:
                pass
            else:
                boardingClass = req_body.get('boardingClass')

        if not preferredSeatNo:
            try:
                req_body = req.get_json()
            except ValueError:
                pass
            else:
                preferredSeatNo = req_body.get('preferredSeatNo')

        if not travelPassport:
            try:
                req_body = req.get_json()
            except ValueError:
                pass
            else:
                travelPassport = req_body.get('travelPassport')
        
        if not bookingNo:
            try:
                req_body = req.get_json()
            except ValueError:
                pass
            else:
                bookingNo = req_body.get('bookingNo')

        if not travelerEmail:
            try:
                req_body = req.get_json()
            except ValueError:
                pass
            else:
                travelerEmail = req_body.get('travelerEmail')

        if not travelerMobile:
            try:
                req_body = req.get_json()
            except ValueError:
                pass
            else:
                travelerMobile = req_body.get('travelerMobile')


        # Truthiness checks also reject None, which .get() returns for a missing key
        if (
            sourceLeg and destinationLeg and boardingClass and
            preferredSeatNo and travelPassport and bookingNo and
            (travelerEmail or travelerMobile)
            ):

            x_description = "Seat-Selection Successfully Processed!"

            # Preparing Payload for boarding-pass
            json_str_boarding = {
                "sourceLeg": sourceLeg,
                "destinationLeg": destinationLeg,
                "boardingClass": boardingClass,
                "confirmedSeatNo": preferredSeatNo,
                "travelPassport": travelPassport,
                "bookingNo": bookingNo,
                "travelerEmail": travelerEmail,
                "travelerMobile": travelerMobile
                        }

            jsonBoardingInput = json.dumps(json_str_boarding)

            eventFlag = 'Boarding-Pass'
            retryNoBase = 1
            maxTimeOut = 2.5

            # Invoking getBoardingPass API
            microCallsBP = callService(jsonBoardingInput, c_url, retryNoBase, eventFlag, max_retries, maxTimeOut)
            
            # Extracting seat_selection_flag
            resp_dict = json.loads(microCallsBP)
            x_status = resp_dict['status']
            x_description = resp_dict['description']

            # Capturing description value based on the logic
            logging.info(x_description)

            # Formatting return Payload
            json_str = {
                "description": x_description,
                "status": x_status,
                "sourceLeg": sourceLeg,
                "destinationLeg": destinationLeg,
                "boardingClass": boardingClass,
                "confirmedSeatNo": preferredSeatNo,
                "travelPassport": travelPassport,
                "bookingNo": bookingNo,
                "travelerEmail": travelerEmail,
                "travelerMobile": travelerMobile
                        }

            xval = json.dumps(json_str)

            return func.HttpResponse(xval, status_code=200)
        else:
            json_str = {
                "description": "Missing mandatory Email or Phone Number!",
                "status": "Failed",
                "sourceLeg": sourceLeg,
                "destinationLeg": destinationLeg,
                "boardingClass": boardingClass,
                "confirmedSeatNo": preferredSeatNo,
                "travelPassport": travelPassport,
                "bookingNo": bookingNo,
                "travelerEmail": travelerEmail,
                "travelerMobile": travelerMobile
            }

            xval = json.dumps(json_str)
            return func.HttpResponse(xval,status_code=200)
    else:
        json_str = {
            "description": "Missing entire payload!",
            "status": "Failed",
            "sourceLeg": "N/A",
            "destinationLeg": "N/A",
            "boardingClass": "N/A",
            "confirmedSeatNo": "N/A",
            "travelPassport": "N/A",
            "bookingNo": "N/A",
            "travelerEmail": "N/A",
            "travelerMobile": "N/A"
        }

        xval = json.dumps(json_str)
        return func.HttpResponse(xval,status_code=200)

There are a few key snippets that we would like to discuss here.

# Getting response from web service
try:
    strCheckMsg = 'Sending Payload - ' + str(retries)
    logging.info(strCheckMsg)
    
    response = requests.request("POST", url, data=json_data, headers=headers, verify=False, timeout=maxTimeOut)

    resp_dict = json.loads(response.text)

    statusVal = resp_dict['status']
    return_description = resp_dict['description']

    logging.info(statusVal)

except Exception as e:
    x = str(e)
    logging.info(x)
    success = False

From the above snippet, we can see that we set the time-out (passed as timeout=maxTimeOut, which is 2.5 seconds in this demo) based on the expected response time or the maximum SLA agreed upon for that particular service.

If we get a response, we capture the status as well as the description from the child service & write them to our log, which will look something like this –
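The retry logic wrapped around this call can be condensed into a small helper. This is only a sketch under a few assumptions: `call_with_retry`, `send` & `base_wait` are hypothetical names, and the injected `sleep` parameter exists purely to keep the example testable. It mirrors the idea in `callService` of waiting longer after each failed attempt & returning an "Accepted" status once the maximum number of attempts is used up.

```python
import time


def call_with_retry(send, max_retries, base_wait=2, sleep=time.sleep):
    """Call send() until it succeeds or max_retries attempts are used up."""
    for attempt in range(1, max_retries + 1):
        try:
            return {"status": "Success", "attempts": attempt, "result": send()}
        except Exception:
            if attempt == max_retries:
                break
            # The wait grows with every attempt, like `wait = retries * 2`
            sleep(attempt * base_wait)

    # Out of attempts: report that the request will be handled offline
    return {"status": "Accepted", "attempts": max_retries, "result": None}
```

Here `send` would be any zero-argument callable that performs the actual request & raises an exception on failure, for example a small wrapper around the `requests` call shown above.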

Azure Monitor of Seat-Selection service

2. getBoardingPass ( This microservice receives the input from its parent service & confirms the auto-generated boarding pass, which is shared with the passenger’s WhatsApp number. )

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 02-Oct-2020              ####
#### Modified On 04-Oct-2020              ####
####                                      ####
#### Objective: Scripts to generate the   ####
#### boarding pass for the passenger.     ####
##############################################

import logging
import json
import hashlib
import time
import os
import random

from twilio.rest import Client
from twilio.base.exceptions import TwilioRestException

import azure.functions as func


def main(req: func.HttpRequest) -> func.HttpResponse:
    try:
        logging.info('Invoking Boarding-Pass generation service.')

        # Getting System Variable from local settings file
        strVal = 'Parameter Received: ' + os.environ['timeOut']
        
        logging.info(strVal)
        sleep_time = int(os.environ['timeOut'])

        # Printing input Payload
        str_val = 'Input Payload:: ' + str(req.get_json())
        logging.info(str_val)

        strMainCheck = str(req.get_json())

        if (strMainCheck != ''):

            # Capturing individual elements

            sourceLeg = req.params.get('sourceLeg')
            destinationLeg = req.params.get('destinationLeg')
            boardingClass = req.params.get('boardingClass')
            confirmedSeatNo = req.params.get('confirmedSeatNo')
            travelPassport = req.params.get('travelPassport')
            bookingNo = req.params.get('bookingNo')
            travelerEmail = req.params.get('travelerEmail')
            travelerMobile = req.params.get('travelerMobile')

            # Checking Individual Elements

            if not sourceLeg:
                try:
                    req_body = req.get_json()
                except ValueError:
                    pass
                else:
                    sourceLeg = req_body.get('sourceLeg')

            if not destinationLeg:
                try:
                    req_body = req.get_json()
                except ValueError:
                    pass
                else:
                    destinationLeg = req_body.get('destinationLeg')

            if not boardingClass:
                try:
                    req_body = req.get_json()
                except ValueError:
                    pass
                else:
                    boardingClass = req_body.get('boardingClass')

            if not confirmedSeatNo:
                try:
                    req_body = req.get_json()
                except ValueError:
                    pass
                else:
                    confirmedSeatNo = req_body.get('confirmedSeatNo')

            if not travelPassport:
                try:
                    req_body = req.get_json()
                except ValueError:
                    pass
                else:
                    travelPassport = req_body.get('travelPassport')
            
            if not bookingNo:
                try:
                    req_body = req.get_json()
                except ValueError:
                    pass
                else:
                    bookingNo = req_body.get('bookingNo')

            if not travelerEmail:
                try:
                    req_body = req.get_json()
                except ValueError:
                    pass
                else:
                    travelerEmail = req_body.get('travelerEmail')

            if not travelerMobile:
                try:
                    req_body = req.get_json()
                except ValueError:
                    pass
                else:
                    travelerMobile = req_body.get('travelerMobile')

            # Mimicking network latency or issues
            random_sleep_time = random.randint(1, sleep_time)
            time.sleep(random_sleep_time)

            child_cond_time = float(os.environ['timeOutChild'])

            # Important Details capture in log
            strRandSleepTime = 'Network Generated delay: ' + str(random_sleep_time) + ' sec'
            logging.info(strRandSleepTime)

            strThreadTimeOut = 'Orphan microservice kill time: ' + str(child_cond_time) + ' sec'
            logging.info(strThreadTimeOut)

            # Handling orphan microservice
            if (random_sleep_time > child_cond_time):
                strCondChk = 'Raising exception to avoid orphan microservice call!'
                logging.info(strCondChk)
                raise Exception

            # Truthiness checks also reject None, which .get() returns for a missing key
            if (
                sourceLeg and destinationLeg and boardingClass and
                confirmedSeatNo and travelPassport and bookingNo and
                (travelerEmail or travelerMobile)
                ):

                # Generating simulated barcode
                # Concatenating Key Values
                strImpVal = str(sourceLeg)+str(destinationLeg)+str(boardingClass)+str(confirmedSeatNo)+str(travelPassport)+str(bookingNo)+str(travelerEmail)+str(travelerMobile)

                # Printing Concatenated Value
                logging.info(strImpVal)

                # m.update(strImpVal)
                xbarcode = hashlib.md5(strImpVal.encode("utf").strip()).hexdigest().upper()

                # Formatting return Payload
                json_str = {
                    "description": "Boarding-Pass Successfully Generated!",
                    "status": "Success",
                    "sourceLeg": sourceLeg,
                    "destinationLeg": destinationLeg,
                    "boardingClass": boardingClass,
                    "confirmedSeatNo": confirmedSeatNo,
                    "travelPassport": travelPassport,
                    "bookingNo": bookingNo,
                    "travelerEmail": travelerEmail,
                    "travelerMobile": travelerMobile,
                    "barCode": xbarcode
                            }

                xval = json.dumps(json_str)

                # Calling Twilio API to send WhatsApp Message
                try:
                    strComAPI = 'Calling WhatsApp API'
                    logging.info(strComAPI)

                    account_sid = str(os.environ['accountSid']) 
                    auth_token = os.environ['authToken'] 
                    mobPr = os.environ['mobPredicate']

                    fromMob = mobPr + os.environ['fromMobile'] 
                    toMob = mobPr + travelerMobile

                    client = Client(account_sid, auth_token)

                    msgBody = 'Your Boarding-Pass from ' + sourceLeg + ' to ' + destinationLeg + ' are as follows --> Boarding Class (' + boardingClass + ') - ' + 'Seat (' + confirmedSeatNo + ') - Gate (A' + str(random_sleep_time) + ') - ' + 'Bar Code (' +  xbarcode + ')'

                    message = client.messages.create(from_=fromMob, body=msgBody, to=toMob)

                    msgStat = message.status

                    if msgStat.upper() in ('QUEUED', 'DELIVERED', 'SUCCESS'):
                        return func.HttpResponse(xval, status_code=200)
                    else:
                        # Formatting return Payload
                        json_str = {
                            "description": "Encountered some technical issue. Don't worry - we're working on it!",
                            "status": "Accepted",
                            "sourceLeg": sourceLeg,
                            "destinationLeg": destinationLeg,
                            "boardingClass": boardingClass,
                            "confirmedSeatNo": confirmedSeatNo,
                            "travelPassport": travelPassport,
                            "bookingNo": bookingNo,
                            "travelerEmail": travelerEmail,
                            "travelerMobile": travelerMobile,
                            "barCode": xbarcode
                                    }

                        xval = json.dumps(json_str)

                        return func.HttpResponse(xval, status_code=200)

                except TwilioRestException as e:
                    x = str(e)
                    logging.info(x)

                    # Formatting return Payload
                    json_str = {
                        "description": "Encountered some technical issue. Don't worry - we're working on it!",
                        "status": "Delivery Failed!",
                        "sourceLeg": sourceLeg,
                        "destinationLeg": destinationLeg,
                        "boardingClass": boardingClass,
                        "confirmedSeatNo": confirmedSeatNo,
                        "travelPassport": travelPassport,
                        "bookingNo": bookingNo,
                        "travelerEmail": travelerEmail,
                        "travelerMobile": travelerMobile,
                        "barCode": xbarcode
                                }

                    xval = json.dumps(json_str)

                    return func.HttpResponse(xval, status_code=200)
                
            else:
                xbarcode = 'Not Generated!'
                json_str = {
                    "description": "Missing mandatory Email or Phone Number!",
                    "status": "Failed",
                    "sourceLeg": sourceLeg,
                    "destinationLeg": destinationLeg,
                    "boardingClass": boardingClass,
                    "confirmedSeatNo": confirmedSeatNo,
                    "travelPassport": travelPassport,
                    "bookingNo": bookingNo,
                    "travelerEmail": travelerEmail,
                    "travelerMobile": travelerMobile,
                    "barCode": xbarcode
                }

                xval = json.dumps(json_str)
                return func.HttpResponse(xval,status_code=200)
        else:
            xbarcode = 'Not Generated!'
            json_str = {
                "description": "Missing entire payload!",
                "status": "Failed",
                "sourceLeg": "N/A",
                "destinationLeg": "N/A",
                "boardingClass": "N/A",
                "confirmedSeatNo": "N/A",
                "travelPassport": "N/A",
                "bookingNo": "N/A",
                "travelerEmail": "N/A",
                "travelerMobile": "N/A",
                "barCode": xbarcode
            }

            xval = json.dumps(json_str)
            return func.HttpResponse(xval,status_code=200)
    except Exception as e:
        x = str(e)
        logging.info(x)

        x_description = 'Time-out due to network delay!'
        logging.info(x_description)

        xbarcode = 'Not Generated!'
        json_str = {
            "description": x_description,
            "status": "Failed",
            "sourceLeg": "N/A",
            "destinationLeg": "N/A",
            "boardingClass": "N/A",
            "confirmedSeatNo": "N/A",
            "travelPassport": "N/A",
            "bookingNo": "N/A",
            "travelerEmail": "N/A",
            "travelerMobile": "N/A",
            "barCode": xbarcode
        }

        xval = json.dumps(json_str)
        return func.HttpResponse(xval,status_code=200)




Let us explore some of the vital snippets –

# Mimicking network latency or issues
random_sleep_time = random.randint(1, sleep_time)
time.sleep(random_sleep_time)

child_cond_time = float(os.environ['timeOutChild'])

# Important Details capture in log
strRandSleepTime = 'Network Generated delay: ' + str(random_sleep_time) + ' sec'
logging.info(strRandSleepTime)

strThreadTimeOut = 'Orphan microservice kill time: ' + str(child_cond_time) + ' sec'
logging.info(strThreadTimeOut)

# Handling orphan microservice
if (random_sleep_time > child_cond_time):
    strCondChk = 'Raising exception to avoid orphan microservice call!'
    logging.info(strCondChk)
    raise Exception

We generate a random wait time to mimic sudden network latency or potential network/time-out/connection issues. Also, when the simulated delay exceeds the configured limit (timeOutChild), we raise an exception to end this call early, so that an orphaned request does not trigger an unwanted WhatsApp message via the Twilio API.

And, when we test the parent Seat-Selection service through Postman, it will show the following response –
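The same guard can be sketched in isolation. The names `OrphanCallError` & `simulate_delay` are hypothetical, and the actual `time.sleep` is left out so that the example runs instantly. One possible refinement over a bare `raise Exception` is a dedicated exception class, which would let the outer handler tell an orphaned call apart from other failures.

```python
import random


class OrphanCallError(Exception):
    """Raised when the simulated delay exceeds what the parent will wait for."""


def simulate_delay(max_delay_sec, parent_limit_sec, rng=random):
    """Pick a random delay and refuse to continue if the parent has given up."""
    delay = rng.randint(1, max_delay_sec)
    if delay > parent_limit_sec:
        # The parent has already timed out, so doing more work is pointless
        raise OrphanCallError("delay of " + str(delay) + " sec exceeds parent limit")
    return delay


if __name__ == "__main__":
    try:
        print("Network generated delay:", simulate_delay(5, 2.5), "sec")
    except OrphanCallError as e:
        print("Skipping WhatsApp trigger:", e)
```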

Seat-Selection Microservice response

You may have noticed that the boarding-pass response also carries the seat details; hence, we are addressing them together here.

For successful execution, we can see a similar message –

Seat-Selection service success status

However, if we want to handle this better, we can use “Circuit-Breaker.”

Let us understand what this is. Most of us have known since childhood that when the electricity supply fluctuates frequently, we protect our electrical appliances with a small device that disconnects the power between the house’s primary grid & all the appliances at home. It prevents these devices from being exposed to the fluctuation.

The same concept has been implemented for Microservice architecture as well.

For Normal operation without any issue

The above diagram shows that during normal operation, the circuit breaker lets all the parent services continue to call the child service without any issues.

Basic concept of the Circuit Breaker in Microservice

The above diagram shows that the circuit breaker will stop all calls from the parent services once the number of failed responses from the child service reaches the threshold count.

Circuit breaker with partially allow case

The above diagram shows what happens once a “STOP” event is captured. The circuit breaker waits for a few seconds & then allows a few calls through to test whether the impacted service is ready to consume further requests. This way, it can control the threading bottleneck.

The next diagram will give us the distinct states of a “Circuit Breaker.”

States of a Circuit Breaker

The above diagram shows the distinct states that a circuit breaker moves through. It maintains a count of failures & successes against a threshold. Depending on the situation, it allows the parent services to invoke the impacted service entirely, partially, or not at all.
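The three states can be sketched as a small state machine. This is a simplified illustration only & not the implementation of the `circuitbreaker` package used in the next script. `SimpleCircuitBreaker` & its parameters are hypothetical names, and the injectable `clock` is there only to make the behaviour easy to test.

```python
import time


class SimpleCircuitBreaker:
    CLOSED, OPEN, HALF_OPEN = "closed", "open", "half-open"

    def __init__(self, failure_threshold=3, recovery_timeout=60, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at = None
        self.state = self.CLOSED

    def call(self, func, *args):
        if self.state == self.OPEN:
            if self.clock() - self.opened_at < self.recovery_timeout:
                # Fail fast instead of waiting on a service known to be down
                raise RuntimeError("circuit open: call blocked")
            # Recovery time has passed: let one trial call through
            self.state = self.HALF_OPEN
        try:
            result = func(*args)
        except Exception:
            self.failures += 1
            if self.state == self.HALF_OPEN or self.failures >= self.failure_threshold:
                self.state = self.OPEN
                self.opened_at = self.clock()
            raise
        # A successful call closes the circuit and resets the failure count
        self.failures = 0
        self.state = self.CLOSED
        return result
```

A breaker of this kind only counts failures that leave the wrapped function as exceptions. A wrapped function that catches every error internally & returns an error payload instead will never open the circuit, which is worth keeping in mind when decorating an existing function.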

Let us revise the code & review it together –

3. getSeatSelection ( This microservice receives the inputs from its parent microservice & confirms the preferred seat number selected by the passenger – using Circuit-Breaker. )

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 05-Oct-2020              ####
#### Modified On 05-Oct-2020              ####
####                                      ####
#### Objective: Scripts to choose the     ####
#### seat for the passenger.              ####
##############################################

import logging
import json
import time
import os
import ssl
import requests

import azure.functions as func

from circuitbreaker import CircuitBreaker

class BoardingPassCircuitBreaker(CircuitBreaker):
    FAILURE_THRESHOLD = 10
    RECOVERY_TIMEOUT = 60
    EXPECTED_EXCEPTION = TimeoutError

@BoardingPassCircuitBreaker()
def callService(inputJson, urlPost, retryNo, eventFlag, maxRetryNo, maxTimeOut):
    # Invoking getBoardingPass API
    try:
        # Bypassing SSL Authentication
        try:
            _create_unverified_https_context = ssl._create_unverified_context
        except AttributeError:
            # Legacy python that doesn't verify HTTPS certificates by default
            pass
        else:
            # Handle target environment that doesn't support HTTPS verification
            ssl._create_default_https_context = _create_unverified_https_context

        # Target URL of the child service
        url = urlPost

        json_data = inputJson
        headers = {'content-type': "application/json", 'cache-control': "no-cache"}

        # Failed case retry
        retries = retryNo
        success = False

        return_description = 'N/A'
        statusVal = 'Failed'

        try:
            while not success:
                try:
                    # Getting response from web service
                    try:
                        strCheckMsg = 'Sending Payload - ' + str(retries)
                        logging.info(strCheckMsg)
                        
                        response = requests.request("POST", url, data=json_data, headers=headers, verify=False, timeout=maxTimeOut)

                        resp_dict = json.loads(response.text)

                        statusVal = resp_dict['status']
                        return_description = resp_dict['description']

                        logging.info(statusVal)

                    except Exception as e:
                        x = str(e)
                        logging.info(x)
                        success = False

                    if (eventFlag == 'Boarding-Pass'):
                        str_R = "Boarding-Pass Json Response:: " + str(response.text)
                    else:
                        str_R = "Invalid flag options "
                    
                    logging.info((str_R))

                    if len(response.text) > 80:
                        if str(response.status_code)[:1] == '2':
                            success = True
                        else:
                            wait = retries * 2
                            str_R1 = "retries Fail! Waiting " + str(wait) + " seconds and retrying!"
                            logging.info(str_R1)
                            time.sleep(wait)
                            retries += 1

                        # Checking maximum retries
                        if retries >= maxRetryNo:
                            success = True
                            raise  Exception
                    else:
                        if str(response.status_code)[:1] == '2':
                            success = True
                        else:
                            # Checking maximum retries
                            if retries >= maxRetryNo:
                                success = True
                                raise  Exception

                            retries += 1
                except:
                    strVal = 'Retrying - ' + str(retries)
                    logging.info(strVal)
                    # Checking maximum retries
                    if retries >= maxRetryNo:
                        success = True
                        raise  Exception

                    retries += 1

            selection_flag = 'Y'

            # Forming return JSON
            jsonRet = {
                "eventFlag": eventFlag,
                "status":statusVal,
                "description":return_description,
                "computeFlag":selection_flag
            }

            xval = json.dumps(jsonRet)

            return xval

        except (ConnectionError, TimeoutError, InterruptedError) as e:
            str_R8 = "Response from Server: " + str(response)
            logging.info(str_R8)

            str_R9 = "Sending payload to Webservice!"
            logging.info(str_R9)
            selection_flag = 'N'

            # Forming return JSON
            jsonRet = {
                "eventFlag": eventFlag,
                "status":"Lost",
                "description":"Timed-Out or Connection Error or any I/O issue preventing the process. We're working on this!",
                "computeFlag":selection_flag
            }

            xval = json.dumps(jsonRet)

            return xval

        except ValueError as e:
            x = str(e)
            logging.info(x)

            selection_flag = 'N'

            # Forming return JSON
            jsonRet = {
                "eventFlag": eventFlag,
                "status":"Failed",
                "description":"Please check the compatibility of the internal parameters with their data types!",
                "computeFlag":selection_flag
            }

            xval = json.dumps(jsonRet)

            return xval

        except Exception as e:
            x = str(e)
            logging.info(x)

            selection_flag = 'N'

            # Forming return JSON
            jsonRet = {
                "eventFlag": eventFlag,
                "status":"Accepted",
                "description":"Maximum retries to the Boarding-Pass service is now reached. We're working offline to get that for you. Please retry after 4 hours in case you haven't received it, or you can directly call the customer care number!",
                "computeFlag":selection_flag
            }

            xval = json.dumps(jsonRet)

            return xval


    except Exception as e:
        x = str(e)
        logging.info(x)

        selection_flag = 'N'

        # Forming return JSON
        jsonRet = {
            "eventFlag": eventFlag,
            "status":"Failed",
            "description":"DevOps Engineer is looking into this. Please try after some time!",
            "computeFlag":selection_flag
        }

        xval = json.dumps(jsonRet)

        return xval

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Invoking Seat-Selection generation service.')

    # Getting System Variable from local settings file
    strVal = 'Parameter Received: ' + os.environ['maxRetry']
    
    logging.info(strVal)

    # Capturing parameters from local settings
    max_retries = int(os.environ['maxRetry'])
    c_url = os.environ['cUrlBP']

    # Printing input Payload
    str_val = 'Input Payload:: ' + str(req.get_json())
    logging.info(str_val)

    strMainCheck = str(req.get_json())

    # variable
    x_status = 'Success'

    if (strMainCheck != ''):

        # Capturing individual elements. Each value is read from the
        # query string first & then from the JSON body as a fallback

        try:
            req_body = req.get_json()
        except ValueError:
            req_body = {}

        def getElement(name):
            return req.params.get(name) or req_body.get(name)

        sourceLeg = getElement('sourceLeg')
        destinationLeg = getElement('destinationLeg')
        boardingClass = getElement('boardingClass')
        preferredSeatNo = getElement('preferredSeatNo')
        travelPassport = getElement('travelPassport')
        bookingNo = getElement('bookingNo')
        travelerEmail = getElement('travelerEmail')
        travelerMobile = getElement('travelerMobile')


        # A missing (None) or empty ('') value counts as absent
        if (
            sourceLeg and destinationLeg and boardingClass and
            preferredSeatNo and travelPassport and bookingNo and
            (travelerEmail or travelerMobile)
            ):

            x_description = "Seat-Selection Successfully Processed!"

            # Preparing Payload for boarding-pass
            json_str_boarding = {
                "sourceLeg": sourceLeg,
                "destinationLeg": destinationLeg,
                "boardingClass": boardingClass,
                "confirmedSeatNo": preferredSeatNo,
                "travelPassport": travelPassport,
                "bookingNo": bookingNo,
                "travelerEmail": travelerEmail,
                "travelerMobile": travelerMobile
                        }

            jsonBoardingInput = json.dumps(json_str_boarding)

            eventFlag = 'Boarding-Pass'
            retryNoBase = 1
            maxTimeOut = 2.5

            # Invoking getBoardingPass API
            microCallsBP = callService(jsonBoardingInput, c_url, retryNoBase, eventFlag, max_retries, maxTimeOut)
            
            # Extracting seat_selection_flag
            resp_dict = json.loads(microCallsBP)
            x_status = resp_dict['status']
            x_description = resp_dict['description']

            # Capturing description value based on the logic
            logging.info(x_description)

            # Formatting return Payload
            json_str = {
                "description": x_description,
                "status": x_status,
                "sourceLeg": sourceLeg,
                "destinationLeg": destinationLeg,
                "boardingClass": boardingClass,
                "confirmedSeatNo": preferredSeatNo,
                "travelPassport": travelPassport,
                "bookingNo": bookingNo,
                "travelerEmail": travelerEmail,
                "travelerMobile": travelerMobile
                        }

            xval = json.dumps(json_str)

            return func.HttpResponse(xval, status_code=200)
        else:
            json_str = {
                "description": "Missing mandatory Email or Phone Number!",
                "status": "Failed",
                "sourceLeg": sourceLeg,
                "destinationLeg": destinationLeg,
                "boardingClass": boardingClass,
                "confirmedSeatNo": preferredSeatNo,
                "travelPassport": travelPassport,
                "bookingNo": bookingNo,
                "travelerEmail": travelerEmail,
                "travelerMobile": travelerMobile
            }

            xval = json.dumps(json_str)
            return func.HttpResponse(xval,status_code=200)
    else:
        json_str = {
            "description": "Missing entire payload!",
            "status": "Failed",
            "sourceLeg": "N/A",
            "destinationLeg": "N/A",
            "boardingClass": "N/A",
            "confirmedSeatNo": "N/A",
            "travelPassport": "N/A",
            "bookingNo": "N/A",
            "travelerEmail": "N/A",
            "travelerMobile": "N/A"
        }

        xval = json.dumps(json_str)
        return func.HttpResponse(xval,status_code=200)

Let us review the key snippets –

from circuitbreaker import CircuitBreaker

class BoardingPassCircuitBreaker(CircuitBreaker):
    FAILURE_THRESHOLD = 10
    RECOVERY_TIMEOUT = 60
    EXPECTED_EXCEPTION = TimeoutError

@BoardingPassCircuitBreaker()
def callService(inputJson, urlPost, retryNo, eventFlag, maxRetryNo, maxTimeOut):
    # Invoking getBoardingPass API
    try:
        # (request and retry logic, as shown in the full listing above)

            # Forming return JSON
            jsonRet = {
                "eventFlag": eventFlag,
                "status":statusVal,
                "description":return_description,
                "computeFlag":selection_flag
            }

            xval = json.dumps(jsonRet)

            return xval


    except Exception as e:
        x = str(e)
        logging.info(x)

        selection_flag = 'N'

        # Forming return JSON
        jsonRet = {
            "eventFlag": eventFlag,
            "status":"Failed",
            "description":"DevOps Engineer is looking into this. Please try after some time!",
            "computeFlag":selection_flag
        }

        xval = json.dumps(jsonRet)

        return xval

We have wrapped the microservice call in a function & marked it with the circuit-breaker decorator. We have also overridden a few settings in our custom circuit-breaker class: the failure threshold, the recovery timeout & the expected exception.
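To make the three settings more concrete, here is a minimal, standard-library-only sketch of the circuit-breaker pattern. It is not the circuitbreaker package's implementation; the class name `SimpleCircuitBreaker`, its `call` method & the injectable `clock` argument are all hypothetical names used only for illustration.

```python
import time


class SimpleCircuitBreaker:
    """Toy breaker: opens after N consecutive failures, allows a retry after a timeout."""

    def __init__(self, failure_threshold=10, recovery_timeout=60, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.clock = clock        # injectable clock (assumption, eases testing)
        self.failures = 0
        self.opened_at = None     # None means the circuit is closed

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if self.clock() - self.opened_at < self.recovery_timeout:
                # Circuit is open: skip the call instead of hitting the service
                raise RuntimeError("circuit open: call skipped")
            # Recovery timeout elapsed: fall through & allow one trial call

        try:
            result = func(*args, **kwargs)
        except TimeoutError:
            # Only the expected exception counts as a failure
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise

        # A successful call closes the circuit & resets the counter
        self.failures = 0
        self.opened_at = None
        return result
```

One point worth keeping in mind with this pattern: a breaker can only count failures that actually escape the wrapped function. If the function catches the exception itself & returns an error payload, the breaker sees a normal return.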

Moreover, here are some of our main API sample outputs, i.e., “getOnBoarding,” depending on the child-services’ availability.

Please click the above image gallery to see all the combinations of responses.

Finally, here is the WhatsApp message sent when that call is successful. We have used the Twilio API to deliver the text.

Boarding Pass delivery to Passenger WhatsApp No

Here are the dependent packages from requirements.txt –

azure-functions==1.4.0
certifi==2020.6.20
chardet==3.0.4
circuitbreaker==1.3.1
idna==2.10
numpy==1.19.2
pandas==1.1.2
PyJWT==1.7.1
python-dateutil==2.8.1
pytz==2020.1
requests==2.24.0
six==1.15.0
twilio==6.45.4
urllib3==1.25.10

So, finally, we have done it.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only.

Predicting health issues for Senior Citizens based on “Realtime Weather Data” in Python

Hi Guys,

Today, I’ll be presenting a different kind of post here. I’ll be trying to predict health issues for senior citizens based on “realtime weather data” by blending open-source population data using some mock risk factor calculation. At the end of the post, I’ll be plotting these numbers into some graphs for better understanding.

Let’s drive!

First, we need realtime weather data, so we need to subscribe to the OpenWeather API. To do that, you have to register as a developer & you’ll receive an email similar to the one below once they have approved your request –

1. Subscription To Open Weather

As the above picture shows, you’ll be provided with one API key & links to some useful API documentation. I would recommend exploring all the links before you try to use the API.

You can also view your API key once you have logged into their console. You can also create multiple API keys & the screen should look something like this –

2. Viewing Keys

For security reasons, I’ll be hiding my own keys & you should do the same with yours.

Many of these free APIs might have some issues. So, I would recommend that you start testing the open API through Postman before you jump into the Python development. Here is a glimpse of my test through Postman –

3. Testing API

Once I can see that the API is returning a result, I can start working with it.

Apart from that, one needs to understand that these APIs might have usage limits. You also need to know the consequences in terms of price & tier in case you exceed the limit. Here are the details for this API –

5. Package Details - API

For our demo, I’ll be using the free tier only.

Let’s look into our other source data. We got the top 10 cities by population over the internet. We have also collected a sample Senior Citizen percentage against the sex ratio across those cities. We have masked these values on top of that, as this is just for educational purposes.

1. CityDetails.csv

Here is a glimpse of this file –

4. Source File

So, this file only contains the total population across the top 10 cities in the USA.

2. SeniorCitizen.csv

6. SeniorCitizen Data

This file contains the Sex ratio of Senior citizens across those top 10 cities by population.

Again, we are not going to discuss any script that we’ve already discussed here.

Hence, we’re skipping clsL.py here.

1. clsConfig.py (This script contains all the parameters of the server.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 19-Jan-2019              ####
####                                      ####
#### Objective: This script is a config   ####
#### file that contains all the keys &    ####
#### paths for the OpenWeather API. The   ####
#### application reads these parameters   ####
#### to call the API & locate its files.  ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'URL': "http://api.openweathermap.org/data/2.5/weather",
        'API_HOST': "api.openweathermap.org",
        'API_KEY': "XXXXXXXXXXXXXXXXXXXXXX",
        'API_TYPE': "application/json",
        'CACHE': "no-cache",
        'CON': "keep-alive",
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'SRC_PATH': Curr_Path + sep + 'Src_File' + sep,
        'APP_DESC_1': 'Open Weather Forecast',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SRC_FILE': Curr_Path + sep + 'Src_File' + sep + 'CityDetails.csv',
        'SRC_FILE_1': Curr_Path + sep + 'Src_File' + sep + 'SeniorCitizen.csv',
        'SRC_FILE_INIT': 'CityDetails.csv',
        'COL_LIST': ['base', 'all', 'cod', 'lat', 'lon', 'dt', 'feels_like', 'humidity', 'pressure', 'temp', 'temp_max', 'temp_min', 'name', 'country', 'sunrise', 'sunset', 'type', 'timezone', 'visibility', 'weather', 'deg', 'gust', 'speed'],
        'COL_LIST_1': ['base', 'all', 'cod', 'lat', 'lon', 'dt', 'feels_like', 'humidity', 'pressure', 'temp', 'temp_max', 'temp_min', 'CityName', 'country', 'sunrise', 'sunset', 'type', 'timezone', 'visibility', 'deg', 'gust', 'speed', 'WeatherMain', 'WeatherDescription'],
        'COL_LIST_2': ['CityName', 'Population', 'State']
    }

2. clsWeather.py (This script contains the main logic to extract the realtime data from our subscribed weather API.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 19-Jan-2020              ####
#### Modified On 19-Jan-2020              ####
####                                      ####
#### Objective: Main script to invoke     ####
#### the OpenWeather API.                 ####
##############################################

import requests
import logging
import json
from clsConfig import clsConfig as cf

class clsWeather:
    def __init__(self):
        self.url = cf.config['URL']
        self.openmapapi_host = cf.config['API_HOST']
        self.openmapapi_key = cf.config['API_KEY']
        self.openmapapi_cache = cf.config['CACHE']
        self.openmapapi_con = cf.config['CON']
        self.type = cf.config['API_TYPE']

    def searchQry(self, rawQry):
        try:
            url = self.url
            openmapapi_host = self.openmapapi_host
            openmapapi_key = self.openmapapi_key
            openmapapi_cache = self.openmapapi_cache
            openmapapi_con = self.openmapapi_con
            type = self.type

            querystring = {"appid": openmapapi_key, "q": rawQry}

            print('Input JSON: ', str(querystring))

            headers = {
                'host': openmapapi_host,
                'content-type': type,
                'Cache-Control': openmapapi_cache,
                'Connection': openmapapi_con
            }

            response = requests.request("GET", url, headers=headers, params=querystring)

            # Returning the raw JSON text; the caller parses it with json.loads
            ResJson = response.text

            return ResJson

        except Exception as e:
            x = str(e)
            print(x)

            logging.info(x)
            # Returning a JSON string so that the caller can still parse it
            ResJson = json.dumps({'errorDetails': x})

            return ResJson

The key lines from this script –

querystring = {"appid": openmapapi_key, "q": rawQry}

print('Input JSON: ', str(querystring))

headers = {
    'host': openmapapi_host,
    'content-type': type,
    'Cache-Control': openmapapi_cache,
    'Connection': openmapapi_con
}

response = requests.request("GET", url, headers=headers, params=querystring)

ResJson  = response.text

In the above snippet, our application first prepares the payload & the parameters received from our param script. It then invokes the GET method to extract the real-time data in the form of JSON & finally sends the JSON payload back to the primary calling function.
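To see what the query string turns into on the wire, here is a small sketch that builds the final GET URL with the standard library only, without making any network call. The helper name `build_weather_url` & the placeholder key are hypothetical; the base URL is the same value stored under 'URL' in clsConfig.py.

```python
from urllib.parse import urlencode

# Same value as cf.config['URL'] in clsConfig.py
BASE_URL = "http://api.openweathermap.org/data/2.5/weather"


def build_weather_url(city, api_key, base_url=BASE_URL):
    """Return the full GET URL for one city (hypothetical helper, not part of the post)."""
    querystring = {"appid": api_key, "q": city}
    # urlencode escapes spaces & special characters in the city name
    return base_url + "?" + urlencode(querystring)


print(build_weather_url("New York", "XXXX"))
# http://api.openweathermap.org/data/2.5/weather?appid=XXXX&q=New+York
```

Printing the URL like this can be handy when you compare the Python call against the same request in Postman.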

3. clsMap.py (This script contains the main logic to prepare the map using the seaborn package & plots our custom-made risk factor by blending the realtime data with our statistical data received over the internet.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 19-Jan-2020              ####
#### Modified On 19-Jan-2020              ####
####                                      ####
#### Objective: Main scripts to invoke    ####
#### plot into the Map.                   ####
##############################################

import seaborn as sns
import logging
from clsConfig import clsConfig as cf
import pandas as p
import clsL as cl

# This library is required later
# to display the chart
import matplotlib.pyplot as plt

class clsMap:
    def __init__(self):
        self.src_file =  cf.config['SRC_FILE_1']

    def calculateRisk(self, row):
        try:
            # Let's assume some logic
            # 1. By default, 30% of Senior Citizens are
            # prone to health issues in each city
            # 2. Male Senior Citizens get a 19% weight
            # & female Senior Citizens get 11%
            # 3. Humidity above 70% or below 40%
            # adds a further 22% weight
            # 4. A feels-like temperature above 280 or
            # below 260 (Kelvin, the API default) adds
            # a further 17% weight
            # Finally, this will be calculated per 1K
            # people around 10 blocks

            str_sex = str(row['Sex'])

            int_humidity = int(row['humidity'])
            int_feelsLike = int(row['feels_like'])
            int_population = int(str(row['Population']).replace(',',''))
            float_srcitizen = float(row['SeniorCitizen'])

            confidance_score = 0.0

            SeniorCitizenPopulation = (int_population * float_srcitizen)

            if str_sex == 'Male':
                confidance_score = (SeniorCitizenPopulation * 0.30 * 0.19) + confidance_score
            else:
                confidance_score = (SeniorCitizenPopulation * 0.30 * 0.11) + confidance_score

            if ((int_humidity > 70) | (int_humidity < 40)):
                confidance_score = confidance_score + (int_population * 0.30 * float_srcitizen) * 0.22

            if ((int_feelsLike > 280) | (int_feelsLike < 260)):
                confidance_score = confidance_score + (int_population * 0.30 * float_srcitizen) * 0.17

            final_score = round(round(confidance_score, 2) / (1000 * 10), 2)

            return final_score

        except Exception as e:
            x = str(e)

            return x

    def setMap(self, dfInput):
        try:
            resVal = 0
            df = p.DataFrame()
            debug_ind = 'Y'
            src_file =  self.src_file

            # Initiating Log Class
            l = cl.clsL()

            df = dfInput

            # Creating a subset of desired columns
            dfMod = df[['CityName', 'temp', 'Population', 'humidity', 'feels_like']]

            l.logr('5.dfSuppliment.csv', debug_ind, dfMod, 'log')

            # Fetching Senior Citizen Data
            df = p.read_csv(src_file, index_col=False)

            # Merging two frames
            dfMerge = p.merge(df, dfMod, on=['CityName'])

            l.logr('6.dfMerge.csv', debug_ind, dfMerge, 'log')

            # Getting RiskFactor quotient from our custom made logic
            dfMerge['RiskFactor'] = dfMerge.apply(lambda row: self.calculateRisk(row), axis=1)

            l.logr('7.dfRiskFactor.csv', debug_ind, dfMerge, 'log')

            # Generating Map plots
            # sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, hue='Sex')
            # sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, hue='Sex', markers=['o','v'], scatter_kws={'s':25})
            sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, col='Sex')

            # This is required when you are running
            # through normal Python & not through
            # Jupyter Notebook
            plt.show()

            return resVal

        except Exception as e:
            x = str(e)
            print(x)

            logging.info(x)
            resVal = x

            return resVal

Key lines from the above codebase –

# Creating a subset of desired columns
dfMod = df[['CityName', 'temp', 'Population', 'humidity', 'feels_like']]

l.logr('5.dfSuppliment.csv', debug_ind, dfMod, 'log')

# Fetching Senior Citizen Data
df = p.read_csv(src_file, index_col=False)

# Merging two frames
dfMerge = p.merge(df, dfMod, on=['CityName'])

l.logr('6.dfMerge.csv', debug_ind, dfMerge, 'log')

# Getting RiskFactor quotient from our custom made logic
dfMerge['RiskFactor'] = dfMerge.apply(lambda row: self.calculateRisk(row), axis=1)

l.logr('7.dfRiskFactor.csv', debug_ind, dfMerge, 'log')

Here, the application combines our Senior Citizen data with the already processed data coming from our primary calling script. It also applies our custom logic to calculate the risk factor figures. If you want to go through that, I’ve provided the logic to derive it. However, this is just a demo to find out similar figures. You should not rely on the logic that I’ve used (It is kind of my observation of life till now. :D).
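To follow the arithmetic of the mock risk factor, here is a standalone sketch of the same calculation for a single row. The function name `risk_factor` & the input numbers are made up for illustration; the weights mirror the ones used in calculateRisk.

```python
def risk_factor(population, senior_share, sex, humidity, feels_like):
    """Mock risk score for one city/sex row, mirroring the weights in calculateRisk."""
    seniors = population * senior_share
    base = seniors * 0.30                      # 30% assumed prone by default

    # Sex-based weight: 0.19 for male, 0.11 otherwise
    score = base * (0.19 if sex == 'Male' else 0.11)

    # Humidity outside the 40-70 band adds a 0.22 weight
    if humidity > 70 or humidity < 40:
        score += base * 0.22

    # Feels-like temperature outside the 260-280 band adds a 0.17 weight
    if feels_like > 280 or feels_like < 260:
        score += base * 0.17

    # Scaled per 1K people around 10 blocks
    return round(round(score, 2) / (1000 * 10), 2)


# Illustrative inputs only: 1,000,000 people, 10% senior citizens
print(risk_factor(1000000, 0.10, 'Male', 80, 285))    # 1.74
print(risk_factor(1000000, 0.10, 'Female', 50, 270))  # 0.33
```

For the first row, the base is 1,000,000 x 0.10 x 0.30 = 30,000, so the score is 30,000 x (0.19 + 0.22 + 0.17) = 17,400, which becomes 1.74 after scaling.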

The line below is only required when you are running seaborn through normal Python & not through a Jupyter notebook.

plt.show()

4. callOpenMapWeatherAPI.py (This is the first calling script. This script also calls the realtime API, blends the first file with it & passes only the relevant columns of data to our Map script to produce the graph.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 19-Jan-2020              ####
#### Modified On 19-Jan-2020              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from clsConfig import clsConfig as cf
import pandas as p
import clsL as cl
import logging
import datetime
import json
import clsWeather as ct
import re
import numpy as np
import clsMap as cm

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions to extract
# nested weather attributes

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def getMainWeather(row):
    try:
        # Parsing the stringified weather list to fetch the main weather

        lkp_Columns = str(row['weather'])
        jpayload = str(lkp_Columns).replace("'", '"')

        #jpayload = json.dumps(lkp_Columns)
        payload = json.loads(jpayload)

        df_lkp = p.io.json.json_normalize(payload)
        df_lkp.columns = df_lkp.columns.map(lambda x: x.split(".")[-1])

        str_main_weather = str(df_lkp.iloc[0]['main'])

        return str_main_weather

    except Exception as e:
        x = str(e)
        str_main_weather = x

        return str_main_weather

def getMainDescription(row):
    try:
        # Parsing the stringified weather list to fetch the description

        lkp_Columns = str(row['weather'])
        jpayload = str(lkp_Columns).replace("'", '"')

        #jpayload = json.dumps(lkp_Columns)
        payload = json.loads(jpayload)

        df_lkp = p.io.json.json_normalize(payload)
        df_lkp.columns = df_lkp.columns.map(lambda x: x.split(".")[-1])

        str_description = str(df_lkp.iloc[0]['description'])

        return str_description

    except Exception as e:
        x = str(e)
        str_description = x

        return str_description

def main():
    try:
        dfSrc = p.DataFrame()
        df_ret = p.DataFrame()
        ret_2 = ''
        debug_ind = 'Y'

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidatedIR.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']
        col_list = cf.config['COL_LIST']
        col_list_1 = cf.config['COL_LIST_1']
        col_list_2 = cf.config['COL_LIST_2']

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        df2 = p.DataFrame()

        src_file =  cf.config['SRC_FILE']

        # Fetching data from source file
        df = p.read_csv(src_file, index_col=False)

        # Creating a list of City Name from the source file
        city_list = df['CityName'].tolist()

        # Declaring an empty dictionary
        merge_dict = {}
        merge_dict['city'] = df2

        start_pos = 1
        src_file_name = '1.' + cf.config['SRC_FILE_INIT']

        for i in city_list:
            x1 = ct.clsWeather()
            ret_2 = x1.searchQry(i)

            # Capturing the JSON Payload
            res = json.loads(ret_2)

            # Converting dictionary to Pandas Dataframe
            # df_ret = p.read_json(ret_2, orient='records')

            df_ret = p.io.json.json_normalize(res)
            df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

            # Removing any duplicate columns
            df_ret = df_ret.loc[:, ~df_ret.columns.duplicated()]

            # l.logr(str(start_pos) + '.1.' + src_file_name, debug_ind, df_ret, 'log')

            # The OpenWeather response does not
            # always include a gust column. Hence,
            # we need to add a dummy gust column
            # to maintain a consistent structure

            if 'gust' not in df_ret.columns:
                df_ret = df_ret.assign(gust=999999)[['gust'] + df_ret.columns.tolist()]

            # Resetting the column orders as per JSON
            column_order = col_list
            df_mod_ret = df_ret.reindex(column_order, axis=1)

            if start_pos == 1:
                merge_dict['city'] = df_mod_ret
            else:
                d_frames = [merge_dict['city'], df_mod_ret]
                merge_dict['city'] = p.concat(d_frames)

            start_pos += 1

        for k, v in merge_dict.items():
            l.logr(src_file_name, debug_ind, merge_dict[k], 'log')

        # Now opening the temporary file
        temp_log_file = log_dir + src_file_name

        dfNew = p.read_csv(temp_log_file, index_col=False)

        # Extracting Complex columns
        dfNew['WeatherMain'] = dfNew.apply(lambda row: getMainWeather(row), axis=1)
        dfNew['WeatherDescription'] = dfNew.apply(lambda row: getMainDescription(row), axis=1)

        l.logr('2.dfNew.csv', debug_ind, dfNew, 'log')

        # Removing unwanted columns & Renaming key columns
        dfNew.drop(['weather'], axis=1, inplace=True)
        dfNew.rename(columns={'name': 'CityName'}, inplace=True)

        l.logr('3.dfNewMod.csv', debug_ind, dfNew, 'log')

        # Now joining with the main csv
        # to get the complete picture
        dfMain = p.merge(df, dfNew, on=['CityName'])

        l.logr('4.dfMain.csv', debug_ind, dfMain, 'log')

        # Let's extract only relevant columns
        dfSuppliment = dfMain[['CityName', 'Population', 'State', 'country', 'feels_like', 'humidity', 'pressure', 'temp', 'temp_max', 'temp_min', 'visibility', 'deg', 'gust', 'speed', 'WeatherMain', 'WeatherDescription']]

        l.logr('5.dfSuppliment.csv', debug_ind, dfSuppliment, 'log')

        # Let's pass this to our map section
        x2 = cm.clsMap()
        ret_3 = x2.setMap(dfSuppliment)

        if ret_3 == 0:
            print('Successful Map Generated!')
        else:
            print('Please check the log for further issue!')

        print("-" * 60)
        print()

        print('Finding Story points..')
        print("*" * 157)
        logging.info('Finding Story points..')
        logging.info(tmpR0)


        tmpR10 = 'End Time: ' + str(var)
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError as e:
        print(str(e))
        print("No relevant data to proceed!")
        logging.info("No relevant data to proceed!")

    except Exception as e:
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

Key snippet from the above script –

# Capturing the JSON Payload
res = json.loads(ret_2)

# Converting dictionary to Pandas Dataframe
df_ret = p.io.json.json_normalize(res)
df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

Once the application receives the JSON response from the real-time API, it converts the response into a pandas dataframe. The column rename keeps only the last segment of each flattened, dot-separated column name.

# Removing any duplicate columns
df_ret = df_ret.loc[:, ~df_ret.columns.duplicated()]

Since this is a complex JSON response, the application might encounter duplicate columns, which might cause problems later. Hence, our app removes these duplicate columns, as they are not required in our case.
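A minimal sketch of why the duplicates appear, assuming a hypothetical nested payload (the field names below are illustrative & not the actual API response). It uses pd.json_normalize, the top-level function available in pandas 1.0 or later, in place of p.io.json.json_normalize:

```python
import pandas as pd

# Hypothetical nested payload; field names are illustrative only
payload = {
    "name": "Kolkata",
    "main": {"temp": 30.5},
    "wind": {"speed": 3.1, "deg": 200},
    "sys": {"name": "placeholder"},
}

df = pd.json_normalize(payload)

# Keep only the last segment of each dotted column name.
# 'name' & 'sys.name' now both become 'name'.
df.columns = df.columns.map(lambda c: c.split(".")[-1])
has_duplicates = bool(df.columns.duplicated().any())

# Keep the first occurrence of every column name
deduped = df.loc[:, ~df.columns.duplicated()]
print(list(deduped.columns))
```

Note that duplicated() keeps the first occurrence, so whichever column comes first in the flattened frame wins.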

if 'gust' not in df_ret.columns:
    df_ret = df_ret.assign(gust=999999)[['gust'] + df_ret.columns.tolist()]

The application might not receive all the desired attributes from the real-time API. Hence, the above lines check for the gust column & add it with a dummy value whenever it is missing from the JSON response.
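The same idea can be written as a small reusable helper. This is only a sketch: ensure_column is a hypothetical name & the sample record is made up for illustration.

```python
import pandas as pd

def ensure_column(df, column, default):
    """Return df with `column` added (filled with `default`) if it is missing."""
    if column not in df.columns:
        df = df.assign(**{column: default})
    return df

# Hypothetical record without a gust value
df = pd.DataFrame([{"speed": 3.1, "deg": 200}])
df = ensure_column(df, "gust", 999999)

# reindex then fixes the column order, as the script does with col_list
df = df.reindex(["deg", "gust", "speed"], axis=1)
print(df)
```

Since reindex sets the final column order anyway, the helper does not need to place the new column at the front.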

if start_pos == 1:
    merge_dict['city'] = df_mod_ret
else:
    d_frames = [merge_dict['city'], df_mod_ret]
    merge_dict['city'] = p.concat(d_frames)

These few lines are required because our API can respond with only one city at a time. Hence, we retrieve one city at a time & finally merge them into a single dataframe before creating a temporary source file for the next step.
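As an alternative sketch (not the approach used in the script), the per-city frames can be collected in a list & concatenated once, which removes the need for the start_pos check. The sample responses below are hypothetical.

```python
import pandas as pd

# Hypothetical per-city responses; one API call returns one city
responses = [
    {"name": "Kolkata", "temp": 30.5},
    {"name": "Delhi", "temp": 34.0},
    {"name": "Mumbai", "temp": 31.2},
]

# One small dataframe per response
frames = [pd.json_normalize(r) for r in responses]

# A single concat at the end; ignore_index gives a clean 0..n-1 index
combined = pd.concat(frames, ignore_index=True)
print(combined)
```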

At this moment our data should look like this –

16. Intermediate_Data_1

Let’s check the weather column. We need to extract the main & description for our dashboard, which will be coming in the next installment.

# Extracting Complex columns
dfNew['WeatherMain'] = dfNew.apply(lambda row: getMainWeather(row), axis=1)
dfNew['WeatherDescription'] = dfNew.apply(lambda row: getMainDescription(row), axis=1)

Hence, we’ve used two functions to extract these values. The critical snippet from one of them is as follows –

lkp_Columns = str(row['weather'])
jpayload = str(lkp_Columns).replace("'", '"')
payload = json.loads(jpayload)

df_lkp = p.io.json.json_normalize(payload)
df_lkp.columns = df_lkp.columns.map(lambda x: x.split(".")[-1])

str_main_weather = str(df_lkp.iloc[0]['main'])

The above lines extract the weather column & replace the single quotes with double quotes before the application tries to parse it as JSON. Once it is parsed, json_normalize flattens it into individual columns. Once they are captured inside the pandas dataframe, you can extract the required value & return it to your primary calling function.
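Replacing every single quote breaks as soon as a value contains an apostrophe. Assuming the weather cell holds the string form of a Python list of dictionaries (which is what the quote replacement implies), here is a sketch of an alternative that uses ast.literal_eval from the standard library. The helper name get_weather_field & the sample cell are hypothetical.

```python
import ast

def get_weather_field(cell, field):
    """Return `field` from the first entry of a stringified weather list."""
    entries = ast.literal_eval(str(cell))
    return str(entries[0][field])

# Hypothetical cell value, as it could look after a CSV round trip
cell = "[{'id': 500, 'main': 'Rain', 'description': \"it's raining\"}]"

main_weather = get_weather_field(cell, "main")
description = get_weather_field(cell, "description")
print(main_weather, "|", description)
```

ast.literal_eval only evaluates Python literals, so it does not execute arbitrary code from the cell.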

# Let's pass this to our map section
x2 = cm.clsMap()
ret_3 = x2.setMap(dfSuppliment)

if ret_3 == 0:
    print('Successful Map Generated!')
else:
    print('Please check the log for further issue!')

In the above lines, the application invokes the Map class to calculate the remaining logic & then plots the data using seaborn.

Let’s just briefly see the central directory structure –

10. RunWindow

Here is the log directory –

11. Log Directory

And, finally, the source directory should look something like this –

12. SourceDir

Now, let’s run the application –

The following lines are essential –

sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, hue='Sex')

This will project the plot like this –

13. AdditionalOption

Or,

sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, hue='Sex', markers=['o','v'], scatter_kws={'s':25})

This will lead to the following figures –

14. Adding Markers

As you can see, using the markers (‘o’/’v’) leads to two different symbols for the two genders.

Or,

sns.lmplot(x='RiskFactor', y='SeniorCitizen', data=dfMerge, col='Sex')

This will lead to –

15. Separate By Sex

So, in this case, the application has created two completely separate plots, one for each value of Sex.

So, finally, we’ve done it. 😀

In the next post, I’ll be making some more improvements on top of these data sets. Till then – Happy Avenging! 🙂

Note: All the data posted here is representational, available over the internet & for educational purposes only.

Building Python-based best-route apps for Indian Railways

Hi Guys!

Today, I’ll present a way to find the best Indian Railways train route between a given source & destination using a third-party API.

This approach is particularly beneficial if you want to integrate this logic into an Azure Function, a Lambda Function, or any other serverless function.

Before we dig into the details, let us explore what kind of cloud-based architecture we can use to implement this.

Architecture

Fig: 1 (Cloud Architecture)

In this case, I’ve considered Azure as the implementation platform.

Let’s discuss how the events take place. First, a user searches for the best routes between two fixed stations by providing the source & destination stations. After the initial authentication is validated, the request goes through the Azure Firewall. The API service then checks whether a similar query was served before; if so, it fetches the result from the cache & sends it back to the user through their mobile application. For a first-time query, it retrieves the information from the DB & keeps a copy in the cache. This part is also managed through a load balancer for high availability. In addition, the system periodically pushes the updated information from the cache to the DB.
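The cache lookup described above can be sketched as follows. This is only an illustration of the idea: RouteService is a hypothetical class, and plain dictionaries stand in for the real cache & database.

```python
class RouteService:
    """Cache-aside lookup: check the cache first, fall back to the DB."""

    def __init__(self, db):
        self.db = db        # stand-in for the database
        self.cache = {}     # stand-in for the cache layer
        self.db_hits = 0    # counts how often the DB was queried

    def best_route(self, source, destination):
        key = (source, destination)
        if key in self.cache:
            # Similar query seen before: serve it from the cache
            return self.cache[key]
        # First-time query: read from the DB & keep a copy in the cache
        self.db_hits += 1
        result = self.db.get(key)
        self.cache[key] = result
        return result

db = {("NDLS", "HWH"): "hypothetical route record"}
service = RouteService(db)
first = service.best_route("NDLS", "HWH")
second = service.best_route("NDLS", "HWH")
print(first, service.db_hits)
```

The second call is served from the cache, so the database is queried only once.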

Let’s see the program directory structure –

ProgramDir

Let’s discuss our code –

1. clsConfig.py (This script contains all the configuration parameters for the Indian Railway API, along with the directory paths used by the application.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 12-Oct-2019              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys for      ####
#### the Indian Railway API & the paths   ####
#### used by this application.            ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'URL': "https://trains.p.rapidapi.com/",
        'RAPID_API_HOST': "trains.p.rapidapi.com",
        'RAPID_API_KEY': "hrfjjdfjfjfjfjxxxxxjffjjfjfjfjfjfjfjf",
        'RAPID_API_TYPE': "application/json",
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'APP_DESC_1': 'Indian Railway Train Schedule Search',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'COL_LIST': ['name','train_num','train_from','train_to','classes','departTime','arriveTime','Mon','Tue','Wed','Thu','Fri','Sat','Sun']
    }

As of now, I’ve replaced the API Key with a dummy value.
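One way to keep the real key out of the source file is to read it from an environment variable. This is a sketch only: the variable name RAPID_API_KEY & the helper get_rapid_api_key are assumptions, not part of the original script.

```python
import os

def get_rapid_api_key(default="dummy-key"):
    """Read the key from the environment; fall back to a dummy value."""
    return os.environ.get("RAPID_API_KEY", default)

# Only the two relevant entries of the config dictionary are shown
config = {
    "RAPID_API_HOST": "trains.p.rapidapi.com",
    "RAPID_API_KEY": get_rapid_api_key(),
}
print(config["RAPID_API_HOST"])
```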

2. clsIndianRailway.py (This script will invoke the main Indian Railway API & try to get the response between two railway stations. Hence, the name comes into the picture.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 20-Dec-2019              ####
#### Modified On 20-Dec-2019              ####
####                                      ####
#### Objective: Main scripts to invoke    ####
#### Indian Railway API.                  ####
##############################################

import requests
import logging
import json
from clsConfig import clsConfig as cf

class clsIndianRailway:
    def __init__(self):
        self.url = cf.config['URL']
        self.rapidapi_host = cf.config['RAPID_API_HOST']
        self.rapidapi_key = cf.config['RAPID_API_KEY']
        self.type = cf.config['RAPID_API_TYPE']

    def searchQry(self, rawQry):
        try:
            url = self.url
            rapidapi_host = self.rapidapi_host
            rapidapi_key = self.rapidapi_key
            type = self.type

            Ipayload = "{\"search\":\"" + rawQry + "\"}"

            jpayload = json.dumps(Ipayload)
            payload = json.loads(jpayload)

            print('Input JSON: ', str(payload))

            headers = {
                'x-rapidapi-host': rapidapi_host,
                'x-rapidapi-key': rapidapi_key,
                'content-type': type,
                'accept': type
                }

            response = requests.request("POST", url, data=payload, headers=headers)

            ResJson  = response.text

            jdata = json.dumps(ResJson)
            ResJson = json.loads(jdata)

            return ResJson

        except Exception as e:
            ResJson = ''
            x = str(e)
            print(x)

            logging.info(x)
            ResJson = {'errorDetails': x}

            return ResJson

Let’s explain the critical snippet from the code.

url = self.url
rapidapi_host = self.rapidapi_host
rapidapi_key = self.rapidapi_key
type = self.type

Ipayload = "{\"search\":\"" + rawQry + "\"}"

jpayload = json.dumps(Ipayload)
payload = json.loads(jpayload)

The first four lines receive the parameter values. Our application then frames the search query, which is done in the Ipayload variable. After that, the json.dumps & json.loads round trip leaves the payload as a JSON-formatted string, which is sent as the request body.
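Building the JSON by string concatenation produces invalid JSON when the search text itself contains a double quote. A sketch of an alternative, where build_payload is a hypothetical helper & the search text is made up:

```python
import json

def build_payload(raw_qry):
    """Serialize the search query; json.dumps escapes quotes & backslashes."""
    return json.dumps({"search": raw_qry})

query = 'Rajdhani "Express"'
payload = build_payload(query)

# The concatenation approach yields a string that is not valid JSON here
naive = "{\"search\":\"" + query + "\"}"
try:
    json.loads(naive)
    naive_is_valid = True
except json.JSONDecodeError:
    naive_is_valid = False

print(payload, naive_is_valid)
```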

headers = {
    'x-rapidapi-host': rapidapi_host,
    'x-rapidapi-key': rapidapi_key,
    'content-type': type,
    'accept': type
    }

response = requests.request("POST", url, data=payload, headers=headers)

Now, the application prepares the headers, sends the request & receives the response. Finally, this script takes the response text & returns it to the main calling application, as follows –

response = requests.request("POST", url, data=payload, headers=headers)

ResJson  = response.text

jdata = json.dumps(ResJson)
ResJson = json.loads(jdata)

return ResJson
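Note that json.dumps followed by json.loads returns the same text, so the actual parsing happens later in the main script. A sketch of parsing the response body in one place, with a fallback that mirrors the errorDetails dictionary from the except block; parse_response & the sample bodies are hypothetical:

```python
import json

def parse_response(text):
    """Parse the response body; return an error dict if it is not valid JSON."""
    try:
        return json.loads(text)
    except json.JSONDecodeError as e:
        return {"errorDetails": str(e)}

# Hypothetical response bodies
ok = parse_response('[{"name": "Sample Express", "train_num": 12345}]')
bad = parse_response("<html>Service unavailable</html>")
print(ok, bad)
```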

3. callIndianRailwayAPI.py (This is the main script which invokes the main Indian Railway API & tries to get the response between two railway stations. Hence, the name comes into the picture.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 20-Dec-2019              ####
#### Modified On 20-Dec-2019              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from clsConfig import clsConfig as cf
import pandas as p
import clsL as cl
import logging
import datetime
import json
import clsIndianRailway as ct
import re
import numpy as np

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Helper functions to derive
# the arrival time fields

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def getArriveTimeOnly(row):
    try:
        # Splitting on '+' to fetch the time part only

        lkp_arriveTime = str(row['arriveTime'])

        str_arr_time, remain = lkp_arriveTime.split('+')

        return str_arr_time

    except Exception as e:
        x = str(e)
        str_arr_time = ''

        return str_arr_time

def getArriveDateDiff(row):
    try:
        # Splitting on '+' to fetch the day-difference part

        lkp_arriveTime = str(row['arriveTime'])

        first_half, str_date_diff_init = lkp_arriveTime.split('+')

        # Replacing the text part from it & only capturing the integer part
        str_date_diff = int(re.sub(r"[a-z]","",str_date_diff_init, flags=re.I))

        return str_date_diff

    except Exception as e:
        x = str(e)
        str_date_diff = 0

        return str_date_diff

def getArriveTimeDiff(row):
    try:
        # Using regular expression to fetch time part only

        lkp_arriveTimeM = str(row['arriveTimeM'])

        str_time_diff_init = int(re.sub(r'[^\w\s]', '', lkp_arriveTimeM))

        # Replacing the text part from it & only capturing the integer part
        str_time_diff = (2400 - str_time_diff_init)

        return str_time_diff

    except Exception as e:
        x = str(e)
        str_time_diff = 0

        return str_time_diff

def main():
    try:
        dfSrc = p.DataFrame()
        df_ret = p.DataFrame()
        ret_2 = ''
        debug_ind = 'Y'
        col_list = cf.config['COL_LIST']

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidatedIR.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        # Query using parameters
        rawQry = str(input('Please enter the name of the train service that you want to find out (Either by Name or by Number): '))

        x1 = ct.clsIndianRailway()
        ret_2 = x1.searchQry(rawQry)

        # Capturing the JSON Payload
        res = json.loads(ret_2)

        # Converting dictionary to Pandas Dataframe
        # df_ret = p.read_json(ret_2, orient='records')

        df_ret = p.io.json.json_normalize(res)
        df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

        # Resetting the column orders as per JSON
        # df_ret = df_ret[list(res[0].keys())]
        column_order = col_list
        df_mod_ret = df_ret.reindex(column_order, axis=1)

        # Sorting the source data for better viewing
        df_mod_resp = df_mod_ret.sort_values(by=['train_from','train_to','train_num'])

        l.logr('1.IndianRailway_' + var + '.csv', debug_ind, df_mod_resp, 'log')

        # Fetching Data for Delhi To Howrah
        df_del_how = df_mod_resp[(df_mod_resp['train_from'] == 'NDLS') & (df_mod_resp['train_to'] == 'HWH')]

        l.logr('2.IndianRailway_Delhi2Howrah_' + var + '.csv', debug_ind, df_del_how, 'log')

        # Splitting Arrive time into two separate fields for better calculation
        df_del_how['arriveTimeM'] = df_del_how.apply(lambda row: getArriveTimeOnly(row), axis=1)
        df_del_how['arriveTimeDayDiff'] = df_del_how.apply(lambda row: getArriveDateDiff(row), axis=1)
        df_del_how['arriveTimeDiff'] = df_del_how.apply(lambda row: getArriveTimeDiff(row), axis=1)

        l.logr('3.IndianRailway_Del2How_Mod_' + var + '.csv', debug_ind, df_del_how, 'log')

        # To fetch the best route which saves time
        lstTimeDayDiff = df_del_how['arriveTimeDayDiff'].values.tolist()
        min_lstTimeDayDiff = int(min(lstTimeDayDiff))

        df_min_timedaydiff = df_del_how[(df_del_how['arriveTimeDayDiff'] == min_lstTimeDayDiff)]

        l.logr('4.IndianRailway_Del2How_TimeCalc_' + var + '.csv', debug_ind, df_min_timedaydiff, 'log')

        # Now application will check the maximum arrivetimediff, this will bring the record
        # which arrives early at Howrah station
        lstTimeDiff = df_min_timedaydiff['arriveTimeDiff'].values.tolist()
        max_lstTimeDiff = int(max(lstTimeDiff))

        df_best_route = df_min_timedaydiff[(df_min_timedaydiff['arriveTimeDiff'] == max_lstTimeDiff)]

        # Dropping unwanted columns
        df_best_route.drop(columns=['arriveTimeM'], inplace=True)
        df_best_route.drop(columns=['arriveTimeDayDiff'], inplace=True)
        df_best_route.drop(columns=['arriveTimeDiff'], inplace=True)

        l.logr('5.IndianRailway_Del2How_BestRoute_' + var + '.csv', debug_ind, df_best_route, 'log')

        print("-" * 60)

        print('Realtime Indian Railway Data:: ')
        logging.info('Realtime Indian Railway Data:: ')
        print(df_mod_resp)
        print()
        print('Best Route from Delhi -> Howrah:: ')
        print(df_best_route)
        print()

        # Checking execution status
        ret_val_2 = df_best_route.shape[0]

        if ret_val_2 == 0:
            print("Indian Railway hasn't returned any rows. Please check your queries!")
            logging.info("Indian Railway hasn't returned any rows. Please check your queries!")
            print("*" * 157)
            logging.info(tmpR0)
        else:
            print("Successfully fetched rows!")
            logging.info("Successfully fetched rows!")
            print("*" * 157)
            logging.info(tmpR0)

        print('Finding Story points..')
        print("*" * 157)
        logging.info('Finding Story points..')
        logging.info(tmpR0)


        tmpR10 = 'End Time: ' + str(var)
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError:
        print("No relevant data to proceed!")
        logging.info("No relevant data to proceed!")

    except Exception as e:
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

Key snippet to explore –

# Query using parameters
rawQry = str(input('Please enter the name of the train service that you want to find out (Either by Name or by Number): '))

In this case, we run it in interactive mode. However, in an actual scenario, you would receive these values from your mobile application.

x1 = ct.clsIndianRailway()
ret_2 = x1.searchQry(rawQry)

# Capturing the JSON Payload
res = json.loads(ret_2)

The above lines invoke the API & receive the JSON response.

# Converting dictionary to Pandas Dataframe
df_ret = p.io.json.json_normalize(res)
df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

# Resetting the column orders as per JSON
column_order = col_list
df_mod_ret = df_ret.reindex(column_order, axis=1)

# Sorting the source data for better viewing
df_mod_resp = df_mod_ret.sort_values(by=['train_from','train_to','train_num'])

In these lines, our application flattens the JSON into a pandas dataframe, restores the column order & then sorts the data.
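A small sketch of the same three steps on hypothetical records (the real API response is not reproduced here). It uses pd.json_normalize, the top-level function available in pandas 1.0 or later:

```python
import pandas as pd

# Hypothetical records; field values are made up for illustration
res = [
    {"name": "Train B", "train_num": 222, "train_from": "NDLS", "train_to": "HWH", "extra": "x"},
    {"name": "Train A", "train_num": 111, "train_from": "NDLS", "train_to": "HWH", "extra": "y"},
]
col_list = ["name", "train_num", "train_from", "train_to", "classes"]

df = pd.json_normalize(res)

# reindex keeps only the listed columns, in that order;
# a listed column missing from the data is filled with NaN
df = df.reindex(col_list, axis=1)

df = df.sort_values(by=["train_from", "train_to", "train_num"])
print(df)
```

Here the extra column is dropped, while classes appears as an empty column because it is listed but absent from the data.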

The result will look like this –

SerializeJson2PandasDF

This step is critical: without flattening the data, you cannot filter & compare the records in the following steps.

# Fetching Data for Delhi To Howrah
df_del_how = df_mod_resp[(df_mod_resp['train_from'] == 'NDLS') & (df_mod_resp['train_to'] == 'HWH')]

As the line suggests, our application picks up only those records between New Delhi & Howrah. Thus, we’ve used our filter to eliminate the other records. And the data will look like this –
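A sketch of the same filter on hypothetical data. The .copy() call is an addition that is not in the original script; it makes the filtered frame independent, so derived columns can be added to it later without pandas raising a SettingWithCopyWarning.

```python
import pandas as pd

# Hypothetical records
df = pd.DataFrame({
    "train_num": [111, 222, 333],
    "train_from": ["NDLS", "NDLS", "HWH"],
    "train_to": ["HWH", "BCT", "NDLS"],
})

# Both conditions must hold; note the parentheses around each comparison
mask = (df["train_from"] == "NDLS") & (df["train_to"] == "HWH")

# Independent copy of the matching rows
df_del_how = df[mask].copy()
df_del_how["checked"] = True
print(df_del_how)
```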

FilteredRecords

Now, we need to identify which of the two records takes the minimum time. For that, we’ll do some calculations on the arrival time.

# Splitting Arrive time into two separate fields for better calculation
df_del_how['arriveTimeM'] = df_del_how.apply(lambda row: getArriveTimeOnly(row), axis=1)
df_del_how['arriveTimeDayDiff'] = df_del_how.apply(lambda row: getArriveDateDiff(row), axis=1)
df_del_how['arriveTimeDiff'] = df_del_how.apply(lambda row: getArriveTimeDiff(row), axis=1)

To do that, we’ll generate a few derived columns (shown above), which we’ll use to fetch the shortest duration. And the data should look like this –

CalculatedFields

These are the fields we’re using for our calculation. First, we split arriveTime into two separate columns, i.e., arriveTimeM & arriveTimeDayDiff. arriveTimeDiff, however, is a calculated field.
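A compact sketch of the split & the calculated field. The arriveTime format used here ('HH:MM +N night') is an assumption inferred from the helper functions, and split_arrive_time & time_diff are hypothetical names. Unlike the original helpers, a value without '+' keeps its time part & gets a day difference of 0.

```python
import re

def split_arrive_time(arrive_time):
    """Split 'HH:MM +N <text>' into (time part, day difference)."""
    time_part, sep, rest = str(arrive_time).partition("+")
    digits = re.sub(r"\D", "", rest)
    day_diff = int(digits) if sep and digits else 0
    return time_part.strip(), day_diff

def time_diff(time_part):
    """2400 minus the arrival time read as an HHMM integer."""
    return 2400 - int(re.sub(r"\D", "", time_part))

arrive_m, day_diff = split_arrive_time("09:55 +1 night")
same_day = split_arrive_time("17:10")
diff = time_diff(arrive_m)
print(arrive_m, day_diff, diff, same_day)
```

The value from time_diff is not a duration in minutes; it is only used for ranking, where a larger value means an earlier arrival on the same day.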

So, our logic to find the best routes –

  • arriveTimeDayDiff = Take the minimum across the records. If multiple records share the minimum, we pick all of them. In this case, we get two records.
  • arriveTimeDiff = (24:00 – <Train’s Arrival Time>); then take the maximum of these values.

Note that, in this case, we haven’t considered the departure time. You can add that logic to improve & correct your prediction.
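One possible way to bring in the departure time is to compute the total journey duration. This is a sketch under assumptions: the times are in 'HH:MM' format, day_diff counts the midnights between departure & arrival, and both helper names & timings are hypothetical.

```python
def to_minutes(hhmm):
    """Convert 'HH:MM' to minutes after midnight."""
    hours, minutes = hhmm.split(":")
    return int(hours) * 60 + int(minutes)

def journey_minutes(depart, arrive, day_diff):
    """Total travel time, counting day_diff midnights on the way."""
    return day_diff * 24 * 60 + to_minutes(arrive) - to_minutes(depart)

# Hypothetical timings for two overnight trains
train_a = journey_minutes("16:55", "09:55", 1)
train_b = journey_minutes("22:40", "17:00", 1)
print(train_a, train_b)
```

With these made-up timings, the first train takes 1020 minutes & the second 1100 minutes, so the first one is faster even though both arrive the next day.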

The above steps can be seen in the following snippet –

# To fetch the best route which saves time
lstTimeDayDiff = df_del_how['arriveTimeDayDiff'].values.tolist()
min_lstTimeDayDiff = int(min(lstTimeDayDiff))

df_min_timedaydiff = df_del_how[(df_del_how['arriveTimeDayDiff'] == min_lstTimeDayDiff)]

l.logr('4.IndianRailway_Del2How_TimeCalc_' + var + '.csv', debug_ind, df_min_timedaydiff, 'log')

# Now application will check the maximum arrivetimediff, this will bring the record
# which arrives early at Howrah station
lstTimeDiff = df_min_timedaydiff['arriveTimeDiff'].values.tolist()
max_lstTimeDiff = int(max(lstTimeDiff))

df_best_route = df_min_timedaydiff[(df_min_timedaydiff['arriveTimeDiff'] == max_lstTimeDiff)]
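The same two-step selection can be sketched without converting the columns to lists, using the min & max methods of the pandas Series. The derived values below are hypothetical.

```python
import pandas as pd

# Hypothetical derived columns, as produced by the earlier step
df = pd.DataFrame({
    "train_num": [111, 222, 333],
    "arriveTimeDayDiff": [1, 1, 2],
    "arriveTimeDiff": [1445, 1395, 2000],
})

# Step 1: keep the records with the smallest day difference
min_day = df["arriveTimeDayDiff"].min()
candidates = df[df["arriveTimeDayDiff"] == min_day]

# Step 2: among those, keep the record with the largest arriveTimeDiff
max_diff = candidates["arriveTimeDiff"].max()
best = candidates[candidates["arriveTimeDiff"] == max_diff]
print(best)
```

Train 333 has the largest arriveTimeDiff overall, but it arrives a day later, so the first step already excludes it.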

Let’s see how it runs –

Output

As you can see, with NDLS (New Delhi) as the source, we have three records, marked with the GREEN square box. However, with HWH (Howrah) as the destination, we have only two records, marked with the RED square box. As part of our calculation, we pick the record marked with the BLUE square box.

Let’s see how the log directory generates all the files –

Log_Dir

Let’s see the final output in our csv file –

BestRoute

So, finally, we’ve achieved it. 😀

Let me know how you like this post. Please share your suggestions & comments.

I’ll be back with another installment from the Python verse.

Till then – Happy Avenging!

Note: All the data posted here is representational, available over the internet & for educational purposes only.

Magic SQL

A few years ago on OTN, a user was looking for a solution that we thought might not be possible in a single SQL statement. At that time, Michael came to the rescue; for the first time, he showed some interesting XML Kung-Fu to all of us & earned a great reputation for providing magic solutions to others. I personally love to call them Magic SQL.

The following SQL calculates the number of rows in each table of a specific schema without visiting any DBA views. This particular script is my second personal favourite.

scott@ORCL>
scott@ORCL>select * from v$version;

BANNER
--------------------------------------------------------------------------------
Oracle Database 11g Enterprise Edition Release 11.1.0.6.0 - Production
PL/SQL Release 11.1.0.6.0 - Production
CORE 11.1.0.6.0 Production
TNS for 32-bit Windows: Version 11.1.0.6.0 - Production
NLSRTL Version 11.1.0.6.0 - Production

Elapsed: 00:00:00.01
scott@ORCL>
scott@ORCL>
scott@ORCL>select table_name,
2 DBMS_XMLGEN.getxmltype(
3 'select count(*) c from '||table_name
4 ).extract('//text()').getnumberval() tot_rows
5 from user_tables
6 where iot_type is null
7 or iot_type != 'IOT_OVERFLOW';

TABLE_NAME TOT_ROWS
------------------------------ ----------
DEPT 4
EMP 14
BONUS 0
SALGRADE 5
EMP_DETAILS 3
T_EMP 0
AUDIT_T 0
C_C 4
TRAIN_DET 2
TEST_COM_IMP 2
TIME_P 1

TABLE_NAME TOT_ROWS
------------------------------ ----------
PRI_UQ 4
TEST_CHK 0
ANSHUMANSAHAISGENIOUS 1
XEUROPE 2
D_D 8
PUBLICTRANSPORT 4
XASIA 2
TF1 0
TEST_HH 14
TEST_SWAP 4
XGMT 1

TABLE_NAME TOT_ROWS
------------------------------ ----------
CUSTOMER_DET 1
FOURWHEELER 2
SPOOL_LOG 13
CITYTRANSPORT 8
T1 2
T2 2
A_A 1
B_B 2
AUTOMOBILE 1
XDUAL 1
S_TEMP 0

33 rows selected.

Elapsed: 00:00:00.42
scott@ORCL>

This particular script has some limitations with index-organized tables. Overall, though, it works brilliantly without touching any of the DBA views.

Grouped information in comma-separated values.

scott@ORCL>with tt
2 as
3 (
4 select 1 id, 'saty' name from dual
5 union all
6 select 1, 'anup' from dual
7 union all
8 select 1, 'sas' from dual
9 union all
10 select 2, 'rajib' from dual
11 union all
12 select 2, 'amit' from dual
13 )
14 select id,
15 cast(wmsys.wm_concat(name) as varchar2(100)) src
16 from tt
17 group by id;

ID SRC
---------- ------------------------------------------------------
1 saty,anup,sas
2 rajib,amit

Elapsed: 00:00:01.62
scott@ORCL>
scott@ORCL>

The wm_concat function is undocumented, so you should not use it in a production environment. Even if you do, you won’t get any technical support from Oracle for production issues caused by it. So, it is better not to use this function. Still, it certainly saves a lot of effort & provides a solution in a single SQL. At the time of writing, I’m still waiting to see it as a documented function from Oracle (the documented LISTAGG aggregate arrived later, in Oracle 11g Release 2). Till then, you have to go for your custom-tailored solution.

The following SQL statements parse comma-separated values into rows. However, this cannot be applied to a column of an existing table.

scott@ORCL>
scott@ORCL>SELECT cast(column_value as varchar2(40)) res
2 FROM TABLE(SYS.dbms_debug_vc2coll (24, 34, 25));

RES
----------------------------------------
24
34
25

Elapsed: 00:00:00.03
scott@ORCL>
scott@ORCL>
scott@ORCL>SELECT cast(column_value as varchar2(40)) res
2 FROM TABLE(SYS.dbms_debug_vc2coll ('A', 'B', 'C'));

RES
----------------------------------------
A
B
C

Elapsed: 00:00:00.05
scott@ORCL>
scott@ORCL>SELECT cast(column_value as varchar2(40)) res
2 FROM TABLE(SYS.dbms_debug_vc2coll (24, 'B', '@'));

RES
----------------------------------------
24
B
@

Elapsed: 00:00:00.14
scott@ORCL>

This can handle alphanumeric data without declaring any custom type for it.

The following SQL is the one I prefer most; it uses one of the brilliant features introduced in Oracle & can surprise many developers. It retrieves column information without using the select clause. I know, it sounds crazy. But you really can do that.

That is what I consider the leading contender in my Magic SQL category.

scott@ORCL>
scott@ORCL>
scott@ORCL>xquery for $i in ora:view("emp")/ROW/ENAME return $i/text()
2 /

Result Sequence
----------------------------------------------------------------------------
SMITH
ALLEN
WARD
JONES
MARTIN
BLAKE
CLARK
SCOTT
KING
TURNER
ADAMS

Result Sequence
----------------------------------------------------------------------------
JAMES
FORD
MILLER

14 item(s) selected.

Elapsed: 00:00:00.14
scott@ORCL>
scott@ORCL>

Hope you liked this edition.

Regards.