Today, we want to make our use case a little harder & more realistic. We want to consume real-time, live trade data through the FinnHub API & display it on our dashboard using another brilliant API, H2O Wave, with the help of native Python.
The use case mentioned above is extremely useful &, to build it, we’ll be using the following third-party APIs –
- FinnHub: For more information, please click the following link.
- Ably: For more information, please click the following link.
- H2O-Wave: For more information, please click the following link.
I’m not going to discuss these topics in more detail here, as I’ve already covered them in separate earlier posts. Please refer to the following thread for detailed information –
creating-a-real-time-dashboard-from-streaming-data-using-python
In this post, we will address a more advanced concept than the one covered in the previous post mentioned above. Before we start exploring the details, let us first look at how a run of the application looks –

Let us explore the architecture of this implementation –

This application will talk to the FinnHub websocket & consume real-time trade data from it. That data will be temporarily stored in our Ably channel. The dashboard will pick up each message & display it as soon as new data arrives for that trading company.
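To make that handoff concrete, here is a minimal sketch using the Ably REST client, assuming you already have a valid Ably API key; the channel name sd_channel is the one both scripts below use, & the sample trade record is purely illustrative –

import json
from ably import AblyRest

# Placeholder key - substitute your own Ably API key here
client = AblyRest('<YOUR_ABLY_API_KEY>')
channel = client.channels.get('sd_channel')

# Producer side: publish one trade snapshot into the channel
sample_trade = [{"s": "AAPL", "p": 130.15, "t": 1624715406407, "v": 0.01, "c": "null"}]
channel.publish('event', json.dumps(sample_trade))

# Consumer side: read the recent messages back from the channel history
for msg in channel.history().items:
    print(msg.data)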
For this use case, you need to install the following packages –
STEP – 1:

STEP – 2:

STEP – 3:

STEP – 4:

You can copy the following commands to install the above-mentioned packages –
pip install ably
pip install h2o-wave
pip install pandas
pip install websocket
pip install websocket-client
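If you prefer a single command, the same packages can be installed in one go (this simply combines the lines above) –

pip install ably h2o-wave pandas websocket websocket-client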
Let’s explore the important data point that you need to capture from the FinnHub portal (your API token) to consume the real-time trade data –
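The publisher script below hard-codes that token inside the websocket URL. As a hedged alternative, you could keep it in clsConfig & build the URL from there (FINNHUB_TOKEN is a hypothetical key that is not part of the original configuration file) –

# Hypothetical entry inside clsConfig.config (not in the original file):
#   'FINNHUB_TOKEN': '<YOUR_FINNHUB_API_TOKEN>'
from clsConfig import clsConfig as cf

finnhub_ws_url = 'wss://ws.finnhub.io?token=' + str(cf.config['FINNHUB_TOKEN'])
print(finnhub_ws_url)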

We have two main scripts. The first script will consume the streaming data into a message queue, & the other one will extract the data from the queue, transform it, & publish it to the real-time dashboard.
1. dashboard_finnhub.py (This native Python script will consume the streaming data & create the live trade dashboard.)
###############################################################
####          Template Written By: H2O Wave                ####
####    Enhanced with Streaming Data By: Satyaki De        ####
####    Base Version Enhancement On: 20-Dec-2020           ####
####    Modified On 27-Jun-2021                            ####
####                                                       ####
#### Objective: This script will consume real-time         ####
#### streaming data coming out from a hosted API           ####
#### sources (Finnhub) using another popular third-party   ####
#### service named Ably. Ably mimics pubsub Streaming      ####
#### concept, which might be extremely useful for          ####
#### any start-ups.                                        ####
####                                                       ####
#### Note: This is an enhancement of my previous post of   ####
#### H2O Wave. In this case, the application will consume  ####
#### streaming trade data from a live host & not generated ####
#### out of the mock data. Thus, it is more useful for the ####
#### start-ups.                                            ####
###############################################################
import time
from h2o_wave import site, data, ui
from ably import AblyRest
import pandas as p
import json
import datetime
import logging
import platform as pl
from clsConfig import clsConfig as cf
import clsL as cl

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

# Global Area
## Global Class

# Initiating Log Class
l = cl.clsL()

# Global Variables

# Moving previous day log files to archive directory
log_dir = cf.config['LOG_PATH']
path = cf.config['INIT_PATH']
subdir = cf.config['SUBDIR']

## End Of Global Part

class DaSeries:
    def __init__(self, inputDf):
        self.Df = inputDf
        self.count_row = inputDf.shape[0]
        self.start_pos = 0
        self.end_pos = 0
        self.interval = 1

    def next(self):
        try:
            # Getting Individual Element & convert them to Series
            if ((self.start_pos + self.interval) <= self.count_row):
                self.end_pos = self.start_pos + self.interval
            else:
                self.end_pos = self.start_pos + (self.count_row - self.start_pos)

            split_df = self.Df.iloc[self.start_pos:self.end_pos]

            if ((self.start_pos > self.count_row) | (self.start_pos == self.count_row)):
                pass
            else:
                self.start_pos = self.start_pos + self.interval

            x = float(split_df.iloc[0]['CurrentExchange'])
            dx = float(split_df.iloc[0]['Change'])

            # Emptying the existing dataframe
            split_df = p.DataFrame(None)

            return x, dx
        except:
            x = 0
            dx = 0

            return x, dx

class CategoricalSeries:
    def __init__(self, sourceDf):
        self.series = DaSeries(sourceDf)
        self.i = 0

    def next(self):
        x, dx = self.series.next()
        self.i += 1
        return f'C{self.i}', x, dx

light_theme_colors = '$red $pink $purple $violet $indigo $blue $azure $cyan $teal $mint $green $amber $orange $tangerine'.split()
dark_theme_colors = '$red $pink $blue $azure $cyan $teal $mint $green $lime $yellow $amber $orange $tangerine'.split()

_color_index = -1
colors = dark_theme_colors

def next_color():
    global _color_index
    _color_index += 1
    return colors[_color_index % len(colors)]

_curve_index = -1
curves = 'linear smooth step step-after step-before'.split()

def next_curve():
    global _curve_index
    _curve_index += 1
    return curves[_curve_index % len(curves)]

def calc_p(row):
    try:
        str_calc_s1 = str(row['s_x'])
        str_calc_s2 = str(row['s_y'])

        if str_calc_s1 == str_calc_s2:
            calc_p_val = float(row['p_y'])
        else:
            calc_p_val = float(row['p_x'])

        return calc_p_val
    except:
        return 0.0

def calc_v(row):
    try:
        str_calc_s1 = str(row['s_x'])
        str_calc_s2 = str(row['s_y'])

        if str_calc_s1 == str_calc_s2:
            calc_v_val = float(row['v_y'])
        else:
            calc_v_val = float(row['v_x'])

        return calc_v_val
    except:
        return 0.0

def process_DF(inputDF, inputDFUnq):
    try:
        # Core Business logic
        # The application will show default value to any
        # trade-in stock in case that data doesn't consume
        # from the source.
        df_conv = inputDF
        df_unique_fin = inputDFUnq

        df_conv['max_count'] = df_conv.groupby('default_rank')['default_rank'].transform('count')
        l.logr('3. max_df.csv', 'Y', df_conv, subdir)

        # Sorting the output
        sorted_df = df_conv.sort_values(by=['default_rank','s'], ascending=True)

        # New Column List Orders
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        df_fin = sorted_df.reindex(column_order, axis=1)
        l.logr('4. sorted_df.csv', 'Y', df_fin, subdir)

        # Now splitting the sorted df into two sets
        lkp_max_count = 4
        df_fin_na = df_fin[(df_fin['max_count'] == lkp_max_count)]
        l.logr('5. df_fin_na.csv', 'Y', df_fin_na, subdir)

        df_fin_req = df_fin[(df_fin['max_count'] != lkp_max_count)]
        l.logr('6. df_fin_req.csv', 'Y', df_fin_req, subdir)

        # Now to perform cross join, we will create
        # a key column in both the DataFrames to
        # merge on that key.
        df_unique_fin['key'] = 1
        df_fin_req['key'] = 1

        # Dropping unwanted columns
        df_unique_fin.drop(columns=['t'], axis=1, inplace=True)
        l.logr('7. df_unique_slim.csv', 'Y', df_unique_fin, subdir)

        # Padding with dummy key values
        #merge_df = p.merge(df_unique_fin,df_fin_req,on=['s'],how='left')
        merge_df = p.merge(df_unique_fin,df_fin_req,on=['key']).drop("key", 1)
        l.logr('8. merge_df.csv', 'Y', merge_df, subdir)

        # Sorting the output
        sorted_merge_df = merge_df.sort_values(by=['default_rank_y','s_x'], ascending=True)
        l.logr('9. sorted_merge_df.csv', 'Y', sorted_merge_df, subdir)

        # Calling new derived logic
        sorted_merge_df['derived_p'] = sorted_merge_df.apply(lambda row: calc_p(row), axis=1)
        sorted_merge_df['derived_v'] = sorted_merge_df.apply(lambda row: calc_v(row), axis=1)
        l.logr('10. sorted_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Dropping unwanted columns
        sorted_merge_df.drop(columns=['default_rank_x', 'p_x', 'v_x', 's_y', 'p_y', 'v_y'], axis=1, inplace=True)

        # Renaming the columns
        sorted_merge_df.rename(columns={'s_x':'s'}, inplace=True)
        sorted_merge_df.rename(columns={'default_rank_y':'default_rank'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_p':'p'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_v':'v'}, inplace=True)
        l.logr('11. org_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Aligning columns
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        merge_fin_df = sorted_merge_df.reindex(column_order, axis=1)
        l.logr('12. merge_fin_df.csv', 'Y', merge_fin_df, subdir)

        # Finally, appending these two DataFrame (df_fin_na & merge_fin_df)
        frames = [df_fin_na, merge_fin_df]
        fin_df = p.concat(frames, keys=["s", "default_rank", "max_count"])
        l.logr('13. fin_df.csv', 'Y', fin_df, subdir)

        # Final clearance & organization
        fin_df.drop(columns=['default_rank', 'max_count'], axis=1, inplace=True)
        l.logr('14. Final.csv', 'Y', fin_df, subdir)

        # Adjusting key columns
        fin_df.rename(columns={'s':'Company'}, inplace=True)
        fin_df.rename(columns={'p':'CurrentExchange'}, inplace=True)
        fin_df.rename(columns={'v':'Change'}, inplace=True)
        l.logr('15. TransormedFinal.csv', 'Y', fin_df, subdir)

        return fin_df
    except Exception as e:
        print('$' * 120)

        x = str(e)
        print(x)

        print('$' * 120)

        df = p.DataFrame()

        return df

def create_dashboard(update_freq=0.0):
    page = site['/dashboard_finnhub']

    general_log_path = str(cf.config['LOG_PATH'])
    ably_id = str(cf.config['ABLY_ID'])

    # Enabling Logging Info
    logging.basicConfig(filename=general_log_path + 'Realtime_Stock.log', level=logging.INFO)

    os_det = pl.system()

    if os_det == "Windows":
        src_path = path + '\\' + 'data\\'
    else:
        src_path = path + '/' + 'data/'

    # Fetching the data
    client = AblyRest(ably_id)
    channel = client.channels.get('sd_channel')

    message_page = channel.history()

    # Counter Value
    cnt = 0

    # Declaring Global Data-Frame
    df_conv = p.DataFrame()

    for i in message_page.items:
        print('Last Msg: {}'.format(i.data))

        json_data = json.loads(i.data)

        # Converting JSON to Dataframe
        df = p.json_normalize(json_data)
        df.columns = df.columns.map(lambda x: x.split(".")[-1])

        if cnt == 0:
            df_conv = df
        else:
            d_frames = [df_conv, df]
            df_conv = p.concat(d_frames)

        cnt += 1

    # Resetting the Index Value
    df_conv.reset_index(drop=True, inplace=True)

    print('DF:')
    print(df_conv)

    # Writing to the file
    l.logr('1. DF_modified.csv', 'Y', df_conv, subdir)

    # Dropping unwanted columns
    df_conv.drop(columns=['c'], axis=1, inplace=True)

    df_conv['default_rank'] = df_conv.groupby(['s']).cumcount() + 1
    lkp_rank = 1
    df_unique = df_conv[(df_conv['default_rank'] == lkp_rank)]

    # New Column List Orders
    column_order = ['s', 'default_rank', 'p', 't', 'v']
    df_unique_fin = df_unique.reindex(column_order, axis=1)

    print('Rank DF Unique:')
    print(df_unique_fin)

    l.logr('2. df_unique.csv', 'Y', df_unique_fin, subdir)

    # Capturing transformed values into a DataFrame
    # Depending on your logic, you'll implement that inside
    # the process_DF functions
    fin_df = process_DF(df_conv, df_unique_fin)

    df_unq_fin = df_unique_fin.copy()
    df_unq_fin.rename(columns={'s':'Company'}, inplace=True)
    df_unq_fin.rename(columns={'p':'CurrentExchange'}, inplace=True)
    df_unq_fin.rename(columns={'v':'Change'}, inplace=True)
    df_unq_fin.drop(columns=['default_rank','key'], axis=1, inplace=True)
    l.logr('16. df_unq_fin.csv', 'Y', df_unq_fin, subdir)

    df_unq_finale = df_unq_fin.sort_values(by=['Company'], ascending=True)
    l.logr('17. df_unq_finale.csv', 'Y', df_unq_finale, subdir)

    # Final clearance for better understanding of data
    fin_df.drop(columns=['t'], axis=1, inplace=True)
    l.logr('18. CleanFinal.csv', 'Y', fin_df, subdir)

    count_row = df_unq_finale.shape[0]

    large_lines = []
    start_pos = 0
    end_pos = 0
    interval = 1

    # Converting dataframe to a desired Series
    f = CategoricalSeries(fin_df)

    for j in range(count_row):
        # Getting the series values from above
        cat, val, pc = f.next()

        # Getting Individual Element & convert them to Series
        if ((start_pos + interval) <= count_row):
            end_pos = start_pos + interval
        else:
            end_pos = start_pos + (count_row - start_pos)

        split_df = df_unq_finale.iloc[start_pos:end_pos]

        if ((start_pos > count_row) | (start_pos == count_row)):
            pass
        else:
            start_pos = start_pos + interval

        x_currency = str(split_df.iloc[0]['Company'])

        ####################################################
        #####           Debug Purpose              #########
        ####################################################
        print('Company: ', x_currency)
        print('J: ', str(j))
        print('Cat: ', cat)
        ####################################################
        #####           End Of Debug                 #######
        ####################################################

        c = page.add(f'e{j+1}', ui.tall_series_stat_card(
            box=f'{j+1} 1 1 2',
            title=x_currency,
            value='=${{intl qux minimum_fraction_digits=2 maximum_fraction_digits=2}}',
            aux_value='={{intl quux style="percent" minimum_fraction_digits=1 maximum_fraction_digits=1}}',
            data=dict(qux=val, quux=pc),
            plot_type='area',
            plot_category='foo',
            plot_value='qux',
            plot_color=next_color(),
            plot_data=data('foo qux', -15),
            plot_zero_value=0,
            plot_curve=next_curve(),
        ))
        large_lines.append((f, c))

    page.save()

    while update_freq > 0:
        time.sleep(update_freq)

        for f, c in large_lines:
            cat, val, pc = f.next()

            print('Update Cat: ', cat)
            print('Update Val: ', val)
            print('Update pc: ', pc)
            print('*' * 160)

            c.data.qux = val
            c.data.quux = pc / 100

            c.plot_data[-1] = [cat, val]

        page.save()

if __name__ == "__main__":
    try:
        # Main Calling script
        create_dashboard(update_freq=0.25)
    except Exception as e:
        x = str(e)
        print(x)
Let’s explore the key snippets from the above script –
def process_DF(inputDF, inputDFUnq):
    try:
        # Core Business logic
        # The application will show default value to any
        # trade-in stock in case that data doesn't consume
        # from the source.

        # Getting block count
        #df_conv['block_count'] = df_conv.groupby(['default_rank']).cumcount()
        #l.logr('3. block_df.csv', 'Y', df_conv, subdir)

        # Getting block count
        #df_conv['max_count'] = df_conv.groupby(['default_rank']).size()
        #df_conv_fin = df_conv.groupby(['default_rank']).agg(['count'])
        #df_conv_fin = df_conv.value_counts(['default_rank']).reset_index(name='max_count')
        #df_conv_fin = df_conv.value_counts(['default_rank'])

        df_conv = inputDF
        df_unique_fin = inputDFUnq

        df_conv['max_count'] = df_conv.groupby('default_rank')['default_rank'].transform('count')
        l.logr('3. max_df.csv', 'Y', df_conv, subdir)

        # Sorting the output
        sorted_df = df_conv.sort_values(by=['default_rank','s'], ascending=True)

        # New Column List Orders
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        df_fin = sorted_df.reindex(column_order, axis=1)
        l.logr('4. sorted_df.csv', 'Y', df_fin, subdir)

        # Now splitting the sorted df into two sets
        lkp_max_count = 4
        df_fin_na = df_fin[(df_fin['max_count'] == lkp_max_count)]
        l.logr('5. df_fin_na.csv', 'Y', df_fin_na, subdir)

        df_fin_req = df_fin[(df_fin['max_count'] != lkp_max_count)]
        l.logr('6. df_fin_req.csv', 'Y', df_fin_req, subdir)

        # Now to perform cross join, we will create
        # a key column in both the DataFrames to
        # merge on that key.
        df_unique_fin['key'] = 1
        df_fin_req['key'] = 1

        # Dropping unwanted columns
        df_unique_fin.drop(columns=['t'], axis=1, inplace=True)
        l.logr('7. df_unique_slim.csv', 'Y', df_unique_fin, subdir)

        # Padding with dummy key values
        #merge_df = p.merge(df_unique_fin,df_fin_req,on=['s'],how='left')
        merge_df = p.merge(df_unique_fin,df_fin_req,on=['key']).drop("key", 1)
        l.logr('8. merge_df.csv', 'Y', merge_df, subdir)

        # Sorting the output
        sorted_merge_df = merge_df.sort_values(by=['default_rank_y','s_x'], ascending=True)
        l.logr('9. sorted_merge_df.csv', 'Y', sorted_merge_df, subdir)

        # Calling new derived logic
        sorted_merge_df['derived_p'] = sorted_merge_df.apply(lambda row: calc_p(row), axis=1)
        sorted_merge_df['derived_v'] = sorted_merge_df.apply(lambda row: calc_v(row), axis=1)
        l.logr('10. sorted_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Dropping unwanted columns
        sorted_merge_df.drop(columns=['default_rank_x', 'p_x', 'v_x', 's_y', 'p_y', 'v_y'], axis=1, inplace=True)

        # Renaming the columns
        sorted_merge_df.rename(columns={'s_x':'s'}, inplace=True)
        sorted_merge_df.rename(columns={'default_rank_y':'default_rank'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_p':'p'}, inplace=True)
        sorted_merge_df.rename(columns={'derived_v':'v'}, inplace=True)
        l.logr('11. org_merge_derived.csv', 'Y', sorted_merge_df, subdir)

        # Aligning columns
        column_order = ['s', 'default_rank', 'max_count', 'p', 't', 'v']
        merge_fin_df = sorted_merge_df.reindex(column_order, axis=1)
        l.logr('12. merge_fin_df.csv', 'Y', merge_fin_df, subdir)

        # Finally, appending these two DataFrame (df_fin_na & merge_fin_df)
        frames = [df_fin_na, merge_fin_df]
        fin_df = p.concat(frames, keys=["s", "default_rank", "max_count"])
        l.logr('13. fin_df.csv', 'Y', fin_df, subdir)

        # Final clearance & organization
        fin_df.drop(columns=['default_rank', 'max_count'], axis=1, inplace=True)
        l.logr('14. Final.csv', 'Y', fin_df, subdir)

        # Adjusting key columns
        fin_df.rename(columns={'s':'Company'}, inplace=True)
        fin_df.rename(columns={'p':'CurrentExchange'}, inplace=True)
        fin_df.rename(columns={'v':'Change'}, inplace=True)
        l.logr('15. TransormedFinal.csv', 'Y', fin_df, subdir)

        return fin_df
    except Exception as e:
        print('$' * 120)
        x = str(e)
        print(x)
        print('$' * 120)
        df = p.DataFrame()
        return df
The above function will check whether the queue is sending all the key trade-in data for all the companies. In our use case, we’re testing with four companies, & they are as follows –
a. AAPL
b. AMZN
c. BINANCE:BTCUSDT
d. IC MARKETS:1
Every message should contain data from all four of these companies together. If any company’s data is missing, this transformation will add a dummy record for that missing company so that each message bouquet has a uniform number of entries, with dummy trade-in values filled in for all the missing information, as the toy example below illustrates.
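The following toy example (with made-up prices) sketches the same padding idea on a tiny scale: cross-join the full company list with an incomplete message on a dummy key, then keep the real value wherever the symbols match & fall back to the default otherwise –

import pandas as p

# Full company list with default trade values
companies = p.DataFrame({'s': ['AAPL', 'AMZN', 'BINANCE:BTCUSDT', 'IC MARKETS:1'],
                         'p': [0.01, 0.01, 0.01, 0.01]})

# An incomplete message - only AAPL arrived this time
incoming = p.DataFrame({'s': ['AAPL'], 'p': [130.15]})

# Dummy key for the cross join
companies['key'] = 1
incoming['key'] = 1

padded = p.merge(companies, incoming, on='key', suffixes=('_x', '_y'))
padded['p'] = padded.apply(lambda r: r['p_y'] if r['s_x'] == r['s_y'] else r['p_x'], axis=1)

# Every company now has a row - the missing ones carry the default value
print(padded[['s_x', 'p']])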
def calc_p(row):
    try:
        str_calc_s1 = str(row['s_x'])
        str_calc_s2 = str(row['s_y'])

        if str_calc_s1 == str_calc_s2:
            calc_p_val = float(row['p_y'])
        else:
            calc_p_val = float(row['p_x'])

        return calc_p_val
    except:
        return 0.0

def calc_v(row):
    try:
        str_calc_s1 = str(row['s_x'])
        str_calc_s2 = str(row['s_y'])

        if str_calc_s1 == str_calc_s2:
            calc_v_val = float(row['v_y'])
        else:
            calc_v_val = float(row['v_x'])

        return calc_v_val
    except:
        return 0.0
The above snippet keeps the actual trade values where the symbols match & falls back to the default values for the missing records.
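For instance, calling calc_p on two hypothetical merged rows shows that fallback behaviour (the values are made up) –

matched = {'s_x': 'AAPL', 's_y': 'AAPL', 'p_x': 0.01, 'p_y': 130.15}
unmatched = {'s_x': 'AMZN', 's_y': 'AAPL', 'p_x': 0.01, 'p_y': 130.15}

print(calc_p(matched))    # 130.15 - the real trade value is kept
print(calc_p(unmatched))  # 0.01   - the default value is used instead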
client = AblyRest(ably_id)
channel = client.channels.get('sd_channel')

message_page = channel.history()
In the above snippet, the application will consume the streaming data from the Ably queue.
for i in message_page.items:
    print('Last Msg: {}'.format(i.data))

    json_data = json.loads(i.data)

    # Converting JSON to Dataframe
    df = p.json_normalize(json_data)
    df.columns = df.columns.map(lambda x: x.split(".")[-1])

    if cnt == 0:
        df_conv = df
    else:
        d_frames = [df_conv, df]
        df_conv = p.concat(d_frames)

    cnt += 1
The above snippet will convert the streaming messages into a more meaningful pandas DataFrame, which we can use for a wide variety of analytics.
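As a small, self-contained illustration (with a made-up message), this is how p.json_normalize turns one queued JSON payload into a DataFrame; the split(".")[-1] rename only matters when the source contains nested field names –

import json
import pandas as p

raw = '[{"s": "AAPL", "p": 130.15, "t": 1624715406407, "v": 0.01, "c": "null"}]'

df = p.json_normalize(json.loads(raw))
df.columns = df.columns.map(lambda x: x.split(".")[-1])

print(df)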
# Converting dataframe to a desired Series
f = CategoricalSeries(fin_df)

for j in range(count_row):
    # Getting the series values from above
    cat, val, pc = f.next()

    # Getting Individual Element & convert them to Series
    if ((start_pos + interval) <= count_row):
        end_pos = start_pos + interval
    else:
        end_pos = start_pos + (count_row - start_pos)

    split_df = df_unq_finale.iloc[start_pos:end_pos]

    if ((start_pos > count_row) | (start_pos == count_row)):
        pass
    else:
        start_pos = start_pos + interval

    x_currency = str(split_df.iloc[0]['Company'])

    ####################################################
    #####           Debug Purpose              #########
    ####################################################
    print('Company: ', x_currency)
    print('J: ', str(j))
    print('Cat: ', cat)
    ####################################################
    #####           End Of Debug                 #######
    ####################################################

    c = page.add(f'e{j+1}', ui.tall_series_stat_card(
        box=f'{j+1} 1 1 2',
        title=x_currency,
        value='=${{intl qux minimum_fraction_digits=2 maximum_fraction_digits=2}}',
        aux_value='={{intl quux style="percent" minimum_fraction_digits=1 maximum_fraction_digits=1}}',
        data=dict(qux=val, quux=pc),
        plot_type='area',
        plot_category='foo',
        plot_value='qux',
        plot_color=next_color(),
        plot_data=data('foo qux', -15),
        plot_zero_value=0,
        plot_curve=next_curve(),
    ))
    large_lines.append((f, c))

page.save()

while update_freq > 0:
    time.sleep(update_freq)

    for f, c in large_lines:
        cat, val, pc = f.next()

        print('Update Cat: ', cat)
        print('Update Val: ', val)
        print('Update pc: ', pc)
        print('*' * 160)

        c.data.qux = val
        c.data.quux = pc / 100

        c.plot_data[-1] = [cat, val]

    page.save()
The above snippet will feed the data into the H2O Wave framework, which exposes it through a beautiful, easily consumable GUI in the form of an interactive dashboard.
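If you want to try the card in isolation, here is a stripped-down, standalone sketch of the same ui.tall_series_stat_card with hard-coded sample values (run it with the Wave server running & open the /demo_card page; the title & numbers are illustrative) –

from h2o_wave import site, data, ui

page = site['/demo_card']

page.add('e1', ui.tall_series_stat_card(
    box='1 1 1 2',
    title='AAPL',
    value='=${{intl qux minimum_fraction_digits=2 maximum_fraction_digits=2}}',
    aux_value='={{intl quux style="percent" minimum_fraction_digits=1 maximum_fraction_digits=1}}',
    data=dict(qux=130.15, quux=0.5),
    plot_type='area',
    plot_category='foo',
    plot_value='qux',
    plot_color='$red',
    plot_data=data('foo qux', -15),
    plot_zero_value=0,
    plot_curve='linear',
))

page.save()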
2. publish_ably_mod.py (This native Python script will consume the streaming data from FinnHub & publish it into the Ably message queue.)
###############################################################
####                                                       ####
#### Written By: Satyaki De                                 ####
#### Written Date: 26-Jun-2021                              ####
####                                                        ####
#### Objective: This script will consume real-time          ####
#### streaming data coming out from a hosted API            ####
#### sources (Finnhub) using another popular third-party    ####
#### service named Ably. Ably mimics pubsub Streaming       ####
#### concept, which might be extremely useful for           ####
#### any start-ups.                                         ####
####                                                        ####
###############################################################
from ably import AblyRest
import logging
import json

# generate random floating point values
from random import seed
from random import random

# seed random number generator
import websocket
import json

from clsConfig import clsConfig as cf

seed(1)

# Global Section
logger = logging.getLogger('ably')
logger.addHandler(logging.StreamHandler())

ably_id = str(cf.config['ABLY_ID'])
ably = AblyRest(ably_id)

channel = ably.channels.get('sd_channel')
# End Of Global Section

def on_message(ws, message):
    print("*" * 60)

    res = json.loads(message)
    jsBody = res["data"]
    jdata_dyn = json.dumps(jsBody)

    print(jdata_dyn)

    # JSON data
    # This is the default data for all the identified category
    # we've prepared. You can extract this dynamically. Or, By
    # default you can set their base trade details.
    json_data = [{
        "c": "null",
        "p": 0.01,
        "s": "AAPL",
        "t": 1624715406407,
        "v": 0.01
    },{
        "c": "null",
        "p": 0.01,
        "s": "AMZN",
        "t": 1624715406408,
        "v": 0.01
    },{
        "c": "null",
        "p": 0.01,
        "s": "BINANCE:BTCUSDT",
        "t": 1624715406409,
        "v": 0.01
    },
    {
        "c": "null",
        "p": 0.01,
        "s": "IC MARKETS:1",
        "t": 1624715406410,
        "v": 0.01
    }]

    jdata = json.dumps(json_data)

    # Publish a message to the sd_channel channel
    channel.publish('event', jdata)

    # Publish rest of the messages to the sd_channel channel
    channel.publish('event', jdata_dyn)

    jsBody = []
    jdata_dyn = ''

def on_error(ws, error):
    print(error)

def on_close(ws):
    print("### closed ###")

def on_open(ws):
    # Invoking Individual Company Trade Queries
    ws.send('{"type":"subscribe","symbol":"AAPL"}')
    ws.send('{"type":"subscribe","symbol":"AMZN"}')
    ws.send('{"type":"subscribe","symbol":"BINANCE:BTCUSDT"}')
    ws.send('{"type":"subscribe","symbol":"IC MARKETS:1"}')

if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("wss://ws.finnhub.io?token=jfhfyr8474rpv6av0",
                                on_message = on_message,
                                on_error = on_error,
                                on_close = on_close)
    ws.on_open = on_open
    ws.run_forever()
The key snippets from the above script –
json_data = [{
    "c": "null",
    "p": 0.01,
    "s": "AAPL",
    "t": 1624715406407,
    "v": 0.01
},{
    "c": "null",
    "p": 0.01,
    "s": "AMZN",
    "t": 1624715406408,
    "v": 0.01
},{
    "c": "null",
    "p": 0.01,
    "s": "BINANCE:BTCUSDT",
    "t": 1624715406409,
    "v": 0.01
},
{
    "c": "null",
    "p": 0.01,
    "s": "IC MARKETS:1",
    "t": 1624715406410,
    "v": 0.01
}]
As we already discussed, we’ll pass a default set of data for all the candidate companies.
# Publish a message to the sd_channel channel
channel.publish('event', jdata)

# Publish rest of the messages to the sd_channel channel
channel.publish('event', jdata_dyn)
Publish the messages to the created channel.
def on_open(ws):
    # Invoking Individual Company Trade Queries
    ws.send('{"type":"subscribe","symbol":"AAPL"}')
    ws.send('{"type":"subscribe","symbol":"AMZN"}')
    ws.send('{"type":"subscribe","symbol":"BINANCE:BTCUSDT"}')
    ws.send('{"type":"subscribe","symbol":"IC MARKETS:1"}')

if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("wss://ws.finnhub.io?token=hdhdjdj9494ld934v6av0",
                                on_message = on_message,
                                on_error = on_error,
                                on_close = on_close)
Send the company-specific trade subscription queries to FinnHub through the websocket app.
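For reference, the on_message handler above assumes that every payload coming back from FinnHub carries a "data" list of trades, roughly of the following shape (the values here are illustrative, not real quotes) –

import json

sample = '{"type": "trade", "data": [{"s": "AAPL", "p": 130.15, "t": 1624715406407, "v": 0.01, "c": null}]}'

res = json.loads(sample)
print(json.dumps(res["data"]))  # this is what gets republished as jdata_dyn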
3. clsConfig.py (This file contains the configuration details.)
################################################
#### Written By: SATYAKI DE                 ####
#### Written On: 15-May-2020                ####
####                                        ####
#### Objective: This script is a config     ####
#### file, contains all the keys for        ####
#### Machine-Learning. Application will     ####
#### process these information & perform    ####
#### various analysis on Linear-Regression. ####
################################################
import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'FILE_NAME': Curr_Path + sep + 'Data' + sep + 'TradeIn.csv',
        'SRC_PATH': Curr_Path + sep + 'Data' + sep,
        'APP_DESC_1': 'H2O Wave Integration with FinHubb!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SUBDIR' : 'data',
        'ABLY_ID': 'WWP309489.93jfkT:32kkdhdJjdued79e'
    }
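Both scripts read this configuration in the same way, for example –

from clsConfig import clsConfig as cf

print(cf.config['ABLY_ID'])   # Ably key shared by the publisher & the dashboard
print(cf.config['LOG_PATH'])  # OS-aware path built with the sep logic above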
Let’s explore the directory structure –

Let’s run the application –
Step 1:

Step 2:

Step 3:

You can monitor the message consumption from your Ably portal as follows –

If you want to know more details, you need to scroll down the page, where you will find this additional information –

And the final output in the interactive dashboard will look like the screenshot below –

So, we’ve done it.
You will find the complete codebase at the following GitHub link.
I’ll bring some more exciting topics from the Python verse in the coming days.
Till then, Happy Avenging! 😀
Note: All the data & scenarios posted here are representational, available over the internet, & intended for educational purposes only.