Hi Guys,
Here is the latest installment from the Python-verse. For the first time, we'll be working with Python on the Azure cloud, with help from Pandas & JSON.
Why post on this topic?
I always try to post something based on real-life use cases that might be useful in actual scenarios. And, on top of that, I don't find many significant posts on Azure dealing with Python. So, I thought of sharing a first use case, which will hopefully encourage others to join this club & build more Python-based applications on the Azure platform.
First, let us check the complexity of today’s post & our objective.
What is the objective?
Today, our objective is to load a couple of JSON payloads, store them in multiple Cosmos containers, & finally fetch the data back from Cosmos DB & write the output to our log files, apart from printing it on the terminal screen.
Before we start discussing our post, let us explain some basic terminology of Azure Cosmos DB, so that the next time we refer to these terms, it will be easier for you to understand them.
Learning basic Azure terminology.
Since this is an unstructured DB, all the data will be stored in the following fashion –
Azure Cosmos DB -> Container -> Items
Let’s simplify this in words. Each Azure Cosmos DB may have multiple containers, which you can compare with the tables of any conventional RDBMS. And, under containers, you will have multiple items, which represent the rows of an RDBMS table. The only difference is that each item might have a different number of elements, which are equivalent to the columns of a traditional RDBMS table. A traditional table always has a fixed number of columns.
Input Payload:
Let’s review three different payloads, which we’ll be loading into three separate containers.
srcEmail.json

As you can see in the items, the first sub-row has 3 elements, whereas the second one has 4. In a traditional RDBMS, the table would always have the same number of columns.
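Since the payload screenshot may not render here, below is a hypothetical sketch of what srcEmail.json could look like. The field names follow the query used later (SQL_QRY_1: subscriberId, sender, orderNo, orderDate, items.orderQty); the values & the extra item elements (sku, price, disc) are invented purely to illustrate the varying element counts, and the actual payload structure may differ:

```python
import json

# Hypothetical srcEmail.json payload -- field names follow SQL_QRY_1;
# the values & the 'sku', 'price', 'disc' elements are invented for illustration.
src_email = {
    "subscriberId": "M000001",
    "sender": "feedback@example.com",
    "orderNo": 1001,
    "orderDate": "25-May-2019",
    "items": [
        # First sub-row: 3 elements
        {"orderQty": 2, "sku": "AB-101", "price": 9.99},
        # Second sub-row: 4 elements -- no fixed schema, unlike an RDBMS table
        {"orderQty": 1, "sku": "CD-202", "price": 4.50, "disc": 5}
    ]
}

print(json.dumps(src_email, indent=2))
```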
srcTwitter.json

srcHR.json

So, from the above three sample payloads, our application will collect user feedback from different channels & consolidate it in a single place for better product forecasting.
Azure Portal:
Let’s look into the Azure portal & identify a couple of crucial pieces of information that the Python scripts will require for authentication. But, before that, I’ll show how to get those details, step by step –

As shown highlighted in Red, click the Azure Cosmos DB. You will find the following screen –

If you click this, you will find all the collections/containers that are part of the same DB as follows –

After that, we’ll extract the Cosmos Key & the Endpoint/URI from the portal. Without these, the Python application won’t be able to interact with Cosmos DB. This is sensitive information, so I’ll be providing some dummy details here just to show how to extract them. Never share these details with anyone outside of your project or group.

Good. Now, we’re ready for the Python scripts.
Python Scripts:
In this installment, we’ll be reusing the following Python script, which was already discussed in my earlier post –
- clsL.py
So, I’m not going to discuss it again here.
Before we discuss our scripts, let’s look at the directory structure –

1. clsConfig.py (This script contains all the configuration details & access keys for Azure Cosmos DB. The application reads this information to perform various CRUD operations on Cosmos DB. Hence, the name comes into the picture.)
```python
##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 25-May-2019              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys for      ####
#### azure cosmos db. Application will    ####
#### process these information & perform  ####
#### various CRUD operation on Cosmos DB. ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    db_name = 'rnd-de01-usw2-vfa-cdb'
    db_link = 'dbs/' + db_name
    CONTAINER1 = "RealtimeEmail"
    CONTAINER2 = "RealtimeTwitterFeedback"
    CONTAINER3 = "RealtimeHR"

    os_det = pl.system()

    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'EMAIL_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcEmail.json',
        'TWITTER_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcTwitter.json',
        'HR_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcHR.json',
        'COSMOSDB_ENDPOINT': 'https://rnd-de01-usw2-vfa-cdb.documents.azure.com:443/',
        'COSMOS_PRIMARYKEY': "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXIsI00AxKXXXXXgg==",
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'COSMOSDB': db_name,
        'COSMOS_CONTAINER1': CONTAINER1,
        'COSMOS_CONTAINER2': CONTAINER2,
        'COSMOS_CONTAINER3': CONTAINER3,
        'CONFIG_ORIG': 'Config_orig.csv',
        'ENCRYPT_CSV': 'Encrypt_Config.csv',
        'DECRYPT_CSV': 'Decrypt_Config.csv',
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'APP_DESC_1': 'Feedback Communication',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SQL_QRY_1': "SELECT c.subscriberId, c.sender, c.orderNo, c.orderDate, c.items.orderQty FROM RealtimeEmail c",
        'SQL_QRY_2': "SELECT c.twitterId, c.Twit, c.DateCreated, c.Country FROM RealtimeTwitterFeedback c WHERE c.twitterId=@CrVal",
        'DB_QRY': "SELECT * FROM c",
        'COLLECTION_QRY': "SELECT * FROM r",
        'database_link': db_link,
        'collection_link_1': db_link + '/colls/' + CONTAINER1,
        'collection_link_2': db_link + '/colls/' + CONTAINER2,
        'collection_link_3': db_link + '/colls/' + CONTAINER3,
        'options': {
            'offerThroughput': 1000,
            'enableCrossPartitionQuery': True,
            'maxItemCount': 2
        }
    }
```
2. clsCosmosDBDet.py (This script will test the necessary connection with Azure Cosmos DB from the Python application. And, if it is successful, it will fetch the details of all the collections/containers that reside under the same DB. Hence, the name comes into the picture.)
```python
##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 25-May-2019              ####
####                                      ####
#### Objective: This script will check &  ####
#### test the connection with the Cosmos  ####
#### & it will fetch all the collection   ####
#### names resided under the same DB.     ####
##############################################

import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.errors as errors

from clsConfig import clsConfig as cf

class IDisposable(cosmos_client.CosmosClient):
    def __init__(self, obj):
        self.obj = obj

    def __enter__(self):
        return self.obj

    def __exit__(self, exception_type, exception_val, trace):
        self = None

class clsCosmosDBDet:
    def __init__(self):
        self.endpoint = cf.config['COSMOSDB_ENDPOINT']
        self.primarykey = cf.config['COSMOS_PRIMARYKEY']
        self.db = cf.config['COSMOSDB']
        self.cont_1 = cf.config['COSMOS_CONTAINER1']
        self.cont_2 = cf.config['COSMOS_CONTAINER2']
        self.cont_3 = cf.config['COSMOS_CONTAINER3']
        self.database_link = cf.config['database_link']
        self.collection_link_1 = cf.config['collection_link_1']
        self.collection_link_2 = cf.config['collection_link_2']
        self.collection_link_3 = cf.config['collection_link_3']
        self.options = cf.config['options']
        self.db_qry = cf.config['DB_QRY']
        self.collection_qry = cf.config['COLLECTION_QRY']

    def list_Containers(self, client):
        try:
            database_link = self.database_link
            collection_qry = self.collection_qry

            print("1. Query for collection!")
            print()

            collections = list(client.QueryContainers(database_link, {"query": collection_qry}))

            if not collections:
                return

            for collection in collections:
                print(collection['id'])
            print()

        except errors.HTTPFailure as e:
            if e.status_code == 404:
                print("*" * 157)
                print('A collection with id \'{0}\' does not exist'.format(id))
                print("*" * 157)
            else:
                raise errors.HTTPFailure(e.status_code)

    def test_db_con(self):
        endpoint = self.endpoint
        primarykey = self.primarykey
        options_1 = self.options
        db_qry = self.db_qry

        with IDisposable(cosmos_client.CosmosClient(url_connection=endpoint, auth={'masterKey': primarykey})) as client:
            try:
                try:
                    options = {}
                    query = {"query": db_qry}
                    options = options_1

                    print("-" * 157)
                    print('Options:: ', options)
                    print()
                    print("Database details:: ")

                    result_iterable = client.QueryDatabases(query, options)

                    for item in iter(result_iterable):
                        print(item)
                    print("-" * 157)

                except errors.HTTPFailure as e:
                    if e.status_code == 409:
                        pass
                    else:
                        raise errors.HTTPFailure(e.status_code)

                self.list_Containers(client)

                return 0
            except errors.HTTPFailure as e:
                print("Application has caught an error. {0}".format(e.message))
                return 1
            finally:
                print("Application successfully completed!")
```
Key lines from the above scripts are –
with IDisposable(cosmos_client.CosmosClient(url_connection=endpoint, auth={'masterKey': primarykey})) as client:
In this step, the Python application builds the connection object.
```python
# Refer the entry in our config file
self.db_qry = cf.config['DB_QRY']
..
query = {"query": db_qry}
options = options_1
..
result_iterable = client.QueryDatabases(query, options)
```
Based on the value supplied from our Python configuration script, this will extract the Cosmos DB information.
self.list_Containers(client)
This is a function that identifies all the collections under this DB.
def list_Containers(self, client):
```python
..
collections = list(client.QueryContainers(database_link, {"query": collection_qry}))

if not collections:
    return

for collection in collections:
    print(collection['id'])
```
In the above lines, our application actually fetches the containers associated with this DB.
3. clsColMgmt.py (This script has multiple features. You can create new items in Azure Cosmos DB. Apart from that, you can retrieve data from Cosmos for viewing, either filtered by specific criteria or as the entire dataset. Hence, three different methods are provided here to support this.)
```python
################################################
#### Written By: SATYAKI DE                 ####
#### Written On: 25-May-2019                ####
####                                        ####
#### Objective: This script has multiple    ####
#### features. You can create new items     ####
#### in azure cosmos db. Apart from that    ####
#### you can retrieve data from Cosmos just ####
#### for viewing purpose. You can display   ####
#### data based on specific filters or the  ####
#### entire dataset. Hence, three different ####
#### methods provided here to support this. ####
################################################

import azure.cosmos.cosmos_client as cosmos_client
import azure.cosmos.errors as errors
import pandas as p
import json

from clsConfig import clsConfig as cf

class IDisposable(cosmos_client.CosmosClient):
    def __init__(self, obj):
        self.obj = obj

    def __enter__(self):
        return self.obj

    def __exit__(self, exception_type, exception_val, trace):
        self = None

class clsColMgmt:
    def __init__(self):
        self.endpoint = cf.config['COSMOSDB_ENDPOINT']
        self.primarykey = cf.config['COSMOS_PRIMARYKEY']
        self.db = cf.config['COSMOSDB']
        self.cont_1 = cf.config['COSMOS_CONTAINER1']
        self.cont_2 = cf.config['COSMOS_CONTAINER2']
        self.cont_3 = cf.config['COSMOS_CONTAINER3']
        self.database_link = cf.config['database_link']
        self.collection_link_1 = cf.config['collection_link_1']
        self.collection_link_2 = cf.config['collection_link_2']
        self.collection_link_3 = cf.config['collection_link_3']
        self.options = cf.config['options']
        self.db_qry = cf.config['DB_QRY']
        self.collection_qry = cf.config['COLLECTION_QRY']

    # Creating cosmos items in container
    def CreateDocuments(self, inputJson, collection_flg = 1):
        try:
            # Declaring variable
            endpoint = self.endpoint
            primarykey = self.primarykey

            print('Creating Documents')

            with IDisposable(cosmos_client.CosmosClient(url_connection=endpoint, auth={'masterKey': primarykey})) as client:
                try:
                    if collection_flg == 1:
                        collection_link = self.collection_link_1
                    elif collection_flg == 2:
                        collection_link = self.collection_link_2
                    else:
                        collection_link = self.collection_link_3

                    container = client.ReadContainer(collection_link)

                    # The payload has nested properties and various types including
                    # numbers, DateTimes and strings. This can be saved as JSON as is
                    # without converting into rows/columns.
                    print('Input Json:: ', str(inputJson))

                    nSon = json.dumps(inputJson)
                    json_rec = json.loads(nSon)

                    client.CreateItem(container['_self'], json_rec)
                except errors.HTTPFailure as e:
                    print("Application has caught an error. {0}".format(e.status_code))
                finally:
                    print("Application successfully completed!")

            return 0
        except Exception as e:
            x = str(e)
            print(x)
            return 1

    def CosmosDBCustomQuery_PandasCSVWithParam(self, client, collection_link, query_with_optional_parameters, message="Documents found by query: ", options_sql = {}):
        try:
            # Reading data by SQL & converting it to a Pandas Dataframe
            results = list(client.QueryItems(collection_link, query_with_optional_parameters, options_sql))

            cnt = 0
            dfSrc = p.DataFrame()
            dfRes = p.DataFrame()
            dfSrc2 = p.DataFrame()
            json_data = ''

            for doc in results:
                cnt += 1

            # (on pandas >= 1.0 use p.json_normalize instead)
            dfSrc = p.io.json.json_normalize(results)
            dfSrc.columns = dfSrc.columns.map(lambda x: x.split(".")[-1])
            dfRes = dfSrc

            print("Total records fetched: ", cnt)
            print("*" * 157)

            return dfRes
        except errors.HTTPFailure as e:
            Df_Fin = p.DataFrame()
            if e.status_code == 404:
                print("*" * 157)
                print("Document doesn't exist")
                print("*" * 157)
                return Df_Fin
            elif e.status_code == 400:
                print("*" * 157)
                print("Bad request exception occurred: ", e)
                print("*" * 157)
                return Df_Fin
            else:
                return Df_Fin
        finally:
            print()

    def CosmosDBCustomQuery_PandasCSV(self, client, collection_link, query_with_optional_parameters, message="Documents found by query: ", options_sql = {}):
        try:
            # Reading data by SQL & converting it to a Pandas Dataframe
            results = list(client.QueryItems(collection_link, query_with_optional_parameters, options_sql))

            cnt = 0
            dfSrc = p.DataFrame()
            dfRes = p.DataFrame()
            dfSrc2 = p.DataFrame()
            json_data = ''

            for doc in results:
                cnt += 1

            # (on pandas >= 1.0 use p.json_normalize instead)
            dfSrc = p.io.json.json_normalize(results)
            dfSrc.columns = dfSrc.columns.map(lambda x: x.split(".")[-1])
            dfRes = dfSrc

            print("Total records fetched: ", cnt)
            print("*" * 157)

            return dfRes
        except errors.HTTPFailure as e:
            Df_Fin = p.DataFrame()
            if e.status_code == 404:
                print("*" * 157)
                print("Document doesn't exist")
                print("*" * 157)
                return Df_Fin
            elif e.status_code == 400:
                print("*" * 157)
                print("Bad request exception occurred: ", e)
                print("*" * 157)
                return Df_Fin
            else:
                return Df_Fin
        finally:
            print()

    def fetch_data(self, sql_qry, msg="", collection_flg = 1, additional_params = 1, param_det=[]):
        endpoint = self.endpoint
        primarykey = self.primarykey
        options_1 = self.options

        with IDisposable(cosmos_client.CosmosClient(url_connection=endpoint, auth={'masterKey': primarykey})) as client:
            try:
                if collection_flg == 1:
                    collection_link = self.collection_link_1
                elif collection_flg == 2:
                    collection_link = self.collection_link_2
                else:
                    collection_link = self.collection_link_3

                print("Additional parameters: ", additional_params)

                message = msg
                options = options_1

                if additional_params == 1:
                    query = {"query": sql_qry}
                    df_Fin = self.CosmosDBCustomQuery_PandasCSV(client, collection_link, query, message, options)
                else:
                    query = {"query": sql_qry, "parameters": param_det}
                    df_Fin = self.CosmosDBCustomQuery_PandasCSVWithParam(client, collection_link, query, message, options)

                return df_Fin
            except errors.HTTPFailure as e:
                print("Application has caught an error. {0}".format(e.message))
            finally:
                print("Application successfully completed!")
```
Key lines from the above script –
def CosmosDBCustomQuery_PandasCSV(self, client, collection_link, query_with_optional_parameters, message="Documents found by query: ", options_sql = {}):
This method is generic: it fetches all the records of a Cosmos container.
```python
results = list(client.QueryItems(collection_link, query_with_optional_parameters, options_sql))
..
for doc in results:
    cnt += 1

dfSrc = p.io.json.json_normalize(results)
dfSrc.columns = dfSrc.columns.map(lambda x: x.split(".")[-1])
dfRes = dfSrc
```
In this step, the application fetches the data as JSON, flattens the nested structure, & finally stores the result in a pandas DataFrame, which it returns as output. The function –
CosmosDBCustomQuery_PandasCSVWithParam
– is the same as the previous function, except that it can also process parameters to filter the data.
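To see what this flattening step does on its own, here is a minimal standalone sketch with invented documents. It assumes pandas >= 1.0, where json_normalize is exposed at the top level (the script above uses the older p.io.json.json_normalize path):

```python
import pandas as p

# Invented sample documents, shaped like what QueryItems might return
results = [
    {"subscriberId": "M000001", "items": {"orderQty": 2}},
    {"subscriberId": "W000002", "items": {"orderQty": 5}},
]

# Flatten nested JSON into dotted columns such as 'items.orderQty'
dfSrc = p.json_normalize(results)

# Keep only the last part of each dotted column name, mirroring the script
dfSrc.columns = dfSrc.columns.map(lambda x: x.split(".")[-1])

print(dfSrc)
# Columns are now: subscriberId, orderQty
```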
def fetch_data(self, sql_qry, msg="", collection_flg = 1, additional_params = 1, param_det=[]):
This is the primary calling function. Let's look at the key lines –
```python
if collection_flg == 1:
    collection_link = self.collection_link_1
elif collection_flg == 2:
    collection_link = self.collection_link_2
else:
    collection_link = self.collection_link_3
```
Based on the collection_flg supplied from the main script, our application identifies the collection where we need to process/load our data.
```python
if additional_params == 1:
    query = {"query": sql_qry}
    df_Fin = self.CosmosDBCustomQuery_PandasCSV(client, collection_link, query, message, options)
else:
    query = {"query": sql_qry, "parameters": param_det}
    df_Fin = self.CosmosDBCustomQuery_PandasCSVWithParam(client, collection_link, query, message, options)
```
Based on the supplied additional_params value, the application decides whether to apply filter parameters & invokes the corresponding function.
def CreateDocuments(self, inputJson, collection_flg = 1):
This is the primary method for creating items/rows.
```python
if collection_flg == 1:
    collection_link = self.collection_link_1
elif collection_flg == 2:
    collection_link = self.collection_link_2
else:
    collection_link = self.collection_link_3

container = client.ReadContainer(collection_link)
```
Based on the collection flag, our application points to a specific container & establishes a connection to it.
```python
nSon = json.dumps(inputJson)
json_rec = json.loads(nSon)

client.CreateItem(container['_self'], json_rec)
```
Once the input payload is received, the application converts it to a valid JSON record & passes it to the CreateItem method to insert the document.
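The dumps/loads round trip acts as a cheap validation step: any value that is not JSON-serializable fails at json.dumps, before the document ever reaches Cosmos DB. A minimal sketch, using an invented Twitter-style payload:

```python
import json

# Invented input payload, shaped like an item from srcTwitter.json
inputJson = {"twitterId": "crazyGo", "Twit": "Great product!", "Country": "US"}

# Serialize & parse back; a non-serializable value (e.g. a set or a
# datetime object) would raise TypeError here, before CreateItem is called
nSon = json.dumps(inputJson)
json_rec = json.loads(nSon)

assert json_rec == inputJson  # the round trip preserves the payload
print(json_rec)
```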
4. callCosmosAPI.py (This script is the main calling script. Hence, the name comes into the picture.)
```python
##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 25-May-2019              ####
####                                      ####
#### Objective: Main calling script.      ####
##############################################

import clsColMgmt as cm
import clsCosmosDBDet as cmdb
from clsConfig import clsConfig as cf
import pandas as p
import clsL as cl
import logging
import datetime
import json

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB
def main():
    try:
        df_ret = p.DataFrame()
        df_ret_2 = p.DataFrame()
        df_ret_2_Mod = p.DataFrame()

        debug_ind = 'Y'

        # Current date string used to tag the log file names.
        # (Referenced below as 'var'; its definition was lost in the
        # original listing, so it is restored here.)
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

        # Initiating Log Class
        l = cl.clsL()

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidated.log', level=logging.INFO)

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']

        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)

        print("*" * 157)
        print("Testing COSMOS DB Connection!")
        print("*" * 157)

        # Checking Cosmos DB Azure
        y = cmdb.clsCosmosDBDet()
        ret_val = y.test_db_con()

        if ret_val == 0:
            print()
            print("Cosmos DB Connection Successful!")
            print("*" * 157)
        else:
            print()
            print("Cosmos DB Connection Failure!")
            print("*" * 157)
            raise Exception

        print("*" * 157)

        # Creating Data in Cosmos DB
        print()
        print('Fetching data from Json!')
        print('Creating data for Email..')
        print("-" * 157)

        emailFile = cf.config['EMAIL_SRC_JSON_FILE']
        flg = 1

        with open(emailFile) as json_file:
            dataEmail = json.load(json_file)

        # Creating documents
        a1 = cm.clsColMgmt()
        ret_cr_val1 = a1.CreateDocuments(dataEmail, flg)

        if ret_cr_val1 == 0:
            print('Successful data creation!')
        else:
            print('Failed to create data!')

        print("-" * 157)
        print()

        print('Creating data for Twitter..')
        print("-" * 157)

        twitFile = cf.config['TWITTER_SRC_JSON_FILE']
        flg = 2

        with open(twitFile) as json_file:
            dataTwitter = json.load(json_file)

        # Creating documents
        a2 = cm.clsColMgmt()
        ret_cr_val2 = a2.CreateDocuments(dataTwitter, flg)

        if ret_cr_val2 == 0:
            print('Successful data creation!')
        else:
            print('Failed to create data!')

        print("-" * 157)
        print()

        print('Creating data for HR..')
        print("-" * 157)

        hrFile = cf.config['HR_SRC_JSON_FILE']
        flg = 3

        with open(hrFile) as json_file:
            hrTwitter = json.load(json_file)

        # Creating documents
        a3 = cm.clsColMgmt()
        ret_cr_val3 = a3.CreateDocuments(hrTwitter, flg)

        if ret_cr_val3 == 0:
            print('Successful data creation!')
        else:
            print('Failed to create data!')

        print("-" * 157)

        # Calling the function 1
        print("RealtimeEmail::")

        # Fetching First collection data to dataframe
        print("Fetching Cosmos Collection Data!")

        sql_qry_1 = cf.config['SQL_QRY_1']
        msg = "Documents generated based on unique key"
        collection_flg = 1

        x = cm.clsColMgmt()
        df_ret = x.fetch_data(sql_qry_1, msg, collection_flg)

        l.logr('1.EmailFeedback_' + var + '.csv', debug_ind, df_ret, 'log')
        print('RealtimeEmail Data::')
        print(df_ret)
        print()

        # Checking execution status
        ret_val = int(df_ret.shape[0])

        if ret_val == 0:
            print("Cosmos DB hasn't returned any rows. Please check your queries!")
            print("*" * 157)
        else:
            print("Successfully fetched!")
            print("*" * 157)

        # Calling the 2nd Collection
        print("RealtimeTwitterFeedback::")

        # Fetching Second collection data to dataframe
        print("Fetching Cosmos Collection Data!")

        # Query using parameters
        sql_qry_2 = cf.config['SQL_QRY_2']
        msg_2 = "Documents generated based on RealtimeTwitterFeedback feed!"
        collection_flg = 2

        val = 'crazyGo'
        param_det = [{"name": "@CrVal", "value": val}]
        add_param = 2

        x1 = cm.clsColMgmt()
        df_ret_2 = x1.fetch_data(sql_qry_2, msg_2, collection_flg, add_param, param_det)

        l.logr('2.TwitterFeedback_' + var + '.csv', debug_ind, df_ret_2, 'log')
        print('Realtime Twitter Data:: ')
        print(df_ret_2)
        print()

        # Checking execution status
        ret_val_2 = int(df_ret_2.shape[0])

        if ret_val_2 == 0:
            print("Cosmos DB hasn't returned any rows. Please check your queries!")
            print("*" * 157)
        else:
            print("Successfully fetched rows!")
            print("*" * 157)

    except ValueError:
        print("No relevant data to proceed!")

    except Exception as e:
        print("Top level Error: args:{0}, message{1}".format(e.args, e.message))

if __name__ == "__main__":
    main()
```
Key lines from the above script –
```python
with open(twitFile) as json_file:
    dataTwitter = json.load(json_file)
```
Reading a json file.
```python
val = 'crazyGo'
param_det = [{"name": "@CrVal", "value": val}]
add_param = 2
```
Passing a specific parameter value to filter the records while fetching them from Cosmos DB.
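The parameterized query is simply a dict combining the SQL text with a list of name/value bindings; Cosmos DB substitutes @CrVal server-side, so the value needs no manual escaping. A standalone sketch of what fetch_data assembles (the SQL text is copied from SQL_QRY_2 in clsConfig.py):

```python
# SQL text copied from SQL_QRY_2 in clsConfig.py
sql_qry_2 = ("SELECT c.twitterId, c.Twit, c.DateCreated, c.Country "
             "FROM RealtimeTwitterFeedback c WHERE c.twitterId=@CrVal")

val = 'crazyGo'
param_det = [{"name": "@CrVal", "value": val}]

# This dict is what client.QueryItems() ultimately receives
query = {"query": sql_qry_2, "parameters": param_det}

print(query["parameters"][0]["name"])  # @CrVal
```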
Now, let’s look at the runtime stats.
Windows:


MAC:


Let’s compare the output log directory –
Windows:

MAC:

Let’s verify the data from Cosmos DB.

Here, a subscriberId starting with ‘M‘ denotes data inserted from the MAC environment; the other entries were inserted from Windows.
Let’s see one more example from Cosmos –

So, I guess we’ve achieved our final goal here: we successfully inserted data into Azure Cosmos DB from the Python application & retrieved it back.
The following Python packages are required in order to run this application –
pip install azure
pip install azure-cosmos
pip install pandas
pip install requests
This application was tested on Python 3.7.1 & Python 3.7.2 as well. As per Microsoft, the officially supported version is Python 3.5.
I hope you’ll like this effort.
Wait for the next installment. Till then, Happy Avenging. 😀
[Note: All the sample data are available/prepared in the public domain for research & study.]