Building Azure Databricks Cluster installing desired packages & with a demo run (Time stone from Python Verse)

Today, I’ll be showing how to prepare a cluster in Azure Databricks from command prompt & will demonstrate any sample csv file process using Pyspark. This can be useful, especially when you want to customize your environment & need to install specific packages inside the clusters with more options.

This is not like any of my earlier posts, where my primary attention is on the Python side. At the end of this post, I’ll showcase one use of Pyspark script & how we can execute them inside Azure Data bricks.

Let’s roll the dice!

Step -1:

Type Azure Databricks in your search folder inside the Azure portal.

0. Azure Search

As shown in the red box, you have to click these options. And, it will take the application to new data bricks sign-in page.

Step -2:

Next step would be clicking the “Add” button. For the first time, the application will ask you to create a storage account associated with this brick.

1. Create Storage

After creation, the screen should look like this –

2.5. Azure-Data-Bricks Options

Now, click the Azure command-line & chose bash as your work environment –

2. After Creation

For security reason, I’ve masked the details.

After successful creation, this page should look like this –

3. Azure Databricks

Once, you click the launch workspace, it will take you to this next page –

4. Detailed Bricks

As you can see that, there are no notebook or python scripts there under Recents tab.

Step -3:

Let’s verify it from the command line shell environment.

5. Python-Env

As you can see, by default python version in bricks is 3.5.2.

Step -4:

Now, we’ll prepare one environment by creating a local directory under the cloud.

The directory that we’ll be creating is – “rndBricks.”

6. Creating Directory

Step -5:

Let’s create the virtual environment here –

Using “virtualenv” function, we’ll be creating the virtual environment & it should look like this –

7. Creating Python-VM

As you can see, that – this will create the first python virtual environment along with the pip & wheel, which is essential for your python environment.

After creating the VM, you need to update Azure CLI, which is shown in the next screenshot given below –

8. Installing Databricks CLI in Python-VM

Before you create the cluster, you need to first generate the token, which will be used for the cluster –

9.1. Generating Token

As shown in the above screen, the “red” marked area is our primary interest. The “green” box, which represents the account image that you need to click & then you have to click “User Settings” marked in blue. Once you click that, you can see the “purple” area, where you need to click the Generate new token button in case if you are doing it for the first time.

Now, we’ll be using this newly generated token to configure data bricks are as follows –

9.2. Configuring with Token

Make sure, you need to mention the correct zone, i.e. westus2/westus or any region as per your geography & convenience.

Once, that is done. You can check the cluster list by the following command (In case, if you already created any clusters in your subscription) –

10. Checking Clusters List

Since we’re building it from scratch. There is no cluster information showing here.

Step -6:

Let’s create the clusters –

11. Creating-Clusters-From-Command

Please find the command that you will be using are as follows –

databricks clusters create –json ‘{ “autoscale”: {“min_workers”: 2, “max_workers”: 8}, “cluster_name”: “pyRnd”, “spark_version”: “5.3.x-scala2.11”, “spark_conf”: {}, “node_type_id”: “Standard_DS3_v2”, “driver_node_type_id”: “Standard_DS3_v2”, “ssh_public_keys”: [], “custom_tags”: {}, “spark_env_vars”: {“PYSPARK_PYTHON”: “/databricks/python3/bin/python3”}, “autotermination_minutes”: 20, “enable_elastic_disk”: true, “cluster_source”: “UI”, “init_scripts”: [] }’

As you can see, you need to pass the information in JSON format. For your better understanding, please find the JSON in a proper format –

11.5. JSON

And, the raw version –

{
  "autoscale": {
    "min_workers": 2,
    "max_workers": 8
  },
  "cluster_name": "pyRnd",
  "spark_version": "5.3.x-scala2.11",
  "spark_conf": {},
  "node_type_id": "Standard_DS3_v2",
  "driver_node_type_id": "Standard_DS3_v2",
  "ssh_public_keys": [],
  "custom_tags": {},
  "spark_env_vars": {
    "PYSPARK_PYTHON": "/databricks/python3/bin/python3"
  },
  "autotermination_minutes": 20,
  "enable_elastic_disk": true,
  "cluster_source": "UI",
  "init_scripts": []
}

Initially, the cluster status will show from the GUI are as follows –

12. Cluster-Status-In-Progress

After a few minutes, this will show the running state –

13. Cluster-Running Status

Let’s check the detailed configuration once the cluster created –

14. Initial Cluster Details

Step -7:

We need to check the library section. This is important as we might need to install many dependant python package to run your application on Azure data bricks. And, the initial Libraries will look like this –

15. Libraries

You can install libraries into an existing cluster either through GUI or through shell command prompt as well. Let’s explore the GUI option.

GUI Option:

First, click the Libraries tab under your newly created clusters, as shown in the above picture. Then you need to click “Install New” button. This will pop-up the following windows –

16. Installing Libraries

As you can see, you have many options along with the possibilities for your python (marked in red) application as well.

Case 1 (Installing PyPi packages):

19. Installing through GUI

Note: You can either mention the specific version or just simply name the package name.

Case 2 (Installing Wheel packages):

16.5. Installing Wheel Libraries

As you can see, from the upload options, you can upload your local libraries & then click the install button to install the same.

UI Option:

Here is another way, you can install your python libraries using the command line as shown in the below screenshots –

17. Running & Installing Libraries - Alternate Options

Few things to notice. The first command shows the current running cluster list. Second, command updating your pip packages. And, the third command, install your desired pypi packages.

Please find the raw commands –

databricks clusters list

pip install -U pip

databricks libraries install –cluster-id “XXXX-XXXXX-leech896” –pypi-package “pandas” –pypi-repo “https://pypi.org/project/pandas/”

After installing, the GUI page under the libraries section will look like this –

18. Installed Libraries

Note that, for any failed case, you can check the log in this way –

20. Installation-In-progress

If you click on the marked red area, it will pop-up the detailed error details, which is as follows –

19.5. Error Details

So, we’re done with our initial set-up.

Let’s upload one sample file into this environment & try to parse the data.

Step -8:

You can upload your sample file as follows –

23.1. First Step

First, click the “data” & then click the “add data” marked in the red box.

You can import this entire csv data as tables as shown in the next screenshot –

23.2. Uploading Data Files

Also, you can create a local directory here based on your requirements are explained as –

24. Creating Local Directory For Process

Step -9:

Let’s run the code.

Please find the following snippet in PySpark for our test –

1. DBFromFile.py (This script will call the Bricks script & process the data to create an SQL like a table for our task.)

###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 10-Feb-2019       ########
####                               ########
#### Objective: Pyspark File to    ########
#### parse the uploaded csv file.  ########
###########################################

# File location and type
file_location = "/FileStore/tables/src_file/customer_addr_20180112.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

# Create a view or table

temp_table_name = "customer_addr_20180112_csv"

df.createOrReplaceTempView(temp_table_name)

%sql

/* Query the created temp table in a SQL cell */

select * from `customer_addr_20180112_csv`

From the above sample snippet, one can see that the application is trying to parse the source data by providing all the parsing details & then use that csv as a table in SQL.

Let’s check step by step execution.

25. Working With Uploaded File

So, until this step, you can see that the application has successfully parsed the csv data.

And, finally, you can view the data –

25.1. Second Option

As the highlighted blue box shows that the application is using this csv file as a table. So, you have many options to analyze the information flexibly if you are familiar with SQL.

After your job run, make sure you terminate your cluster. Otherwise, you’ll receive a large & expensive usage bill, which you might not want!

So, finally, we’ve done it.

Let me know what do you think.

Till then, Happy Avenging! 😀

Note: All the data posted here are representational data & available over the internet & for educational purpose only.

Combining the NoSQL(Cosmos DB) & traditional Azure RDBMS in Azure (Time stone solo from Python verse)

Hi Guys!

Today, our main objective is to extend our last post & blending two different kinds of data using Python.

Please refer the earlier post if you didn’t go through it – “Building Azure cosmos application.“.

What is the Objective?

In this post, our objective is to combine traditional RDBMS from the cloud with Azure’s NO SQL, which is, in this case, is Cosmos DB. And, try to forecast some kind of blended information, which can be aggregated further.

Examining Source Data.

No SQL Data from Cosmos:

Let’s check one more time the No SQL data created in our last post.

CosmosData

Total, we’ve created 6 records in our last post.

As you can see in red marked areas. From item, one can check the total number of records created. You can also filter out specific record using the Edit Filter blue color button highlighted with blue box & you need to provide the “WHERE CLAUSE” inside it.

Azure SQL DB:

Let’s create some data in Azure SQL DB.

But, before that, you need to create SQL DB in the Azure cloud. Here is the official Microsoft link to create DB in Azure. You can refer to it here.

I won’t discuss the detailed steps of creating DB here.

From Azure portal, it looks like –

Azure SQL DB Main Screen

Let’s see how the data looks like in Azure DB. For our case, we’ll be using the hrMaster DB.

Let’s create the table & some sample data aligned as per our cosmos data.

Azure SQL DB

We will join both the data based on subscriberId & then extract our required columns in our final output.

CombinedData

Good. Now, we’re ready for python scripts.

Python Scripts:

In this installment, we’ll be reusing the following python scripts, which is already discussed in my earlier post –

  • clsL.py
  • clsColMgmt.py
  • clsCosmosDBDet.py

So, I’m not going to discuss these scripts.

Before we discuss our scripts, let’s look out the directory structures –

Win_Vs_MAC

Here is the detailed directory structure between the Windows & MAC O/S.

1. clsConfig.py (This script will create the split csv files or final merge file after the corresponding process. However, this can be used as usual verbose debug logging as well. Hence, the name comes into the picture.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 25-May-2019              ####
#### Updated On: 02-Jun-2019              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys for      ####
#### azure cosmos db. Application will    ####
#### process these information & perform  ####
#### various CRUD operation on Cosmos DB. ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))
    db_name = 'rnd-de01-usw2-vfa-cdb'
    db_link = 'dbs/' + db_name
    CONTAINER1 = "RealtimeEmail"
    CONTAINER2 = "RealtimeTwitterFeedback"
    CONTAINER3 = "RealtimeHR"

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'SERVER': 'xxxx-xxx.database.windows.net',
        'DATABASE_1': 'SalesForceMaster',
        'DATABASE_2': 'hrMaster',
        'DATABASE_3': 'statMaster',
        'USERNAME': 'admin_poc_dev',
        'PASSWORD': 'xxxxx',
        'DRIVER': '{ODBC Driver 17 for SQL Server}',
        'ENV': 'pocdev-saty',
        'ENCRYPT_FLAG': "yes",
        'TRUST_FLAG': "no",
        'TIMEOUT_LIMIT': "30",
        'PROCSTAT': "'Y'",
        'APP_ID': 1,
        'EMAIL_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcEmail.json',
        'TWITTER_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcTwitter.json',
        'HR_SRC_JSON_FILE': Curr_Path + sep + 'src_file' + sep + 'srcHR.json',
        'COSMOSDB_ENDPOINT': 'https://rnd-de01-usw2-vfa-cdb.documents.azure.com:443/',
        'CONFIG_TABLE': 'ETL_CONFIG_TAB',
        'COSMOS_PRIMARYKEY': "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXIsI00AxKXXXXXgg==",
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'COSMOSDB': db_name,
        'COSMOS_CONTAINER1': CONTAINER1,
        'COSMOS_CONTAINER2': CONTAINER2,
        'COSMOS_CONTAINER3': CONTAINER3,
        'CONFIG_ORIG': 'Config_orig.csv',
        'ENCRYPT_CSV': 'Encrypt_Config.csv',
        'DECRYPT_CSV': 'Decrypt_Config.csv',
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'APP_DESC_1': 'Feedback Communication',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SQL_QRY_1': "SELECT c.subscriberId, c.sender, c.orderNo, c.orderDate, c.items.orderQty  FROM RealtimeEmail c",
        'SQL_QRY_2': "SELECT c.twitterId, c.Twit, c.DateCreated, c.Country FROM RealtimeTwitterFeedback c WHERE c.twitterId=@CrVal",
        'DB_QRY': "SELECT * FROM c",
        'AZURE_SQL_1': "SELECT DISTINCT subscriberId, state, country, annualIncome, customerType FROM dbo.onboardCustomer",
        'COLLECTION_QRY': "SELECT * FROM r",
        'database_link': db_link,
        'collection_link_1': db_link + '/colls/' + CONTAINER1,
        'collection_link_2': db_link + '/colls/' + CONTAINER2,
        'collection_link_3': db_link + '/colls/' + CONTAINER3,
        'options': {
            'offerThroughput': 1000,
            'enableCrossPartitionQuery': True,
            'maxItemCount': 2
        }
    }

Here, we’ve added a couple of more entries compared to the last time, which points the detailed configuration for Azure SQL DB.

‘SERVER’: ‘xxxx-xxx.database.windows.net’,
‘DATABASE_1’: ‘SalesForceMaster’,
‘DATABASE_2’: ‘hrMaster’,
‘DATABASE_3’: ‘statMaster’,
‘USERNAME’: ‘admin_poc_dev’,
‘PASSWORD’: ‘xxxxx’,
‘DRIVER’: ‘{ODBC Driver 17 for SQL Server}’,
‘ENV’: ‘pocdev-saty’,
‘ENCRYPT_FLAG’: “yes”,
‘TRUST_FLAG’: “no”,
‘TIMEOUT_LIMIT’: “30”,
‘PROCSTAT’: “‘Y'”, 

Here, you need to supply your DB credentials accordingly.

2. clsDBLookup.py (This script will look into the Azure SQL DB & fetch data from the traditional RDBMS of Azure environment.)

#####################################################
#### Written By: SATYAKI DE                      ####
#### Written On: 25-May-2019                     ####
####                                             ####
#### Objective: This script will check &         ####
#### test the connection with the Azure          ####
#### SQL DB & it will fetch all the records      ####
#### name resied under the same DB of a table.   ####
#####################################################

import pyodbc as py
import pandas as p
from clsConfig import clsConfig as cdc

class clsDBLookup(object):
    def __init__(self, lkpTableName = ''):
        self.server = cdc.config['SERVER']
        self.database = cdc.config['DATABASE_1']
        self.database1 = cdc.config['DATABASE_2']
        self.database2 = cdc.config['DATABASE_3']
        self.username = cdc.config['USERNAME']
        self.password = cdc.config['PASSWORD']
        self.driver = cdc.config['DRIVER']
        self.env = cdc.config['ENV']
        self.encrypt_flg = cdc.config['ENCRYPT_FLAG']
        self.trust_flg = cdc.config['TRUST_FLAG']
        self.timeout_limit = cdc.config['TIMEOUT_LIMIT']
        self.lkpTableName = cdc.config['CONFIG_TABLE']
        self.ProcStat = cdc.config['PROCSTAT']
        self.AppId = cdc.config['APP_ID']

    def LookUpData(self):
        try:
            # Assigning all the required values
            server = self.server
            database = self.database1
            username = self.username
            password = self.password
            driver = self.driver
            env = self.env
            encrypt_flg = self.encrypt_flg
            trust_flg = self.trust_flg
            timout_limit = self.timeout_limit
            lkpTableName = self.lkpTableName
            ProcStat = self.ProcStat
            AppId = self.AppId

            # Creating secure connection
            str_conn = 'Driver=' + driver + ';Server=tcp:' + server + ',1433;' \
                       'Database=' + database + ';Uid=' + username + '@' + env + ';' \
                       'Pwd=' + password + ';Encrypt=' + encrypt_flg + ';' \
                       'TrustServerCertificate=' + trust_flg + ';Connection Timeout=' + timout_limit + ';'

            db_con_azure = py.connect(str_conn)

            query = " SELECT [ruleId] as ruleId, [ruleName] as ruleName, [ruleSQL] as ruleSQL, " \
                    " [ruleFlag] as ruleFlag, [appId] as appId, [DBType] as DBType, " \
                    " [DBName] as DBName FROM [dbo][" + lkpTableName + "] WHERE ruleFLag = " + ProcStat + " " \
                    " and appId = " + AppId + " ORDER BY ruleId "

            df = p.read_sql(query, db_con_azure)

            # Closing the connection
            db_con_azure.close()

            return df
        except Exception as e:
            x = str(e)
            print(x)
            df = p.DataFrame()

            return df

    def azure_sqldb_read(self, sql):
        try:
            # Assigning all the required values
            server = self.server
            database = self.database1
            username = self.username
            password = self.password
            driver = self.driver
            env = self.env
            encrypt_flg = self.encrypt_flg
            trust_flg = self.trust_flg
            timout_limit = self.timeout_limit
            lkpTableName = self.lkpTableName
            ProcStat = self.ProcStat
            AppId = self.AppId

            # Creating secure connection
            str_conn = 'Driver=' + driver + ';Server=tcp:' + server + ',1433;' \
                       'Database=' + database + ';Uid=' + username + '@' + env + ';' \
                       'Pwd=' + password + ';Encrypt=' + encrypt_flg + ';' \
                       'TrustServerCertificate=' + trust_flg + ';Connection Timeout=' + timout_limit + ';'

            # print("Connection Details:: ", str_conn)
            db_con_azure = py.connect(str_conn)

            query = sql

            df = p.read_sql(query, db_con_azure)

            # Closing the connection
            db_con_azure.close()

            return df
        except Exception as e:
            x = str(e)
            print(x)
            df = p.DataFrame()

            return df

Major lines to discuss –

azure_sqldb_read(self, sql):

Getting the source SQL supplied from the configuration script.

db_con_azure = py.connect(str_conn)

query = sql

df = p.read_sql(query, db_con_azure)

After creating a successful connection, our application will read the SQL & fetch the data & store that into a pandas dataframe and return the output to the primary calling function.

3. callCosmosAPI.py (This is the main script, which will call all the methods to blend the data. Hence, the name comes into the picture.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 25-May-2019              ####
#### Modified On 02-Jun-2019              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

import clsColMgmt as cm
import clsCosmosDBDet as cmdb
from clsConfig import clsConfig as cf
import pandas as p
import clsLog as cl
import logging
import datetime
import json
import clsDBLookup as dbcon

# Disbling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

def getDate(row):
    try:
        d1 = row['orderDate']
        d1_str = str(d1)
        d1_dt_part, sec = d1_str.split('.')
        dt_part1 = d1_dt_part.replace('T', ' ')

        return dt_part1
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = ''

        return dt_part1

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def main():
    try:
        df_ret = p.DataFrame()
        df_ret_2 = p.DataFrame()
        df_ret_2_Mod = p.DataFrame()

        debug_ind = 'Y'

        # Initiating Log Class
        l = cl.clsLog()

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidated.log', level=logging.INFO)

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']

        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)

        print("*" * 157)
        print("Testing COSMOS DB Connection!")
        print("*" * 157)

        # Checking Cosmos DB Azure
        y = cmdb.clsCosmosDBDet()
        ret_val = y.test_db_con()

        if ret_val == 0:
            print()
            print("Cosmos DB Connection Successful!")
            print("*" * 157)
        else:
            print()
            print("Cosmos DB Connection Failure!")
            print("*" * 157)
            raise Exception

        print("*" * 157)

        # Accessing from Azure SQL DB
        x1 = dbcon.clsDBLookup()
        act_df = x1.azure_sqldb_read(cf.config['AZURE_SQL_1'])

        print("Azure SQL DB::")
        print(act_df)
        print()

        print("-" * 157)

        # Calling the function 1
        print("RealtimeEmail::")

        # Fetching First collection data to dataframe
        print("Fethcing Comos Collection Data!")

        sql_qry_1 = cf.config['SQL_QRY_1']
        msg = "Documents generatd based on unique key"
        collection_flg = 1

        x = cm.clsColMgmt()
        df_ret = x.fetch_data(sql_qry_1, msg, collection_flg)

        l.logr('1.EmailFeedback_' + var + '.csv', debug_ind, df_ret, 'log')
        print('RealtimeEmail Data::')
        print(df_ret)
        print()

        # Checking execution status
        ret_val = int(df_ret.shape[0])

        if ret_val == 0:
            print("Cosmos DB Hans't returned any rows. Please check your queries!")
            print("*" * 157)
        else:
            print("Successfully fetched!")
            print("*" * 157)

        # Calling the 2nd Collection
        print("RealtimeTwitterFeedback::")

        # Fetching First collection data to dataframe
        print("Fethcing Cosmos Collection Data!")

        # Query using parameters
        sql_qry_2 = cf.config['SQL_QRY_2']
        msg_2 = "Documents generated based on RealtimeTwitterFeedback feed!"
        collection_flg = 2

        val = 'crazyGo'
        param_det = [{"name": "@CrVal", "value": val}]
        add_param = 2

        x1 = cm.clsColMgmt()
        df_ret_2 = x1.fetch_data(sql_qry_2, msg_2, collection_flg, add_param, param_det)

        l.logr('2.TwitterFeedback_' + var + '.csv', debug_ind, df_ret, 'log')
        print('Realtime Twitter Data:: ')
        print(df_ret_2)
        print()

        # Checking execution status
        ret_val_2 = int(df_ret_2.shape[0])

        if ret_val_2 == 0:
            print("Cosmos DB hasn't returned any rows. Please check your queries!")
            print("*" * 157)
        else:
            print("Successfuly row feteched!")
            print("*" * 157)

        # Merging NoSQL Data (Cosmos DB) with Relational DB (Azure SQL DB)
        df_Fin_temp = p.merge(df_ret, act_df, on='subscriberId', how='inner')

        df_fin = df_Fin_temp[['orderDate', 'orderNo', 'sender', 'state', 'country', 'customerType']]

        print("Initial Combined Data (From Cosmos & Azure SQL DB) :: ")
        print(df_fin)

        l.logr('3.InitCombine_' + var + '.csv', debug_ind, df_fin, 'log')

        # Transforming the orderDate as per standard format
        df_fin['orderDateM'] = df_fin.apply(lambda row: getDate(row), axis=1)

        # Dropping the old column & renaming the new column to old column
        df_fin.drop(columns=['orderDate'], inplace=True)
        df_fin.rename(columns={'orderDateM': 'orderDate'}, inplace=True)

        print("*" * 157)
        print()
        print("Final Combined & Transformed result:: ")
        print(df_fin)

        l.logr('4.Final_Combine_' + var + '.csv', debug_ind, df_fin, 'log')
        print("*" * 157)

    except ValueError:
        print("No relevant data to proceed!")

    except Exception as e:
        print("Top level Error: args:{0}, message{1}".format(e.args, e.message))

if __name__ == "__main__":
    main()

The key lines from this script –

def getDate(row):
    try:
        d1 = row['orderDate']
        d1_str = str(d1)
        d1_dt_part, sec = d1_str.split('.')
        dt_part1 = d1_dt_part.replace('T', ' ')

        return dt_part1
    except Exception as e:
        x = str(e)
        print(x)
        dt_part1 = ''

        return dt_part1

This function converts NoSQL date data type more familiar format.

NoSQL Date:
NoSQL_Date
Transformed Date:
Transformed Date
# Accessing from Azure SQL DB
x1 = dbcon.clsDBLookup()
act_df = x1.azure_sqldb_read(cf.config['AZURE_SQL_1'])

print("Azure SQL DB::")
print(act_df)
print()

Above lines are calling the Azure SQL DB method to retrieve the RDBMS data into our dataframe.

# Merging NoSQL Data (Cosmos DB) with Relational DB (Azure SQL DB)
df_Fin_temp = p.merge(df_ret, act_df, on='subscriberId', how='inner')

df_fin = df_Fin_temp[['orderDate', 'orderNo', 'sender', 'state', 'country', 'customerType']]

In these above lines, we’re joining the data retrieved from two different kinds of the database to prepare our initial combined dataframe. Also, we’ve picked only the desired column, which will be useful for us.

# Transforming the orderDate as per standard format
df_fin['orderDateM'] = df_fin.apply(lambda row: getDate(row), axis=1)

# Dropping the old column & renaming the new column to old column
df_fin.drop(columns=['orderDate'], inplace=True)
df_fin.rename(columns={'orderDateM': 'orderDate'}, inplace=True)

In the above lines, we’re transforming our date field, as shown above in one of our previous images by calling the getDate method.

Let’s see the directory structure of our program –

Win_Vs_MAC

Let’s see how it looks when it runs –

Windows:

Win_Run_1
Win_Run_2

MAC:

MAC_Run_1
MAC_Run_2

So, finally, we’ve successfully blended the data & make more meaningful data projection.

Following python packages are required to run this application –

pip install azure

pip install azure-cosmos

pip install pandas

pip install requests

pip install pyodbc

This application tested on Python3.7.1 & Python3.7.2 as well. As per Microsoft, their official supported version is Python3.5.

I hope you’ll like this effort.

Wait for the next installment. Till then, Happy Avenging. 😀

[Note: All the sample data are available/prepared in the public domain for research & study.]

Password Validation Using Regular Expression In Teradata 14 & 15

Today, we’ll be checking one new area where we can implement regular expression to achieve the password validation without involving any kind of Macro, Stored-Proc.

 

Let’s consider the following conditions to be implemented –

 

1. Password should contain characters between 6 & 10.

2. One character should be digit.

3. One character should be in upper case letter.

4. There should be at least one special character.

 

Let’s check the Query & Output –

 

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
select seq_no,
       passwd,
       regexp_similar(passwd,'^(?=^([[:graph:]]{6,10})$)(?=.*([[:upper:]]{1,}))(?=.*([[:digit:]]{1,})).*$') as reg_test
from scott.login_det
order by 1;


SEQ_NO	PASSWD	 REG_TEST
-----   -------  --------------
1	hoti         0
2	hotimla	     0
3	hotImla	     0
4	hot@imla     0
5	hoT@imla     0
6	hoT@iml9a    1
7	hoT@iml9a66  0

 

Similarly, you can add condition of lower case character if you want to make it more complex.

 

Hope, this will give you another way – to implement the same logic. 🙂

Performance of Regular Expression in Teradata 14.0

Today I’ll explain about the performance impact of these Regular expressions in Teradata.

It is believed that these functions have newly introduced. Hence, it may possible that these function may take some time to settle or in other words we may expect to see some patches before they can be considered as stable & ready to use in TD.

Before, we can go through this – we must understood about these functions & where we should use them properly. It is quite obvious that we would like to use them in such places where using teradata’s old stable function cannot achieve using a single SQL or we are looking for some kind of Stored-Proc in order to implement this business logic. Hence, it would be unfair to simply compare a simple solution with this. Rather, we should consider those complex parsing logic & the total performance by those Stored-Proc or any relevant process with these functions. In those cases – Regular expression will be very handy – I believe.

Let’s consider one simple case –

Let’s consider the following string – “SANTA’S JOYFULL GIFT“.

I want to fetch the a part of the string till it encounters first space character i.e. it will provide the following output as per the business logic – “SANTA’S“.

I’ll test that with significant volume of data & would like to compare the explain plan between the normal process & regular expression.

Let’s check the explain plan for the SQL that uses conventional functions –

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
EXPLAIN SELECT C_KEY,
        C_CD,
        S_ORG_NM,
        SUBSTR(S_ORG_NM,1,POSITION(' ' IN S_ORG_NM||' ')) AS DER_S_ORG_NM
FROM MASTER_CLAIM
WHERE C_CD = '555';

  1) First, we lock EDW_CORE_DB.MASTER_CLAIM in view ETL_VIEWS.MASTER_CLAIM for
     access.
  2) Next, we do an all-AMPs RETRIEVE step from EDW_CORE_DB.MASTER_CLAIM in view
     ETL_VIEWS.MASTER_CLAIM by way of an all-rows scan with a condition
     of ("EDW_CORE_DB.MASTER_CLAIM in view ETL_VIEWS.MASTER_CLAIM.C_CD = '555 '")
     into Spool 1 (group_amps), which is built locally on the AMPs.
     The input table will not be cached in memory, but it is eligible
     for synchronized scanning.  The size of Spool 1 is estimated with
     high confidence to be 38,212,793 rows (5,082,301,469 bytes).  The
     estimated time for this step is 40.02 seconds.
  3) Finally, we send out an END TRANSACTION step to all AMPs involved
     in processing the request.
  -> The contents of Spool 1 are sent back to the user as the result of
     statement 1.  The total estimated time is 40.02 seconds.

 

Now, let’s try the same with the Regular expression –

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
EXPLAIN SELECT C_KEY,
        C_CD,
        S_ORG_NM,
        regexp_substr(S_ORG_NM,'[^ ]+') AS DER_S_ORG_NM
FROM MASTER_CLAIM
WHERE C_CD = '555';

  1) First, we lock EDW_CORE_DB.MASTER_CLAIM in view ETL_VIEWS.MASTER_CLAIM for
     access.
  2) Next, we do an all-AMPs RETRIEVE step from EDW_CORE_DB.MASTER_CLAIM in view
     ETL_VIEWS.MASTER_CLAIM by way of an all-rows scan with a condition
     of ("EDW_CORE_DB.MASTER_CLAIM in view ETL_VIEWS.MASTER_CLAIM.C_CD = '555 '")
     into Spool 1 (group_amps), which is built locally on the AMPs.
     The input table will not be cached in memory, but it is eligible
     for synchronized scanning.  The size of Spool 1 is estimated with
     high confidence to be 38,212,793 rows (105,696,585,438 bytes).
     The estimated time for this step is 40.02 seconds.
  3) Finally, we send out an END TRANSACTION step to all AMPs involved
     in processing the request.
  -> The contents of Spool 1 are sent back to the user as the result of
     statement 1.  The total estimated time is 40.02 seconds.

 

So, from the above two – we really can’t find much difference in plan except the number of bytes that transfers. But, in both the cases the estimated time shows 40.02 seconds only.

So, now we can check what will be the actual time it will take. Let’s see that also.

First, let us create one Virtual Table & try to record the total create time –

 

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
CREATE MULTISET VOLATILE TABLE VT1
AS
    (
         SELECT C_KEY,
				        C_CD,
				        S_ORG_NM,
				        SUBSTR(S_ORG_NM,1,POSITION(' ' IN S_ORG_NM||' ')) AS DER_S_ORG_NM
		 FROM MASTER_CLAIM
		 WHERE C_CD = '555'
    )
WITH DATA
ON COMMIT
PRESERVE ROWS;

 

And, the response is as follows –

1
--CREATE TABLE completed. 0 rows processed. Elapsed Time =  00:00:05.076

 

Let’s create another VT with the new approach –

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
CREATE MULTISET VOLATILE TABLE VT2
AS
    (
         SELECT C_KEY,
				        C_CD,
				        S_ORG_NM,
				        regexp_substr(S_ORG_NM,'[^ ]+') AS DER_S_ORG_NM
		 FROM MASTER_CLAIM
		 WHERE C_CD = '555'
    )
WITH DATA
ON COMMIT
PRESERVE ROWS;

 

And, the response time –

1
--CREATE TABLE completed. 0 rows processed. Elapsed Time =  00:00:05.762

 

So, as you can see there is not much difference between the old process & new process.

And, the total number of records we have test this –

 

1
2
3
4
SELECT COUNT(*)
FROM VT1;

40,781,904

 

So, from the above you can see that we’ve tested this on significant number of rows, which is very common in any TD system.

Let’s test whether both the SQLs actually returning same value. To do that – we’ll create one more VT are as follows –

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
CREATE MULTISET VOLATILE TABLE VT3
AS
    (
         SELECT a.C_KEY,
				        a.C_CD,
				        a.S_ORG_NM,
				        a.DER_S_ORG_NM AS OLD_PRCHSR_ORG_NM,
				        b.DER_S_ORG_NM AS NEW_PRCHSR_ORG_NM,
				        CHAR_LENGTH(a.DER_S_ORG_NM) AS OLD_PRCHSR_ORG_NM_LEN,
				        CHAR_LENGTH(b.DER_S_ORG_NM) AS NEW_PRCHSR_ORG_NM_LEN
		 FROM VT1 a,
		             VT2 b
		 WHERE a.C_KEY = b.C_KEY
    )
WITH DATA
ON COMMIT
PRESERVE ROWS;

--CREATE TABLE completed. 0 rows processed. Elapsed Time =  00:00:06.864

 

Now, lets test the output –

 

1
2
3
4
5
SELECT *
FROM VT3
WHERE OLD_PRCHSR_ORG_NM <> NEW_PRCHSR_ORG_NM;

--SELECT completed. 0 rows returned. Elapsed Time =  00:00:01.763

 

So, as you can see that from the above simulation – we can establish that the performance between the conventional SQL & SQL using Regular expression are negligible.

But, again I must clearly say – Regular expression will be ideal where we need multiple SQLs or PL/SQL to implement. Or, the place where you need to implement one complex parsing that is difficult to implement in a SQL.

Hope this will give you some clarity. 😀

Regular Expression on Teradata 14.0

I’ve been working for more than 8 years in Oracle 10g, 11g & worked significant queries on Regular expressions in various scenario using SQL. It is real handy if you know how to use it & can reduce lots of pain with single SQL. And, the performance will be better compared to the total effort to achieve the same functionalists by using multiple SQL queries or PL/SQL Procedures.

Last couple of years, I’m working on Teradata. And, on some occasion – I was expecting features like these, where I can easily manipulate data with regular expression. I’m pretty excited when I heard that Teradata also introduced Regular Expression from Version 14.0.


As a result, I tried all those features that I think can be handy & useful for various scenarios & followings are the successful queries that I get. There are two occasion, where Teradata partially able to manipulate those strings. I’ve checked the latest Teradata Manual. However, unable to find those solution. So, I’m expecting other forum members can contribute here in order to make this thread useful for every one of us. And, I’ll post here as soon as I get some answers on these partial conversions.

For better understanding, I’ve provided the actual column value & after transformation value of that column in the output. That will help us to grasp it easily – I guess. 🙂


Case 1,

1
2
3
4
5
SELECT regexp_replace('SatyakiDe','([[:lower:]]{1,})([[:upper:]]{1,})','\1 \2') AS COL_VAL;

COLA COL_VAL
---------------- ----------------------------------------
SatyakiDe Satyaki De


Case 2,

1
2
3
4
5
select regexp_replace('919047242526','^([[:digit:]]{2})([[:digit:]]{10})','+\1 \2') COL_VAL;

COLA COL_VAL
------------ ---------------
919047255555 +91 9047255555



Case 3,

1
2
3
4
5
select regexp_replace('+++C','^([[:punct:]]{2})([[:punct:]]{1})(.*)$','\1\3') COL_VAL;

COLA COL_VAL
---- -----
+++C ++C



Case 4,

1
2
3
4
5
select initcap(regexp_replace(regexp_substr(' satyaki.de@mail.com','[^@]+'),'(.*)(\.)(.*)','\1 \3')) COL_VAL;

COLA COL_VAL
-------------------------------- --------------------------------------------------
satyaki.de@mail.com Satyaki De



Case 5,

1
2
3
4
5
select regexp_replace('100011001','([[:digit:]]{3})([[:digit:]]{2})([[:digit:]]{4})','XXX-XX-\3') as COL_VAL;

COLA COL_VAL
---------------- --------------------
100011001 XXX-XX-1001



Case 6,

1
2
3
4
5
select regexp_replace('123456789','([[:digit:]]{3})([[:digit:]]{3})([[:digit:]]{3})','\3.\2.\1') as COL_VAL;

COLA COL_VAL
--------- ---------------
123456789 789.456.123



Case 7,

1
2
3
4
5
SELECT regexp_replace('satyaki9de0loves3to8work2on2sql0and2bi6tools1','[^0-9]+','',1,0,'i') AS DER_VAL;

COLA DER_VAL
--------------------------------------------- ----------
satyaki1de0loves3to8work2on2sql0and2bi4tools1 1038220241




As you can see, all the characters have filtered out from the string & only numbers are kept here. These sorts of queries are very useful in lots of different business scenarios as well.

So, any extra space may not produce desired result. And, needs to pay attention into these small details. 

And, I’ve tested all these queries in the following two versions –

 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
select * from dbcinfo;

InfoKey InfoData
-------- ------------------------
1 VERSION 14.10.00.02
2 RELEASE 14.10.00.02
3 LANGUAGE SUPPORT MODE Standard


select * from dbcinfo;

InfoKey InfoData
-------- ------------------------
1 VERSION 14.10.01.05
2 RELEASE 14.10.01.04
3 LANGUAGE SUPPORT MODE Standard



Hope, this will give you much more clarity. 🙂

One more thing, I would like to clarify here – my intention is to describe more features about these regexp_(similar/substr/instr/replace) functions.

I’ve received one question whether these regexp functions available in TD 13 or not in Teradata forum while posting the same article over there.

And, here is my answer to that question –  

Regarding version 13,

Let us check whether they have these regexp functions or not –

 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
select * from dbcinfo;

InfoKey InfoData
-------- ------------------------
1 VERSION 13.00.00.15
2 RELEASE 13.00.00.15
3 LANGUAGE SUPPORT MODE Standard


select * from dbcinfo;

InfoKey InfoData
-------- ------------------------
1 VERSION 13.10.07.12
2 RELEASE 13.10.07.12
3 LANGUAGE SUPPORT MODE Standard


1
2
3
4
5
6
7
8
9
select regexp_replace('SatyakiDe','^(.*)([[:upper:]]{1,})(.*) $','\1 \2\3') AS COL_VAL;

select regexp_replace('SatyakiDe','^(.*)([[:upper:]]{1,})(.*) $','\1 \2\3') AS COL_VAL;

select regexp_replace('SatyakiDe','^(.*)([[:upper:]]{1,})(.*) $','\1 \2\3') AS COL_VAL;
$
*** Failure 3706 Syntax error: expected something between '(' and the string 'S' keyword.
Statement# 1, Info =35
*** Total elapsed time was 1 second.



Hope this will give adequate clarity to the answer of that above question.

Now, Lets see some other functionality.

REGEXP_SIMILAR has similar functionality like REGEXP_LIKE in Oracle.

Let’s see couple of such cases –

Lets prepare the table with some dummy data –


 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
SELECT * FROM dbc.dbcinfo;

InfoKey InfoData
-------- -----------------------
1 VERSION 14.10.01.05
2 RELEASE 14.10.01.04
3 LANGUAGE SUPPORT MODE Standard


CREATE MULTISET VOLATILE TABLE TEST_T1
(
COL1 VARCHAR(10)
)
ON COMMIT
PRESERVE ROWS;

INSERT INTO TEST_T1 VALUES('456')
;INSERT INTO TEST_T1 VALUES('123x')
;INSERT INTO TEST_T1 VALUES('x123')
;INSERT INTO TEST_T1 VALUES('y')
;INSERT INTO TEST_T1 VALUES('+789')
;INSERT INTO TEST_T1 VALUES('-789')
;INSERT INTO TEST_T1 VALUES('159-')
;INSERT INTO TEST_T1 VALUES('-1-');


Lets check the data now –

 1
2
3
4
5
6
7
8
9
10
11
12
SELECT *
FROM TEST_T1;

COL1
1 123x
2 456
3 x123
4 +789
5 -789
6 y
7 159-
8 -1-



Let’s look into the various scenarios now –


Case 1 (Returns Mixed Numbers, Signed Numbers & Non Numbers),

 1
2
3
4
5
6
7
8
9
10
11
12
13
SELECT *
FROM TEST_T1
WHERE REGEXP_SIMILAR(COL1,'^[0-9]+$','c')=0;

COL1
-----
1 123x
2 x123
3 +789
4 -789
5 y
6 159-
7 -1-




Case 2 (Returns Only Unsigned Positive Numbers),

1
2
3
4
5
6
7
SELECT *
FROM TEST_T1
WHERE REGEXP_SIMILAR(COL1,'^[0-9]+$','c')=1;

COL1
-----
456



Case 3 (Returns All Numbers including Positive, Negative & unsigned),

 1
2
3
4
5
6
7
8
9
10
11
SELECT *
FROM TEST_T1
WHERE REGEXP_SIMILAR(COL1,'^[+-]?[0-9]+[+-]?$','c')=1;

COL1
-----
456
+789
-789
159-
-1-



Case 4 (Returns Only Non Numbers i.e. Characters),

1
2
3
4
5
6
7
SELECT *
FROM TEST_T1
WHERE REGEXP_SIMILAR(COL1,'[^0-9]+','c')=1;

COL1
----
y



Hope this will give you some additional idea. 🙂

My objective is to provide basic information to my friends. So, that they can write better SQL in TD while migrating from other popular databases or new developer in TD can get a flavor of this powerful feature & exploit them in all the positive aspect & apply them properly. 😀

Really appreciate your time to read this post.

Regards.

Satyaki De.