Hello guys,
Today, I’m going to discuss a new topic – working across multi-cloud environments. As you may remember, I shared a dynamic API built inside Azure in my last post. Today, I want to take that COVID-19 API, which originally sources its data from a third party, and publish its output as a stream inside Oracle Cloud. Integration platforms like TIBCO or MuleSoft are the classic choice for this kind of job; however, the approach shown here is more useful for a start-up environment, or for anyone who wants to build their own API-based ecosystem.
First, you need to register with Oracle Cloud; they give a $300 trial credit to everyone who registers on their platform for the first time.
Step 1:
After successfully registering in the portal by providing the essential information, you will land on the main dashboard.

Step 2:
By default, it will show the following screen –

Next, you need to create a compartment, as shown below.

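If you prefer scripting over the console, the same compartment can also be created through the OCI Python SDK. The sketch below is only illustrative and rests on a few assumptions: the compartment name & description are hypothetical, and it presumes the ~/.oci configuration from Step 6 is already in place –

import oci
from oci.config import from_file

# A minimal sketch (hypothetical names): create a compartment under the tenancy root.
config = from_file()  # reads the default ~/.oci/config profile
identity = oci.identity.IdentityClient(config)
details = oci.identity.models.CreateCompartmentDetails(
    compartment_id=config["tenancy"],  # parent: the tenancy root
    name="Covid19-Compartment",        # hypothetical compartment name
    description="Compartment for the Azure-to-OCI streaming demo")
response = identity.create_compartment(details)
print(response.data.id)  # this OCID goes into the 'comp' key of clsConfig later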
Step 3:
Now, you can create the stream by choosing the desired compartment, as shown in the next step. After selecting the compartment, click the blue “Create Stream” button at the top of the page.


Also, you can test the stream by manually producing a few sample JSON messages, as shown in the step below.

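For that manual test, any small JSON payload will do; for instance, a record shaped like the Azure API output we’ll publish later (the values below are purely representative):

{"date": 20200404, "state": "NY", "positive": 113704, "negative": 283358}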
Step 4:
Now, we need to add an API key as follows –

This screen will prompt you to download the private key. We’ll use it later when configuring our Python environment.
Step 5:
Now, you need to capture the content of the configuration file shown in the figures below –

You’ll find these important details under Identity, marked in the red box.
Step 6:
Now, place the previously acquired private key & the configuration content from Step 5 in the following location on the machine from which you are going to trigger the application –

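For reference, an OCI configuration file follows the standard INI-style layout shown below. This is a sketch with placeholder values only: the user & tenancy OCIDs, fingerprint & region come from Step 5, and key_file must point to the private key downloaded in Step 4. The scripts in this post read it from ~/.oci/config.poc –

[DEFAULT]
user=ocid1.user.oc1..xxxxxxxxxxxxxxxxxxxxxxxx
fingerprint=xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx:xx
key_file=~/.oci/oci_api_key.pem
tenancy=ocid1.tenancy.oc1..yyyyyyyyyyyyyyyyyyyyyyyy
region=us-ashburn-1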
You will get more details on this from these links –
Now, we’ve finished the basic Oracle Cloud setup.
Let’s check the Azure API one more time using Postman –

Let us install some of the critical Python packages –

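The scripts below rely on the oci SDK, pandas & requests (logging comes with the standard library). Assuming a vanilla Python 3 environment, something along these lines should cover it –

python -m pip install oci pandas requests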
Now, we’re ready with our environment.
Let us explore the codebase now.
1. clsConfig.py (This is the configuration file for this demo application)
###############################################
#### Written By: SATYAKI DE                ####
#### Written On: 04-Apr-2020               ####
####                                       ####
#### Objective: This script is a config    ####
#### file & contains all the keys for      ####
#### the Azure 2 OCI API. The application  ####
#### will process this information &       ####
#### perform the call to our newly         ####
#### developed Azure API in OCI.           ####
###############################################
import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        "comp": "ocid1.compartment.oc1..xxxxxxxxxxxxxxxxxxxxxxxxyyyyyyyyyyyyyyyyyyxxxxxx",
        "URL": "https://xxxxxxxxxx.yyyyyyyyyyyyyyyy.net/api/getDynamicCovidStats",
        "appType": "application/json",
        "conType": "keep-alive",
        "limRec": 10,
        "CACHE": "no-cache",
        "colList": "date, state, positive, negative",
        "typSel": "Cols",
        "LOG_PATH": Curr_Path + sep + 'log' + sep,
        "STREAM_NAME": "Covid19-Stream",
        "PARTITIONS": 1
    }
2. clsOCIConsume.py (This will consume messages from the designated stream created in Oracle Cloud)
##############################################
#### Enhancement By: SATYAKI DE           ####
#### Enhancement On: 07-Mar-2021          ####
#### Modified On 08-Mar-2021              ####
####                                      ####
#### Objective: Consuming stream from OCI ####
##############################################
import oci
import sys
import time
import os
import logging
from base64 import b64encode, b64decode
import json
from clsConfig import clsConfig as cf
from oci.config import from_file
import pandas as p

class clsOCIConsume:
    def __init__(self):
        self.comp = str(cf.conf['comp'])
        self.STREAM_NAME = str(cf.conf['STREAM_NAME'])
        self.PARTITIONS = int(cf.conf['PARTITIONS'])
        self.limRec = int(cf.conf['limRec'])

    def get_cursor_by_partition(self, client, stream_id, partition):
        print("Creating a cursor for partition {}".format(partition))
        cursor_details = oci.streaming.models.CreateCursorDetails(
            partition=partition,
            type=oci.streaming.models.CreateCursorDetails.TYPE_TRIM_HORIZON)
        response = client.create_cursor(stream_id, cursor_details)
        cursor = response.data.value
        return cursor

    def simple_message_loop(self, client, stream_id, initial_cursor):
        try:
            cursor = initial_cursor
            while True:
                get_response = client.get_messages(stream_id, cursor, limit=10)
                # No messages to process. Return.
                if not get_response.data:
                    return 0
                # Process the messages
                print(" Read {} messages".format(len(get_response.data)))
                for message in get_response.data:
                    print("{}: {}".format(b64decode(message.key.encode()).decode(),
                                          b64decode(message.value.encode()).decode()))
                # get_messages is a throttled method; clients should retrieve sufficiently large message
                # batches, so as to avoid too many HTTP requests.
                time.sleep(1)
                # Use the next-cursor for iteration
                cursor = get_response.headers["opc-next-cursor"]
        except Exception as e:
            x = str(e)
            print('Error: ', x)
            return 1

    def get_stream(self, admin_client, stream_id):
        return admin_client.get_stream(stream_id)

    def get_or_create_stream(self, client, compartment_id, stream_name, partition, sac_composite):
        try:
            list_streams = client.list_streams(compartment_id=compartment_id, name=stream_name,
                                               lifecycle_state=oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE)
            if list_streams.data:
                # If we find an active stream with the correct name, we'll use it.
                print("An active stream {} has been found".format(stream_name))
                sid = list_streams.data[0].id
                return self.get_stream(sac_composite.client, sid)
            print(" No active stream {} has been found; creating it now. ".format(stream_name))
            print(" Creating stream {} with {} partitions.".format(stream_name, partition))
            # Create the stream_details object that needs to be passed while creating the stream.
            stream_details = oci.streaming.models.CreateStreamDetails(name=stream_name, partitions=partition,
                                                                      compartment_id=compartment_id, retention_in_hours=24)
            # Since stream creation is asynchronous, we need to wait for the stream to become active.
            response = sac_composite.create_stream_and_wait_for_state(
                stream_details, wait_for_states=[oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE])
            return response
        except Exception as e:
            print(str(e))

    def consumeStream(self):
        try:
            STREAM_NAME = self.STREAM_NAME
            PARTITIONS = self.PARTITIONS
            compartment = self.comp
            print('Consuming stream from Oracle Cloud!')
            # Load the default configuration
            config = from_file(file_location="~/.oci/config.poc")
            # Create a StreamAdminClientCompositeOperations for composite operations.
            stream_admin_client = oci.streaming.StreamAdminClient(config)
            stream_admin_client_composite = oci.streaming.StreamAdminClientCompositeOperations(stream_admin_client)
            # We will reuse a stream if it's already created.
            # This will utilize list_streams() to determine if a stream exists and return it, or create a new one.
            stream = self.get_or_create_stream(stream_admin_client, compartment, STREAM_NAME,
                                               PARTITIONS, stream_admin_client_composite).data
            print(" Created Stream {} with id : {}".format(stream.name, stream.id))
            # Streams are assigned a specific endpoint URL based on where they are provisioned.
            # Create a stream client using the provided message endpoint.
            stream_client = oci.streaming.StreamClient(config, service_endpoint=stream.messages_endpoint)
            s_id = stream.id
            # Use a cursor for getting messages; each get_messages call will return a next-cursor for iteration.
            # There are a couple of kinds of cursors; a cursor created at a given partition/offset
            # gives explicit offset-management control to the consumer.
            print("Starting a simple message loop with a partition cursor")
            partition_cursor = self.get_cursor_by_partition(stream_client, s_id, partition="0")
            self.simple_message_loop(stream_client, s_id, partition_cursor)
            return 0
        except Exception as e:
            x = str(e)
            print(x)
            logging.info(x)
            return 1
Let’s explore the key snippets from the above code –
def get_or_create_stream(self, client, compartment_id, stream_name, partition, sac_composite):
    try:
        list_streams = client.list_streams(compartment_id=compartment_id, name=stream_name,
                                           lifecycle_state=oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE)
        if list_streams.data:
            # If we find an active stream with the correct name, we'll use it.
            print("An active stream {} has been found".format(stream_name))
            sid = list_streams.data[0].id
            return self.get_stream(sac_composite.client, sid)
        print(" No active stream {} has been found; creating it now. ".format(stream_name))
        print(" Creating stream {} with {} partitions.".format(stream_name, partition))
        # Create the stream_details object that needs to be passed while creating the stream.
        stream_details = oci.streaming.models.CreateStreamDetails(name=stream_name, partitions=partition,
                                                                  compartment_id=compartment_id, retention_in_hours=24)
        # Since stream creation is asynchronous, we need to wait for the stream to become active.
        response = sac_composite.create_stream_and_wait_for_state(
            stream_details, wait_for_states=[oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE])
        return response
    except Exception as e:
        print(str(e))
The above function checks whether a stream with the given name already exists; if not, it creates one.
def get_cursor_by_partition(self, client, stream_id, partition):
    print("Creating a cursor for partition {}".format(partition))
    cursor_details = oci.streaming.models.CreateCursorDetails(
        partition=partition,
        type=oci.streaming.models.CreateCursorDetails.TYPE_TRIM_HORIZON)
    response = client.create_cursor(stream_id, cursor_details)
    cursor = response.data.value
    return cursor
In Oracle Cloud, you need to create a cursor to consume streaming messages. Please refer to the following link for more details.
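As a side note, TYPE_TRIM_HORIZON (used above) starts reading from the oldest message still inside the stream’s retention window. The SDK offers other cursor types as well; here is a small sketch, reusing the stream_client & s_id names from consumeStream(), for reading only newly arriving messages –

# A sketch: a LATEST cursor skips history & starts with messages
# published after the cursor was created.
cursor_details = oci.streaming.models.CreateCursorDetails(
    partition="0",
    type=oci.streaming.models.CreateCursorDetails.TYPE_LATEST)
latest_cursor = stream_client.create_cursor(s_id, cursor_details).data.value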
def simple_message_loop(self, client, stream_id, initial_cursor):
    try:
        cursor = initial_cursor
        while True:
            get_response = client.get_messages(stream_id, cursor, limit=10)
            # No messages to process. Return.
            if not get_response.data:
                return 0
            # Process the messages
            print(" Read {} messages".format(len(get_response.data)))
            for message in get_response.data:
                print("{}: {}".format(b64decode(message.key.encode()).decode(),
                                      b64decode(message.value.encode()).decode()))
            # get_messages is a throttled method; clients should retrieve sufficiently large message
            # batches, so as to avoid too many HTTP requests.
            time.sleep(1)
            # Use the next-cursor for iteration
            cursor = get_response.headers["opc-next-cursor"]
    except Exception as e:
        x = str(e)
        print('Error: ', x)
        return 1
In this case, we’re looping through the channel & consuming the messages while keeping the number of HTTP requests low.
3. clsOCIPublish.py (This will publish messages from the source Azure API to the designated stream created in Oracle Cloud)
##############################################
#### Enhancement By: SATYAKI DE           ####
#### Enhancement On: 07-Mar-2021          ####
#### Modified On 07-Mar-2021              ####
####                                      ####
#### Objective: Publishing stream at OCI ####
##############################################
import oci
import sys
import time
import os
import logging
from base64 import b64encode, b64decode
import json
from clsConfig import clsConfig as cf
from oci.config import from_file
import pandas as p

class clsOCIPublish:
    def __init__(self):
        self.comp = str(cf.conf['comp'])
        self.STREAM_NAME = str(cf.conf['STREAM_NAME'])
        self.PARTITIONS = int(cf.conf['PARTITIONS'])
        self.limRec = int(cf.conf['limRec'])

    def get_stream(self, admin_client, stream_id):
        return admin_client.get_stream(stream_id)

    def publish_messages(self, client, stream_id, inputDF):
        try:
            limRec = self.limRec
            df = inputDF
            # Calculating total number of records
            cntRow = df.shape[0]
            print('Actual Record counts: ', str(cntRow))
            print('Supplied Record counts: ', str(limRec))
            # Build up a PutMessagesDetails and publish some messages to the stream
            message_list = []
            start_pos = 0
            end_pos = 0
            interval = 1
            for i in range(limRec):
                split_df = p.DataFrame()
                rJson = ''
                # Preparing data: slicing one record at a time & converting it to JSON
                if ((start_pos + interval) <= cntRow):
                    end_pos = start_pos + interval
                else:
                    end_pos = start_pos + (cntRow - start_pos)
                split_df = df.iloc[start_pos:end_pos]
                rJson = split_df.to_json(orient='records')
                if ((start_pos > cntRow) | (start_pos == cntRow)):
                    pass
                else:
                    start_pos = start_pos + interval
                key = "key" + str(i)
                value = "value" + str(rJson)
                # End of data preparation
                encoded_key = b64encode(key.encode()).decode()
                encoded_value = b64encode(value.encode()).decode()
                message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value))
            print("Publishing {} messages to the stream {} ".format(len(message_list), stream_id))
            messages = oci.streaming.models.PutMessagesDetails(messages=message_list)
            put_message_result = client.put_messages(stream_id, messages)
            # The put_message_result can contain some useful metadata for handling failures
            for entry in put_message_result.data.entries:
                if entry.error:
                    print("Error ({}) : {}".format(entry.error, entry.error_message))
                else:
                    print("Published message to partition {} , offset {}".format(entry.partition, entry.offset))
            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)
            return 1

    def get_or_create_stream(self, client, compartment_id, stream_name, partition, sac_composite):
        try:
            list_streams = client.list_streams(compartment_id=compartment_id, name=stream_name,
                                               lifecycle_state=oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE)
            if list_streams.data:
                # If we find an active stream with the correct name, we'll use it.
                print("An active stream {} has been found".format(stream_name))
                sid = list_streams.data[0].id
                return self.get_stream(sac_composite.client, sid)
            print(" No active stream {} has been found; creating it now. ".format(stream_name))
            print(" Creating stream {} with {} partitions.".format(stream_name, partition))
            # Create the stream_details object that needs to be passed while creating the stream.
            stream_details = oci.streaming.models.CreateStreamDetails(name=stream_name, partitions=partition,
                                                                      compartment_id=compartment_id, retention_in_hours=24)
            # Since stream creation is asynchronous, we need to wait for the stream to become active.
            response = sac_composite.create_stream_and_wait_for_state(
                stream_details, wait_for_states=[oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE])
            return response
        except Exception as e:
            print(str(e))

    def publishStream(self, inputDf):
        try:
            STREAM_NAME = self.STREAM_NAME
            PARTITIONS = self.PARTITIONS
            compartment = self.comp
            print('Publishing stream to Oracle Cloud!')
            # Load the default configuration
            config = from_file(file_location="~/.oci/config.poc")
            # Create a StreamAdminClientCompositeOperations for composite operations.
            stream_admin_client = oci.streaming.StreamAdminClient(config)
            stream_admin_client_composite = oci.streaming.StreamAdminClientCompositeOperations(stream_admin_client)
            # We will reuse a stream if it's already created.
            # This will utilize list_streams() to determine if a stream exists and return it, or create a new one.
            stream = self.get_or_create_stream(stream_admin_client, compartment, STREAM_NAME,
                                               PARTITIONS, stream_admin_client_composite).data
            print(" Created Stream {} with id : {}".format(stream.name, stream.id))
            # Streams are assigned a specific endpoint URL based on where they are provisioned.
            # Create a stream client using the provided message endpoint.
            stream_client = oci.streaming.StreamClient(config, service_endpoint=stream.messages_endpoint)
            s_id = stream.id
            # Publish some messages to the stream
            self.publish_messages(stream_client, s_id, inputDf)
            return 0
        except Exception as e:
            x = str(e)
            print(x)
            logging.info(x)
            return 1
Let’s explore the key snippet from the above script –
def publish_messages(self, client, stream_id, inputDF):
    try:
        limRec = self.limRec
        df = inputDF
        # Calculating total number of records
        cntRow = df.shape[0]
        print('Actual Record counts: ', str(cntRow))
        print('Supplied Record counts: ', str(limRec))
        # Build up a PutMessagesDetails and publish some messages to the stream
        message_list = []
        start_pos = 0
        end_pos = 0
        interval = 1
        for i in range(limRec):
            split_df = p.DataFrame()
            rJson = ''
            # Preparing data: slicing one record at a time & converting it to JSON
            if ((start_pos + interval) <= cntRow):
                end_pos = start_pos + interval
            else:
                end_pos = start_pos + (cntRow - start_pos)
            split_df = df.iloc[start_pos:end_pos]
            rJson = split_df.to_json(orient='records')
            if ((start_pos > cntRow) | (start_pos == cntRow)):
                pass
            else:
                start_pos = start_pos + interval
            key = "key" + str(i)
            value = "value" + str(rJson)
            # End of data preparation
            encoded_key = b64encode(key.encode()).decode()
            encoded_value = b64encode(value.encode()).decode()
            message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value))
        print("Publishing {} messages to the stream {} ".format(len(message_list), stream_id))
        messages = oci.streaming.models.PutMessagesDetails(messages=message_list)
        put_message_result = client.put_messages(stream_id, messages)
        # The put_message_result can contain some useful metadata for handling failures
        for entry in put_message_result.data.entries:
            if entry.error:
                print("Error ({}) : {}".format(entry.error, entry.error_message))
            else:
                print("Published message to partition {} , offset {}".format(entry.partition, entry.offset))
        return 0
    except Exception as e:
        x = str(e)
        print('Error: ', x)
        return 1
In the above snippet, we’re taking the data captured from our Azure API call & sending it chunk by chunk to the Oracle stream for publishing.
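To make that chunk-by-chunk preparation concrete, here is a minimal, self-contained sketch (with hypothetical sample values) of how a single one-row slice of the dataframe becomes the base64-encoded key/value pair that PutMessagesDetailsEntry expects –

import pandas as p
from base64 import b64encode

# One representative row, mirroring the Azure API columns
df = p.DataFrame([{"date": 20200404, "state": "NY", "positive": 113704, "negative": 283358}])

rJson = df.iloc[0:1].to_json(orient='records')     # one-record chunk, e.g. '[{"date":20200404,...}]'
encoded_key = b64encode("key0".encode()).decode()  # -> 'a2V5MA=='
encoded_value = b64encode(("value" + rJson).encode()).decode()
print(encoded_key, encoded_value)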
4. clsAzureAPI.py (This will fetch data from the source Azure API)
##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 07-Mar-2021              ####
#### Modified On 07-Mar-2021              ####
####                                      ####
#### Objective: Calling Azure dynamic API ####
##############################################
import json
from clsConfig import clsConfig as cf
import requests
import logging

class clsAzureAPI:
    def __init__(self):
        self.url = cf.conf['URL']
        self.azure_cache = cf.conf['CACHE']
        self.azure_con = cf.conf['conType']
        self.type = cf.conf['appType']
        self.typSel = cf.conf['typSel']
        self.typVal = cf.conf['colList']

    def searchQry(self):
        try:
            url = self.url
            api_cache = self.azure_cache
            api_con = self.azure_con
            type = self.type
            typSel = self.typSel
            typVal = self.typVal
            querystring = {"typeSel": typSel, "typeVal": typVal}
            print('Input JSON: ', str(querystring))
            headers = {
                'content-type': type,
                'Cache-Control': api_cache,
                'Connection': api_con
            }
            response = requests.request("GET", url, headers=headers, params=querystring)
            ResJson = response.text
            jdata = json.dumps(ResJson)
            ResJson = json.loads(jdata)
            return ResJson
        except Exception as e:
            ResJson = ''
            x = str(e)
            print(x)
            logging.info(x)
            ResJson = {'errorDetails': x}
            return ResJson
I think this one is pretty straightforward, as we’re simply invoking the Azure API & capturing its response.
5. callAzure2OracleStreaming.py (Main calling script that invokes all the classes for an end-to-end test)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 06-Mar-2021                         ####
#### Modified On 07-Mar-2021                         ####
####                                                 ####
#### Objective: Main calling script –                ####
#### This Python script will consume                 ####
#### source API data from Azure-Cloud & publish the  ####
#### data into an Oracle Streaming platform,         ####
#### which is compatible with Kafka. Later, another  ####
#### consumer app will read the data from the stream.####
#########################################################
from clsConfig import clsConfig as cf
import clsL as cl
import logging
import datetime
import clsAzureAPI as ca
import clsOCIPublish as co
import clsOCIConsume as cc
import pandas as p
import json

# Disabling warnings
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        # Declared Variables
        ret_1 = 0
        debug_ind = 'Y'
        res_2 = ''
        # Defining Generic Log File
        general_log_path = str(cf.conf['LOG_PATH'])
        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'Azure2OCIStream.log', level=logging.INFO)
        # Initiating Log Class
        l = cl.clsL()
        # Moving previous day log files to archive directory
        log_dir = cf.conf['LOG_PATH']
        tmpR0 = "*" * 157
        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)
        print()
        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)
        print('Welcome to Azure to Oracle Cloud Streaming (OCI) Calling Program: ')
        print('*' * 160)
        print('Reading dynamic Covid data from Azure API: ')
        print('https://xxxxxx.yyyyyyyyyy.net/api/getDynamicCovidStats')
        print()
        print('Selected Columns for this -> date, state, positive, negative')
        print()
        print('This will take a few seconds depending upon the volume & network!')
        print('-' * 160)
        print()
        # Create the instance of the Azure API class
        x1 = ca.clsAzureAPI()
        # Let's pass this to our map section
        retJson = x1.searchQry()
        # Converting JSON to a Pandas dataframe for better readability
        # Capturing the JSON payload
        #res_1 = json.dumps(retJson)
        #res = json.loads(res_1)
        res = json.loads(retJson)
        # Converting dictionary to Pandas dataframe
        # df_ret = p.read_json(ret_2, orient='records')
        df_ret = p.json_normalize(res)
        df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])
        # Removing any duplicate columns
        df_ret = df_ret.loc[:, ~df_ret.columns.duplicated()]
        print()
        print()
        print("-" * 160)
        print('Publishing Azure sample result: ')
        print(df_ret.head())
        # Logging Final Output
        l.logr('1.df_ret' + var + '.csv', debug_ind, df_ret, 'log')
        print("-" * 160)
        print()
        print('*' * 160)
        print('Calling Oracle Cloud Infrastructure Publisher Program!')
        print('Pushing Azure API to Oracle Kafka-Streaming using OCI!')
        print('-' * 160)
        # Create the instance of the OCI publisher class
        x2 = co.clsOCIPublish()
        retVal = x2.publishStream(df_ret)
        if retVal == 0:
            print('Successfully streamed to Oracle Cloud!')
        else:
            print('Failed to stream!')
        print()
        print('*' * 160)
        print('Calling Oracle Cloud Infrastructure Consumer Program!')
        print('Getting Oracle Streaming captured in OCI!')
        print('-' * 160)
        # Create the instance of the OCI consumer class
        x3 = cc.clsOCIConsume()
        retVal2 = x3.consumeStream()
        if retVal2 == 0:
            print('Successfully captured the stream from Oracle Cloud!')
        else:
            print('Failed to retrieve stream from OCI!')
        print('Finished Analysis points..')
        print("*" * 160)
        logging.info('Finished Analysis points..')
        logging.info(tmpR0)
        tmpR10 = 'End Time: ' + str(datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S"))
        logging.info(tmpR10)
        logging.info(tmpR0)
    except ValueError as e:
        print(str(e))
        print("Invalid option!")
        logging.info("Invalid option!")
    except Exception as e:
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()
This one is the primary calling script, invoking all the Python classes one by one to run our test cases together.
Let us run our application –
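Assuming the file names above, that is simply:

python callAzure2OracleStreaming.py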

And you can see the streaming data inside Oracle Cloud, as shown below –

You can explore the Git repository associated with this project & download the code from here.
So, finally, we’ve done it. 😀
I’ll bring some more exciting topics from the Python verse in the coming days.
Till then, Happy Avenging! 😀
Note: All the data & scenarios posted here are representational, available over the internet & intended for educational purposes only. Also, I’ve used the template SDK code provided by Oracle & customized it to satisfy our business case.