Azure-API calls from a Python-based OCI function through the Oracle API Gateway.

Today, I’ll be discussing how an Oracle Cloud Function interacts with an Azure-API through the Oracle API Gateway using native Python. I want to touch on this subject because I didn’t find much relevant material on doing this with Python on the net.

Let’s explore our use case. For this use case, I’ll use an old Azure-API that I developed in early 2019 & shared here at that time.

Now, we need to prepare our environment in Oracle-cloud.

Step 1:

We need to configure the virtual network as shown in the collage below, which depicts the step-by-step process to create it. For security reasons, I’ve masked sensitive information. You will need to capture the corresponding values from your own cloud portal.

VCN creation process

Make sure you choose the correct options & validate at the end, as shown in the below picture.

VCN Creation – Final Step

If all the information provided is correct, then you should see the following screen.

VCN Creation – Final Screen

Step 2:

Now, we need to create an application. As per OCI guidelines, every function or group of functions must live inside a container known as an application.

Creation of Application

From the above collage pic, you can see how we create the application by providing all the necessary inputs.

Step 3:

Now, you need to create the registry as shown below –

Creation of Registry

Your function-container will stay inside it after deployment. To know more about this, click the following link.

Step 4:

If you haven’t generated the auth-token already, then this is the time to generate it as shown below –

Generation of Auth-Token

Step 5:

This next piece of information, the object storage namespace, is crucial & you will need it on many occasions.

Object storage namespace

Just keep this information handy. I’ll refer to this step whenever we need it. You can get the details here.

Step 6:

Let’s create the gateway now. Please refer to the following collage pics, showing the step-by-step process.

Creation of Gateway

Make sure you have validated it before you proceed to the next step.

Step 7:

Let’s create the function under the application. I find this GUI option relatively easier than configuring the function locally & then pushing it to OCI. Let’s follow the process shown in the collage of pics here –

Creation of Function

So, you need to execute the series of commands shown above. The good thing is that the majority of the critical commands are generated for you automatically, so you don’t need to spend much time finding this information.

Here, we’ll be executing a series of commands as shown below –

Creation of function – continue

A few necessary commands that I want to discuss here –

fn init --runtime python <function-name>

This command will create a template of scripts based on the language you supply. You need to modify the main script (func.py) later with your own logic. You can also add other scripts as classes & refer to those classes inside your func.py.

For a better deployment & control environment, it is always wise to create a virtual env.

Just like the Azure function, you need to update your requirements.txt file before your deployment command.

pip freeze > requirements.txt

Once we are satisfied with our development, we’ll deploy the application as shown below –

Deployment of function

Again, a few relevant commands that I want to discuss here –

fn -v deploy --app <Application-Name>

This command will deploy all the Oracle functions that have any changes & push them to OCI. During this time, it will check all the dependent packages that you are using & try to install them one by one.

If you have already deployed & you want to upgrade your logic, then the deployment option will show something like this –

Deployment of function – continue

All the commands are pretty standard & marked with a red-square box. A few necessary commands to discuss –

fn invoke <Application-Name> <Function-Name>

If you are not using any external API, the above command should ideally return the output with the default value. But in our case, we have used an Azure-API, which is outside OCI. Hence, we need to update a few more settings before it works.

Unlike an Azure Function, which gives you the link by default when you run it locally from the Visual Studio Code editor, an OCI function does not give you the link by default.

Here, you need to execute the following command as shown in the above picture –

fn inspect function <Application-Name> <Function-Name>

If your deployment is successful, you will see your function docker-image inside your registry as shown below –

Deployment image of functions

To know more about fn-commands, click the following link.

Step 8:

Now, you need to update some policies, which will help API-Gateway to work.

Update of policy & logging feature

Also, you need to configure your default log for your function, as shown above.

Apart from that, we need to whitelist the port 443 as shown below –

Port whitelisting in VCN

Finally, we need to deploy our existing function into the Oracle API Gateway. You need to prepare a deployable JSON object, which will create a channel for the function to interact through the API Gateway deployment.

Deployment of function inside API-Gateway

The deployment JSON file should look something like this –

spec.json


{
  "routes": [
    {
      "path": "/getdata",
      "methods": [
        "GET", "POST"
      ],
      "backend": {
        "type": "ORACLE_FUNCTIONS_BACKEND",
        "functionId": "ocid1.fnfunc.oc1.us-sanjose-1.aaaaxxxxxxxjdjfjfjfjfjfjfjfjfjfjfjfjfjfjfjdsssssss2dfjdfjdjd33376dq"
      }
    }
  ]
}

You will get more on this from this link.

Make sure that your path prefix is unique, as shown in the above picture. And, if you want to know the complete steps to prepare your Oracle function, go through this master link.
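If you prefer to generate the deployment JSON rather than typing it by hand, a minimal sketch could look like the following. It only mirrors the structure of the spec.json shown earlier; the helper name build_gateway_spec and the sample OCID are hypothetical and not part of any Oracle SDK.

```python
import json


def build_gateway_spec(function_ocid, path="/getdata", methods=("GET", "POST")):
    """Return a deployment spec dict with a single function-backed route.

    Hypothetical helper: it only reproduces the layout of the spec.json
    shown in this post.
    """
    if not path.startswith("/"):
        raise ValueError("route path must start with '/'")
    return {
        "routes": [
            {
                "path": path,
                "methods": list(methods),
                "backend": {
                    "type": "ORACLE_FUNCTIONS_BACKEND",
                    "functionId": function_ocid,
                },
            }
        ]
    }


def spec_as_json(function_ocid):
    """Serialise the spec so that it can be saved as spec.json."""
    return json.dumps(build_gateway_spec(function_ocid), indent=2)
```

You would pass the OCID of your own function (the one printed by the fn inspect command above) & save the output of spec_as_json() as spec.json.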

Now, we’re ready to test the application. But before that, let us explore the codebase.

1. clsConfig.py ( This is the configuration file for this demo-application)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 04-Apr-2020 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Azure 2 OCI API. Application will ####
#### process these information & perform ####
#### the call to our newly developed Azure ####
#### API in OCI. ####
###############################################
import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        "comp": "ocid1.compartment.oc1..xxxxxxxxxxxxxxxxxxxxxxxxyyyyyyyyyyyyyyyyyyxxxxxx",
        "URL": "https://xxxxxxxxxx.yyyyyyyyyyyyyyyy.net/api/getDynamicCovidStats",
        "appType": "application/json",
        "conType": "keep-alive",
        "limRec": 10,
        "CACHE": "no-cache",
        "colList": "date, state, positive, negative",
        "typSel": "Cols",
        "LOG_PATH": Curr_Path + sep + 'log' + sep,
        "STREAM_NAME": "Covid19-Stream",
        "PARTITIONS": 1
    }

2. clsAzureAPI.py ( This is the modified version of the old AzureAPI class. We’ve added a new logger, which works inside OCI. No other changes in the main logic. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 07-Mar-2021 ####
#### Modified On 07-Mar-2021 ####
#### ####
#### Objective: Calling Azure dynamic API ####
##############################################
import json
from clsConfig import clsConfig as cf
import requests
import logging

class clsAzureAPI:
    def __init__(self):
        self.url = cf.conf['URL']
        self.azure_cache = cf.conf['CACHE']
        self.azure_con = cf.conf['conType']
        self.type = cf.conf['appType']
        self.typSel = cf.conf['typSel']
        self.typVal = cf.conf['colList']

    def searchQry(self):
        try:
            url = self.url
            api_cache = self.azure_cache
            api_con = self.azure_con
            type = self.type
            typSel = self.typSel
            typVal = self.typVal

            # Query parameters expected by the Azure API
            querystring = {"typeSel": typSel, "typeVal": typVal}

            strMsg = 'Input JSON: ' + str(querystring)
            logging.getLogger().info(strMsg)

            headers = {
                'content-type': type,
                'Cache-Control': api_cache,
                'Connection': api_con
            }

            response = requests.request("GET", url, headers=headers, params=querystring)

            ResJson = response.text

            # This round trip leaves the value unchanged: the caller still
            # receives the raw JSON text & has to parse it with json.loads()
            jdata = json.dumps(ResJson)
            ResJson = json.loads(jdata)

            return ResJson
        except Exception as e:
            ResJson = ''
            x = str(e)
            print(x)
            logging.info(x)

            # Note: on failure a dict is returned instead of JSON text
            ResJson = {'errorDetails': x}

            return ResJson

3. func.py ( Main calling script. This one is auto-generated by OCI while creating the function. We’ve modified it as per our logic. )


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 20-Mar-2021 ####
#### Modified On 20-Mar-2021 ####
#### ####
#### Objective: Calling Azure dynamic API ####
##############################################
import io
import json
import logging
import warnings

from fdk import response

import clsAzureAPI as ca

# Disabling warnings
def warn(*args, **kwargs):
    pass

warnings.warn = warn

def handler(ctx, data: io.BytesIO = None):
    try:
        email = "default@gmail.com"

        # Checking individual elements
        try:
            body = json.loads(data.getvalue())
            email = body.get("email")
        except (Exception, ValueError) as ex:
            logging.getLogger().info('error parsing json payload: ' + str(ex))

        logging.getLogger().info("Calling Oracle Python getCovidData function!")

        # Create the instance of the Azure API class
        x1 = ca.clsAzureAPI()

        # Call the Azure API
        retJson = x1.searchQry()

        # Parse the JSON text returned by the API
        resJson = json.loads(retJson)

        return response.Response(
            ctx, response_data=json.dumps(
                {"status":"Success", "message": resJson}),
            headers={"Content-Type": "application/json"}
        )
    except Exception as e:
        x = str(e)

        return response.Response(
            ctx, response_data=json.dumps(
                {"status":"Failed", "message": x}),
            headers={"Content-Type": "application/json"}
        )

Key snippet that we want to discuss here –

        # Checking individual elements
        try:
            body = json.loads(data.getvalue())
            email = body.get("email")
        except (Exception, ValueError) as ex:
            logging.getLogger().info('error parsing json payload: ' + str(ex))

Checking the individual element in the input payload.
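One thing to keep in mind with this check: when the payload is valid JSON but has no "email" key, body.get("email") returns None & silently replaces the default value. A minimal sketch of a more defensive version is shown below; the helper name extract_email is hypothetical & not part of the FDK.

```python
import io
import json

DEFAULT_EMAIL = "default@gmail.com"


def extract_email(data, default=DEFAULT_EMAIL):
    """Return the 'email' field of a JSON request body.

    Falls back to `default` when the body is missing, empty, not valid
    JSON, not a JSON object, or has no usable 'email' value.
    """
    try:
        body = json.loads(data.getvalue())
    except (ValueError, AttributeError):
        # ValueError covers malformed or empty JSON;
        # AttributeError covers data being None.
        return default
    if not isinstance(body, dict):
        return default
    return body.get("email") or default
```

Inside the handler, you would then call email = extract_email(data) instead of the inline try block.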

        # Create the instance of the Azure API class
        x1 = ca.clsAzureAPI()

        # Call the Azure API
        retJson = x1.searchQry()

        # Parse the JSON text returned by the API
        resJson = json.loads(retJson)

Now, we’re calling the azure-API class & receiving the response into a JSON variable.
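Note that searchQry() returns JSON text on success but a dict with an errorDetails key on failure, so calling json.loads() directly on the result raises a TypeError in the failure case & hides the original error. The following is a minimal sketch of how both cases could be normalised; the helper name to_message is an assumption for illustration only.

```python
import json


def to_message(ret):
    """Normalise the value returned by searchQry() into a Python object.

    A dict or list is passed through unchanged; JSON text is parsed;
    anything else is wrapped in an error dict instead of raising.
    """
    if isinstance(ret, (dict, list)):
        return ret
    try:
        return json.loads(ret)
    except (TypeError, ValueError):
        return {"errorDetails": "non-JSON response", "raw": str(ret)}
```

With this in place, resJson = to_message(retJson) would keep the errorDetails from the API class visible in the final response.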

        return response.Response(
            ctx, response_data=json.dumps(
                {"status":"Success", "message": resJson}),
            headers={"Content-Type": "application/json"}
        )

Sending final response to the client.

4. func.yaml ( Main configuration script. This one is auto-generated by OCI while creating the function. )


schema_version: 20180708
name: getcoviddata
version: 0.0.1
runtime: python
entrypoint: /python/bin/fdk /function/func.py handler
memory: 256


Let’s run it from postman –

Invoking OCI-Function from Postman
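If you would rather script the same call than use Postman, the request can be sketched with the Python standard library. The gateway host & path prefix used in the usage note are placeholders (assumptions); use the endpoint shown on your own API Gateway deployment. The /getdata route & the "email" field come from the spec.json & func.py above.

```python
import json
import urllib.request


def build_gateway_request(base_url, email, path="/getdata"):
    """Build (but do not send) a POST request for the gateway route."""
    payload = json.dumps({"email": email}).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + path,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
```

For example, req = build_gateway_request("https://gateway.example.com/covid", "someone@example.com") prepares the call, & urllib.request.urlopen(req, timeout=30) would actually send it.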

During this demo, I realized that Oracle Functions has yet to reach the maturity of AWS Lambda or Azure Functions when it comes to Python. I faced almost the same challenges that I faced nearly two years back when I tried to implement an Azure Function using Python. However, I’m optimistic that Oracle Cloud Functions will mature & offer an integrated GUI environment to deploy Python-based components straight from the IDE, rather than through a CLI-driven approach. Correct me if I have missed an IDE that supports this feature.


You can explore my Git associated with this project & download the code from here.

So, finally, we’ve done it. 😀

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational data & scenarios that are available over the internet & for educational purpose only. Also, I’ve used template SDK provided by Oracle & customized it to satisfy our business cases.

Streaming Data from Azure to Oracle Clouds

Hello guys,

Today, I’m going to discuss a new notion: living between multi-cloud environments. As you might be aware, I shared a dynamic API built inside Azure in my last post. Today, I want to use that covid-19 API, whose data originally comes from another third-party source, & publish it as streams inside the Oracle cloud. This is an ideal case for integration software like Tibco or Mulesoft. However, the approach shown here is more useful for a start-up kind of environment or for anyone who wants to build their own API-based eco-system.

First, you need to register in Oracle cloud, as they give a $300 trial to everyone who registers on their platform for the first time.

Step 1:

You will land on the main dashboard after successfully registering in the portal by providing the essential information.

Registration to Oracle Cloud

Step 2:

By default, it will show the following screen –

Display of root compartment

You should create a compartment as shown below.

Creation of sub-compartment

Step 3:

Now, you can create the stream, as shown in the next step, by choosing the desired compartment. After selecting the compartment, click the blue “Create Stream” button at the top of your page.

Creation of stream – Initiation
Creation of Stream – Final Steps

Also, you can test the stream by manually uploading a few sample JSON messages, as shown in the step below.

The lower picture shows the sample JSON used to test the newly created stream. The upper picture shows the final stream with some previously tested JSON.

Step 4:

Now, we need to add API-key as follows –

Adding the API-Key

This screen will prompt you to download the private key. We’ll use this in the later configuration of our python environment.

Step 5:

Now, you need to capture the content of the configuration file shown in the below figures –

Copying content of the Configuration file

You’ll get these important details under Identity marked in red-box.

Step 6:

Now, you’ll place the previously acquired private key & the content from Step-5 under the following location from where you are going to trigger the application –

Configuration File & Private Key addition

You will get more details on this from these links –

Now, we’ve finished with the basic Oracle cloud setup.
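Before running the application, it can help to confirm that the configuration file from Step-5 contains the entries the OCI SDK expects for API-key authentication (user, fingerprint, tenancy, region & key_file). The following is a minimal sketch using only the standard library; the helper name missing_oci_keys is hypothetical & it does not replace the SDK’s own validation.

```python
import configparser

# Entries the OCI SDK needs for API-key based authentication.
REQUIRED_KEYS = ("user", "fingerprint", "tenancy", "region", "key_file")


def missing_oci_keys(config_text, profile="DEFAULT"):
    """Return the required keys that are absent or empty in the profile."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(config_text)
    if profile == "DEFAULT":
        section = parser.defaults()
    elif parser.has_section(profile):
        section = parser[profile]
    else:
        # Unknown profile: everything is missing.
        return list(REQUIRED_KEYS)
    return [key for key in REQUIRED_KEYS if not section.get(key)]
```

You could read the text of your configuration file (this demo loads it from ~/.oci/config.poc) & pass it to missing_oci_keys(); an empty list means all five entries are present.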


Let’s check the Azure API one more time using postman –

Testing Azure-API from Postman

Let us install some of the critical python packages –

Installing Key Python-packages

Now, we’re ready with our environment.


Let us explore the codebase now.

1. clsConfig.py ( This is the configuration file for this demo-application)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 04-Apr-2020 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Azure 2 OCI API. Application will ####
#### process these information & perform ####
#### the call to our newly developed Azure ####
#### API in OCI. ####
###############################################
import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        "comp": "ocid1.compartment.oc1..xxxxxxxxxxxxxxxxxxxxxxxxyyyyyyyyyyyyyyyyyyxxxxxx",
        "URL": "https://xxxxxxxxxx.yyyyyyyyyyyyyyyy.net/api/getDynamicCovidStats",
        "appType": "application/json",
        "conType": "keep-alive",
        "limRec": 10,
        "CACHE": "no-cache",
        "colList": "date, state, positive, negative",
        "typSel": "Cols",
        "LOG_PATH": Curr_Path + sep + 'log' + sep,
        "STREAM_NAME": "Covid19-Stream",
        "PARTITIONS": 1
    }

2. clsOCIConsume.py (This will consume from the designated stream created in Oracle-cloud)


##############################################
#### Enhancement By: SATYAKI DE ####
#### Enhancement On: 07-Mar-2021 ####
#### Modified On 08-Mar-2021 ####
#### ####
#### Objective: Consuming stream from OCI ####
##############################################
import oci
import sys
import time
import os
import logging
from base64 import b64encode, b64decode
import json
from clsConfig import clsConfig as cf
from oci.config import from_file
import pandas as p

class clsOCIConsume:
    def __init__(self):
        self.comp = str(cf.conf['comp'])
        self.STREAM_NAME = str(cf.conf['STREAM_NAME'])
        self.PARTITIONS = int(cf.conf['PARTITIONS'])
        self.limRec = int(cf.conf['limRec'])

    def get_cursor_by_partition(self, client, stream_id, partition):
        print("Creating a cursor for partition {}".format(partition))
        cursor_details = oci.streaming.models.CreateCursorDetails(
            partition=partition,
            type=oci.streaming.models.CreateCursorDetails.TYPE_TRIM_HORIZON)
        response = client.create_cursor(stream_id, cursor_details)
        cursor = response.data.value
        return cursor

    def simple_message_loop(self, client, stream_id, initial_cursor):
        try:
            cursor = initial_cursor
            while True:
                get_response = client.get_messages(stream_id, cursor, limit=10)
                # No messages to process. return.
                if not get_response.data:
                    return

                # Process the messages
                print(" Read {} messages".format(len(get_response.data)))
                for message in get_response.data:
                    print("{}: {}".format(b64decode(message.key.encode()).decode(),
                                          b64decode(message.value.encode()).decode()))

                # get_messages is a throttled method; clients should retrieve sufficiently large message
                # batches, as to avoid too many http requests.
                time.sleep(1)
                # use the next-cursor for iteration
                cursor = get_response.headers["opc-next-cursor"]

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

    def get_stream(self, admin_client, stream_id):
        return admin_client.get_stream(stream_id)

    def get_or_create_stream(self, client, compartment_id, stream_name, partition, sac_composite):
        try:
            list_streams = client.list_streams(compartment_id=compartment_id, name=stream_name,
                                               lifecycle_state=oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE)

            if list_streams.data:
                # If we find an active stream with the correct name, we'll use it.
                print("An active stream {} has been found".format(stream_name))
                sid = list_streams.data[0].id
                return self.get_stream(sac_composite.client, sid)

            print(" No Active stream {} has been found; Creating it now. ".format(stream_name))
            print(" Creating stream {} with {} partitions.".format(stream_name, partition))

            # Create stream_details object that need to be passed while creating stream.
            # The compartment comes from the compartment_id argument of this method.
            stream_details = oci.streaming.models.CreateStreamDetails(name=stream_name, partitions=partition,
                                                                      compartment_id=compartment_id, retention_in_hours=24)

            # Since stream creation is asynchronous; we need to wait for the stream to become active.
            response = sac_composite.create_stream_and_wait_for_state(stream_details, wait_for_states=[oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE])
            return response
        except Exception as e:
            print(str(e))

    def consumeStream(self):
        try:
            STREAM_NAME = self.STREAM_NAME
            PARTITIONS = self.PARTITIONS
            compartment = self.comp

            print('Consuming stream from Oracle Cloud!')

            # Load the default configuration
            config = from_file(file_location="~/.oci/config.poc")

            # Create a StreamAdminClientCompositeOperations for composite operations.
            stream_admin_client = oci.streaming.StreamAdminClient(config)
            stream_admin_client_composite = oci.streaming.StreamAdminClientCompositeOperations(stream_admin_client)

            # We will reuse a stream if its already created.
            # This will utilize list_streams() to determine if a stream exists and return it, or create a new one.
            stream = self.get_or_create_stream(stream_admin_client, compartment, STREAM_NAME,
                                               PARTITIONS, stream_admin_client_composite).data

            print(" Created Stream {} with id : {}".format(stream.name, stream.id))

            # Streams are assigned a specific endpoint url based on where they are provisioned.
            # Create a stream client using the provided message endpoint.
            stream_client = oci.streaming.StreamClient(config, service_endpoint=stream.messages_endpoint)
            s_id = stream.id

            # Use a cursor for getting messages; each get_messages call will return a next-cursor for iteration.
            # There are a couple kinds of cursors.
            # A cursor can be created at a given partition/offset.
            # This gives explicit offset management control to the consumer.
            print("Starting a simple message loop with a partition cursor")
            partition_cursor = self.get_cursor_by_partition(stream_client, s_id, partition="0")
            self.simple_message_loop(stream_client, s_id, partition_cursor)

            return 0
        except Exception as e:
            x = str(e)
            print(x)
            logging.info(x)

            return 1

Let’s explore the key snippet from the above code –

    def get_or_create_stream(self, client, compartment_id, stream_name, partition, sac_composite):
        try:

            list_streams = client.list_streams(compartment_id=compartment_id, name=stream_name,
                                               lifecycle_state=oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE)

            if list_streams.data:
                # If we find an active stream with the correct name, we'll use it.
                print("An active stream {} has been found".format(stream_name))
                sid = list_streams.data[0].id
                return self.get_stream(sac_composite.client, sid)

            print(" No Active stream  {} has been found; Creating it now. ".format(stream_name))
            print(" Creating stream {} with {} partitions.".format(stream_name, partition))

            # Create stream_details object that need to be passed while creating stream.
            stream_details = oci.streaming.models.CreateStreamDetails(name=stream_name, partitions=partition,
                                                                      compartment_id=compartment_id, retention_in_hours=24)

            # Since stream creation is asynchronous; we need to wait for the stream to become active.
            response = sac_composite.create_stream_and_wait_for_state(stream_details, wait_for_states=[oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE])
            return response
        except Exception as e:
            print(str(e))

The above function will check if there is already any existing stream available or not. If not, then it will create one.

    def get_cursor_by_partition(self, client, stream_id, partition):
        print("Creating a cursor for partition {}".format(partition))
        cursor_details = oci.streaming.models.CreateCursorDetails(
            partition=partition,
            type=oci.streaming.models.CreateCursorDetails.TYPE_TRIM_HORIZON)
        response = client.create_cursor(stream_id, cursor_details)
        cursor = response.data.value
        return cursor

In Oracle cloud, you need to create a cursor to consume the streaming messages. Please refer to the following link for more details.

    def simple_message_loop(self, client, stream_id, initial_cursor):
        try:
            cursor = initial_cursor
            while True:
                get_response = client.get_messages(stream_id, cursor, limit=10)
                # No messages to process. return.
                if not get_response.data:
                    return

                # Process the messages
                print(" Read {} messages".format(len(get_response.data)))
                for message in get_response.data:
                    print("{}: {}".format(b64decode(message.key.encode()).decode(),
                                          b64decode(message.value.encode()).decode()))

                # get_messages is a throttled method; clients should retrieve sufficiently large message
                # batches, as to avoid too many http requests.
                time.sleep(1)
                # use the next-cursor for iteration
                cursor = get_response.headers["opc-next-cursor"]

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

In this case, we’re looping through the stream & consuming the messages in batches, pausing between calls to keep the number of HTTP requests low.
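The loop above only prints the decoded text. As a minimal sketch, the decoding step can be isolated into a small function that also strips the literal "value" prefix which the publisher in this demo puts in front of each JSON chunk. The function name decode_entry is hypothetical, & it assumes every message carries a non-empty key.

```python
import json
from base64 import b64decode, b64encode


def decode_entry(encoded_key, encoded_value, prefix="value"):
    """Decode one base64-encoded stream message into (key, parsed JSON).

    `prefix` is the literal text the publisher in this demo prepends
    to the JSON payload before encoding it.
    """
    key = b64decode(encoded_key).decode("utf-8")
    value = b64decode(encoded_value).decode("utf-8")
    if value.startswith(prefix):
        value = value[len(prefix):]
    return key, json.loads(value)
```

Inside simple_message_loop, you could call decode_entry(message.key, message.value) for each message instead of printing the raw strings.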

3. clsOCIPublish.py (This will publish msgs from the source Azure-API to designated stream created in Oracle-cloud)


##############################################
#### Enhancement By: SATYAKI DE ####
#### Enhancement On: 07-Mar-2021 ####
#### Modified On 07-Mar-2021 ####
#### ####
#### Objective: Publishing stream at OCI ####
##############################################
import oci
import sys
import time
import os
import logging
from base64 import b64encode, b64decode
import json
from clsConfig import clsConfig as cf
from oci.config import from_file
import pandas as p

class clsOCIPublish:
    def __init__(self):
        self.comp = str(cf.conf['comp'])
        self.STREAM_NAME = str(cf.conf['STREAM_NAME'])
        self.PARTITIONS = int(cf.conf['PARTITIONS'])
        self.limRec = int(cf.conf['limRec'])

    def get_stream(self, admin_client, stream_id):
        return admin_client.get_stream(stream_id)

    def publish_messages(self, client, stream_id, inputDF):
        try:
            limRec = self.limRec
            # Converting dataframe to json
            df = inputDF

            # Calculating total number of records
            cntRow = df.shape[0]
            print('Actual Record counts: ', str(cntRow))
            print('Supplied Record counts: ', str(limRec))

            # Build up a PutMessagesDetails and publish some messages to the stream
            message_list = []
            start_pos = 0
            end_pos = 0
            interval = 1

            for i in range(limRec):
                split_df = p.DataFrame()
                rJson = ''
                # Preparing Data

                # Getting Individual Element & convert them to Series
                if ((start_pos + interval) <= cntRow):
                    end_pos = start_pos + interval
                else:
                    end_pos = start_pos + (cntRow - start_pos)

                split_df = df.iloc[start_pos:end_pos]
                rJson = split_df.to_json(orient ='records')

                if ((start_pos > cntRow) | (start_pos == cntRow)):
                    pass
                else:
                    start_pos = start_pos + interval

                key = "key" + str(i)
                value = "value" + str(rJson)

                # End of data preparation

                encoded_key = b64encode(key.encode()).decode()
                encoded_value = b64encode(value.encode()).decode()
                message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value))

            print("Publishing {} messages to the stream {} ".format(len(message_list), stream_id))
            messages = oci.streaming.models.PutMessagesDetails(messages=message_list)
            put_message_result = client.put_messages(stream_id, messages)

            # The put_message_result can contain some useful metadata for handling failures
            for entry in put_message_result.data.entries:
                if entry.error:
                    print("Error ({}) : {}".format(entry.error, entry.error_message))
                else:
                    print("Published message to partition {} , offset {}".format(entry.partition, entry.offset))

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)
            return 1

    def get_or_create_stream(self, client, compartment_id, stream_name, partition, sac_composite):
        try:
            list_streams = client.list_streams(compartment_id=compartment_id, name=stream_name,
                                               lifecycle_state=oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE)

            if list_streams.data:
                # If we find an active stream with the correct name, we'll use it.
                print("An active stream {} has been found".format(stream_name))
                sid = list_streams.data[0].id
                return self.get_stream(sac_composite.client, sid)

            print(" No Active stream {} has been found; Creating it now. ".format(stream_name))
            print(" Creating stream {} with {} partitions.".format(stream_name, partition))

            # Create stream_details object that need to be passed while creating stream.
            # The compartment comes from the compartment_id argument of this method.
            stream_details = oci.streaming.models.CreateStreamDetails(name=stream_name, partitions=partition,
                                                                      compartment_id=compartment_id, retention_in_hours=24)

            # Since stream creation is asynchronous; we need to wait for the stream to become active.
            response = sac_composite.create_stream_and_wait_for_state(stream_details, wait_for_states=[oci.streaming.models.StreamSummary.LIFECYCLE_STATE_ACTIVE])
            return response
        except Exception as e:
            print(str(e))

    def publishStream(self, inputDf):
        try:
            STREAM_NAME = self.STREAM_NAME
            PARTITIONS = self.PARTITIONS
            compartment = self.comp

            print('Publishing stream to Oracle Cloud!')

            # Load the default configuration
            config = from_file(file_location="~/.oci/config.poc")

            # Create a StreamAdminClientCompositeOperations for composite operations.
            stream_admin_client = oci.streaming.StreamAdminClient(config)
            stream_admin_client_composite = oci.streaming.StreamAdminClientCompositeOperations(stream_admin_client)

            # We will reuse a stream if its already created.
            # This will utilize list_streams() to determine if a stream exists and return it, or create a new one.
            stream = self.get_or_create_stream(stream_admin_client, compartment, STREAM_NAME,
                                               PARTITIONS, stream_admin_client_composite).data

            print(" Created Stream {} with id : {}".format(stream.name, stream.id))

            # Streams are assigned a specific endpoint url based on where they are provisioned.
            # Create a stream client using the provided message endpoint.
            stream_client = oci.streaming.StreamClient(config, service_endpoint=stream.messages_endpoint)
            s_id = stream.id

            # Publish some messages to the stream
            self.publish_messages(stream_client, s_id, inputDf)

            return 0
        except Exception as e:
            x = str(e)
            print(x)
            logging.info(x)

            return 1

Let’s explore the key snippet from the above script –

    def publish_messages(self, client, stream_id, inputDF):
        try:
            limRec = self.limRec
            # Converting dataframe to json
            df = inputDF

            # Calculating total number of records
            cntRow = df.shape[0]
            print('Actual Record counts: ', str(cntRow))
            print('Supplied Record counts: ', str(limRec))

            # Build up a PutMessagesDetails and publish some messages to the stream
            message_list = []
            start_pos = 0
            end_pos = 0
            interval = 1

            for i in range(limRec):
                split_df = p.DataFrame()
                rJson = ''
                # Preparing Data

                # Getting Individual Element & convert them to Series
                if ((start_pos + interval) <= cntRow):
                    end_pos = start_pos + interval
                else:
                    end_pos = start_pos + (cntRow - start_pos)

                split_df = df.iloc[start_pos:end_pos]
                rJson = split_df.to_json(orient ='records')

                if ((start_pos > cntRow) | (start_pos == cntRow)):
                    pass
                else:
                    start_pos = start_pos + interval

                key = "key" + str(i)
                value = "value" + str(rJson)

                # End of data preparation

                encoded_key = b64encode(key.encode()).decode()
                encoded_value = b64encode(value.encode()).decode()
                message_list.append(oci.streaming.models.PutMessagesDetailsEntry(key=encoded_key, value=encoded_value))

            print("Publishing {} messages to the stream {} ".format(len(message_list), stream_id))
            messages = oci.streaming.models.PutMessagesDetails(messages=message_list)
            put_message_result = client.put_messages(stream_id, messages)

            # The put_message_result can contain some useful metadata for handling failures
            for entry in put_message_result.data.entries:
                if entry.error:
                    print("Error ({}) : {}".format(entry.error, entry.error_message))
                else:
                    print("Published message to partition {} , offset {}".format(entry.partition, entry.offset))

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)
            return 1

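In the above snippet, we’re taking the data captured from our Azure-API call, splitting it into chunks of one row each, encoding every key & value in base64 & then publishing the chunks to the Oracle stream in a single call. After that, we check every returned entry to see whether it was published or failed.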
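To make the chunk-and-encode step easier to follow, here is a minimal sketch that runs without the OCI SDK. It assumes the rows are already plain dictionaries; `build_messages` and the sample rows are hypothetical names used only for illustration, and the tuples stand in for the `oci.streaming.models.PutMessagesDetailsEntry` objects that the real script creates.

```python
import json
from base64 import b64encode, b64decode


def build_messages(records, interval=1):
    """Split records into chunks of `interval` rows and base64-encode each chunk."""
    messages = []
    for i, start in enumerate(range(0, len(records), interval)):
        # Slicing past the end is safe, so the last chunk may be shorter
        chunk = records[start:start + interval]

        key = "key" + str(i)
        value = "value" + json.dumps(chunk)

        # The stream expects base64 text for both key and value
        encoded_key = b64encode(key.encode()).decode()
        encoded_value = b64encode(value.encode()).decode()
        messages.append((encoded_key, encoded_value))
    return messages


# Hypothetical sample rows, not real API output
sample = [
    {"date": "2021-03-06", "state": "NY"},
    {"date": "2021-03-06", "state": "CA"},
    {"date": "2021-03-06", "state": "TX"},
]

msgs = build_messages(sample, interval=2)
for encoded_key, encoded_value in msgs:
    print(b64decode(encoded_key).decode(), b64decode(encoded_value).decode())
```

With an interval of 2, the three sample rows end up in two messages, and the second one carries only the leftover row. The original script uses an interval of 1, so every row becomes its own message.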

4. clsAzureAPI.py (This will fetch messages from the source Azure-API)


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 07-Mar-2021 ####
#### Modified On 07-Mar-2021 ####
#### ####
#### Objective: Calling Azure dynamic API ####
##############################################
import json
from clsConfig import clsConfig as cf
import requests
import logging

class clsAzureAPI:
    def __init__(self):
        self.url = cf.conf['URL']
        self.azure_cache = cf.conf['CACHE']
        self.azure_con = cf.conf['conType']
        self.type = cf.conf['appType']
        self.typSel = cf.conf['typSel']
        self.typVal = cf.conf['colList']

    def searchQry(self):
        try:
            url = self.url
            api_cache = self.azure_cache
            api_con = self.azure_con
            type = self.type
            typSel = self.typSel
            typVal = self.typVal

            querystring = {"typeSel": typSel, "typeVal": typVal}

            print('Input JSON: ', str(querystring))

            headers = {
                'content-type': type,
                'Cache-Control': api_cache,
                'Connection': api_con
            }

            response = requests.request("GET", url, headers=headers, params=querystring)

            ResJson = response.text

            jdata = json.dumps(ResJson)
            ResJson = json.loads(jdata)

            return ResJson
        except Exception as e:
            ResJson = ''
            x = str(e)
            print(x)

            logging.info(x)
            ResJson = {'errorDetails': x}

            return ResJson


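I think this one is pretty straightforward. We’re sending a GET request to the Azure-API, passing the selection type & the column list as query parameters, & returning the response text to the caller. If anything fails, the error message is logged & returned inside an errorDetails dictionary.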
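If you want to see what the request looks like before it goes out, here is a minimal sketch that only builds it & never sends it. Note that it swaps in the standard library’s `urllib` in place of `requests`, so it runs without extra packages. The host, the header values & the `typeSel` value are placeholders that I’ve assumed for illustration; in the real class, they come from `clsConfig`. `build_azure_request` is a hypothetical helper name.

```python
from urllib.parse import urlencode
from urllib.request import Request


def build_azure_request(url, type_sel, type_val):
    """Build (but do not send) the GET request for the Azure-API."""
    # Same two query parameters as in searchQry()
    query = urlencode({"typeSel": type_sel, "typeVal": type_val})

    # Assumed header values; the original reads them from the config
    headers = {
        "Content-Type": "application/json",
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
    }
    return Request(url + "?" + query, headers=headers, method="GET")


req = build_azure_request(
    "https://example.invalid/api/getDynamicCovidStats",  # placeholder host
    "Cols",                                              # hypothetical typeSel value
    "date,state,positive,negative",
)

print(req.get_method())
print(req.full_url)
```

The commas in the column list get percent-encoded in the final URL, which is the same thing `requests` does for you when you pass `params`.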

5. callAzure2OracleStreaming.py (Main calling script to invoke all the classes for an end-to-end test)


#########################################################
#### Written By: SATYAKI DE ####
#### Written On: 06-Mar-2021 ####
#### Modified On 07-Mar-2021 ####
#### ####
#### Objective: Main calling scripts – ####
#### This Python script will consume a ####
#### source API data from Azure-Cloud & publish the ####
#### data into an Oracle Streaming platform, ####
#### which is compatible with Kafka. Later, another ####
#### consumer app will read the data from the stream.####
#########################################################
from clsConfig import clsConfig as cf
import clsL as cl
import logging
import datetime
import clsAzureAPI as ca
import clsOCIPublish as co
import clsOCIConsume as cc
import pandas as p
import json

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Timestamp used in the log entries & log file names
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        # Declared Variable
        ret_1 = 0
        debug_ind = 'Y'
        res_2 = ''

        # Defining Generic Log File
        general_log_path = str(cf.conf['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'Azure2OCIStream.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        log_dir = cf.conf['LOG_PATH']

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print()
        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        print('Welcome to Azure to Oracle Cloud Streaming(OCI) Calling Program: ')
        print('*' * 160)
        print('Reading dynamic Covid data from Azure API: ')
        print('https://xxxxxx.yyyyyyyyyy.net/api/getDynamicCovidStats')
        print()
        print('Selected Columns for this -> date, state, positive, negative')
        print()
        print('This will take few seconds depending upon the volume & network!')
        print('-' * 160)
        print()

        # Create the instance of the Azure API Class
        x1 = ca.clsAzureAPI()

        # Fetching the JSON text from the Azure-API
        retJson = x1.searchQry()

        # Converting JSon to Pandas Dataframe for better readability
        # Capturing the JSON Payload
        #res_1 = json.dumps(retJson)
        #res = json.loads(res_1)
        res = json.loads(retJson)

        # Converting dictionary to Pandas Dataframe
        # df_ret = p.read_json(ret_2, orient='records')
        # p.json_normalize is the top-level API in pandas 1.0+
        df_ret = p.json_normalize(res)
        df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[1])

        # Removing any duplicate columns
        df_ret = df_ret.loc[:, ~df_ret.columns.duplicated()]

        print()
        print()
        print("-" * 160)

        print('Publishing Azure sample result: ')
        print(df_ret.head())

        # Logging Final Output
        l.logr('1.df_ret' + var + '.csv', debug_ind, df_ret, 'log')

        print("-" * 160)
        print()

        print('*' * 160)
        print('Calling Oracle Cloud Infrastructure Publisher Program!')
        print('Pushing Azure API to Oracle Kafka-Streaming using OCI!')
        print('-' * 160)

        # Create the instance of the OCI Publisher Class
        x2 = co.clsOCIPublish()

        retVal = x2.publishStream(df_ret)

        if retVal == 0:
            print('Successfully streamed to Oracle Cloud!')
        else:
            print('Failed to stream!')

        print()
        print('*' * 160)
        print('Calling Oracle Cloud Infrastructure Consumer Program!')
        print('Getting Oracle Streaming captured in OCI!')
        print('-' * 160)

        # Create the instance of the OCI Consumer Class
        x3 = cc.clsOCIConsume()

        retVal2 = x3.consumeStream()

        if retVal2 == 0:
            print('Successfully streamed captured from Oracle Cloud!')
        else:
            print('Failed to retrieve stream from OCI!')

        print('Finished Analysis points..')
        print("*" * 160)
        logging.info('Finished Analysis points..')
        logging.info(tmpR0)

        # Capturing the actual end time instead of reusing the start timestamp
        tmpR10 = 'End Time: ' + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError as e:
        print(str(e))
        print("Invalid option!")
        logging.info("Invalid option!")

    except Exception as e:
        # Python 3 exceptions have no .message attribute, so use str(e)
        print("Top level Error: args:{0}, message:{1}".format(e.args, str(e)))

if __name__ == "__main__":
    main()

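This one is the primary calling script, invoking all the Python classes one-by-one to run our test cases together. First, it fetches the data from the Azure-API & converts it to a Pandas dataframe. Then, it publishes the dataframe to the Oracle stream & finally consumes the messages back from the stream.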
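One small step in this script that is easy to miss is the column clean-up after flattening the JSON. Here is a minimal sketch of that logic in plain Python, without Pandas. It assumes the flattened column names carry a prefix such as `data.`, which is my guess about the payload & not something confirmed by the API; `clean_columns` & the sample names are hypothetical. Unlike the original lambda, this version also leaves names without a dot untouched instead of raising an error.

```python
def clean_columns(columns):
    """Take the part after the first dot & list the positions of first occurrences."""
    # Same idea as x.split(".")[1], but guarded for names without a dot
    stripped = [c.split(".")[1] if "." in c else c for c in columns]

    # Keep only the first occurrence of every name
    seen = set()
    keep = []
    for idx, name in enumerate(stripped):
        if name not in seen:
            seen.add(name)
            keep.append(idx)
    return stripped, keep


# Hypothetical flattened column names
cols = ["data.date", "data.state", "meta.state", "positive"]
names, keep = clean_columns(cols)

print(names)
print(keep)
```

Here, the second `state` column is dropped because only the first occurrence of each name is kept, which matches what `~df_ret.columns.duplicated()` does in the script.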


Let us run our application –

Running end to end application

And, you can see the streaming data inside Oracle cloud as shown below –

Streaming data

You can explore my Git associated with this project & download the code from here.

So, finally, we’ve done it. 😀


I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational data & scenarios that are available over the internet & are for educational purposes only. Also, I’ve used the template SDK provided by Oracle & customized it to satisfy our business cases.