Building Python-based best-route apps for Indian Railways

Hi Guys!

Today, I’ll present a way to find the best Indian Railways route between a specific source & destination using a third-party API.

This approach is particularly beneficial if you want to integrate this logic into an Azure Function, an AWS Lambda function, or any other serverless function.

Before we dig into the details, let us explore what kind of cloud-based architecture we can use to implement this.

Architecture

Fig: 1 (Cloud Architecture)

In this case, I’ve considered Azure as the implementation platform.

Let’s discuss how the events will take place. First, a user searches for the best routes between two fixed stations by providing the source & destination. After the initial authentication is validated, the request passes through the Azure Firewall. The API service then checks whether a similar query has already been served; if so, it fetches the result from the cache & sends it back to the user through the mobile application. For a first-time query, it retrieves the information from the DB & keeps a copy in the cache. This layer is also managed through a load balancer for high availability. Periodically, the system pushes the updated information from the cache back to the DB.
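
To make the caching flow concrete, here is a minimal cache-aside sketch in Python. This is only an illustration: the Redis client, the key format & the lookupRouteFromDb() helper are my own assumptions & not part of the actual implementation –

import json
import redis  # assumed cache client; any key-value store would behave similarly

cache = redis.Redis(host='localhost', port=6379, db=0)

def getBestRoute(src, dest, lookupRouteFromDb, ttl_sec=3600):
    # Hypothetical cache key built from source & destination
    key = 'route:' + src + ':' + dest

    cached = cache.get(key)
    if cached is not None:
        # A similar query has already been served; return the cached copy
        return json.loads(cached)

    # First-time query: hit the DB (or the API), then keep a copy in the cache
    result = lookupRouteFromDb(src, dest)
    cache.setex(key, ttl_sec, json.dumps(result))

    return result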

Let’s see the program directory structure –

ProgramDir

Let’s discuss our code –

1. clsConfig.py (This script contains all the configuration parameters, i.e. the API credentials & the directory paths, that the application will use. Hence, the name comes into the picture.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 12-Oct-2019              ####
####                                      ####
#### Objective: This script is a config   ####
#### file, contains all the keys &        ####
#### directory paths for the Indian       ####
#### Railway API. The application will    ####
#### use these to invoke the API.         ####
##############################################

import os
import platform as pl

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    config = {
        'APP_ID': 1,
        'URL': "https://trains.p.rapidapi.com/",
        'RAPID_API_HOST': "trains.p.rapidapi.com",
        'RAPID_API_KEY': "hrfjjdfjfjfjfjxxxxxjffjjfjfjfjfjfjfjf",
        'RAPID_API_TYPE': "application/json",
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'APP_DESC_1': 'Indian Railway Train Schedule Search',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'COL_LIST': ['name','train_num','train_from','train_to','classes','departTime','arriveTime','Mon','Tue','Wed','Thu','Fri','Sat','Sun']
    }

As of now, I’ve replaced the API Key with a dummy value.
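
Ideally, you wouldn’t hardcode the key at all. A small optional tweak (just a sketch; the RAPID_API_KEY environment variable name is my assumption) is to read it from the environment instead –

import os

class clsConfig(object):
    config = {
        # Read the key from an environment variable so it never lives in source control;
        # fall back to an empty string if it is not set
        'RAPID_API_KEY': os.environ.get('RAPID_API_KEY', '')
    }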

2. clsIndianRailway.py (This script will invoke the main Indian Railway API & try to get the response between two railway stations. Hence, the name comes into the picture.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 20-Dec-2019              ####
#### Modified On 20-Dec-2019              ####
####                                      ####
#### Objective: Main scripts to invoke    ####
#### Indian Railway API.                  ####
##############################################

import requests
import logging
import json
from clsConfig import clsConfig as cf

class clsIndianRailway:
    def __init__(self):
        self.url = cf.config['URL']
        self.rapidapi_host = cf.config['RAPID_API_HOST']
        self.rapidapi_key = cf.config['RAPID_API_KEY']
        self.type = cf.config['RAPID_API_TYPE']

    def searchQry(self, rawQry):
        try:
            url = self.url
            rapidapi_host = self.rapidapi_host
            rapidapi_key = self.rapidapi_key
            type = self.type

            Ipayload = "{\"search\":\"" + rawQry + "\"}"

            jpayload = json.dumps(Ipayload)
            payload = json.loads(jpayload)

            print('Input JSON: ', str(payload))

            headers = {
                'x-rapidapi-host': rapidapi_host,
                'x-rapidapi-key': rapidapi_key,
                'content-type': type,
                'accept': type
                }

            response = requests.request("POST", url, data=payload, headers=headers)

            ResJson  = response.text

            jdata = json.dumps(ResJson)
            ResJson = json.loads(jdata)

            return ResJson

        except Exception as e:
            ResJson = ''
            x = str(e)
            print(x)

            logging.info(x)
            ResJson = {'errorDetails': x}

            return ResJson

Let’s explain the critical snippet from the code.

url = self.url
rapidapi_host = self.rapidapi_host
rapidapi_key = self.rapidapi_key
type = self.type

Ipayload = "{\"search\":\"" + rawQry + "\"}"

jpayload = json.dumps(Ipayload)
payload = json.loads(jpayload)

The first four lines receive the parameter values. Our application then frames the search query in the Ipayload variable. After that, the app runs it through json.dumps & json.loads to prepare the request payload.
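
As a side note, a slightly simpler way to frame the same payload (just a sketch, not the author’s code) is to build a dictionary & let json.dumps handle the quoting –

import json

def buildPayload(rawQry):
    # Building the body as a dict avoids escaping the quotes by hand
    return json.dumps({"search": rawQry})

# e.g. buildPayload('12302') returns '{"search": "12302"}'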

headers = {
    'x-rapidapi-host': rapidapi_host,
    'x-rapidapi-key': rapidapi_key,
    'content-type': type,
    'accept': type
    }

response = requests.request("POST", url, data=payload, headers=headers)

Now, the application prepares the headers, sends the request & receives the response. Finally, this script extracts the relevant part of the response, converts it back to JSON & returns it to the main caller application, as shown below –

response = requests.request("POST", url, data=payload, headers=headers)

ResJson  = response.text

jdata = json.dumps(ResJson)
ResJson = json.loads(jdata)

return ResJson

3. callIndianRailwayAPI.py (This is the main calling script, which invokes clsIndianRailway to get the response between two railway stations & then works out the best route. Hence, the name comes into the picture.)

##############################################
#### Written By: SATYAKI DE               ####
#### Written On: 20-Dec-2019              ####
#### Modified On 20-Dec-2019              ####
####                                      ####
#### Objective: Main calling scripts.     ####
##############################################

from clsConfig import clsConfig as cf
import pandas as p
import clsL as cl
import logging
import datetime
import json
import clsIndianRailway as ct
import re
import numpy as np

# Disabling Warnings
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Lookup functions from
# Azure cloud SQL DB

var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

def getArriveTimeOnly(row):
    try:
        # Splitting on '+' to fetch the time part only

        lkp_arriveTime = str(row['arriveTime'])

        str_arr_time, remain = lkp_arriveTime.split('+')

        return str_arr_time

    except Exception as e:
        x = str(e)
        str_arr_time = ''

        return str_arr_time

def getArriveDateDiff(row):
    try:
        # Using regular expression to fetch time part only

        lkp_arriveTime = str(row['arriveTime'])

        first_half, str_date_diff_init = lkp_arriveTime.split('+')

        # Replacing the text part from it & only capturing the integer part
        str_date_diff = int(re.sub(r"[a-z]","",str_date_diff_init, flags=re.I))

        return str_date_diff

    except Exception as e:
        x = str(e)
        str_date_diff = 0

        return str_date_diff

def getArriveTimeDiff(row):
    try:
        # Using regular expression to fetch time part only

        lkp_arriveTimeM = str(row['arriveTimeM'])

        str_time_diff_init = int(re.sub(r'[^\w\s]', '', lkp_arriveTimeM))

        # Replacing the text part from it & only capturing the integer part
        str_time_diff = (2400 - str_time_diff_init)

        return str_time_diff

    except Exception as e:
        x = str(e)
        str_time_diff = 0

        return str_time_diff

def main():
    try:
        dfSrc = p.DataFrame()
        df_ret = p.DataFrame()
        ret_2 = ''
        debug_ind = 'Y'
        col_list = cf.config['COL_LIST']

        general_log_path = str(cf.config['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'consolidatedIR.log', level=logging.INFO)

        # Initiating Log Class
        l = cl.clsL()

        # Moving previous day log files to archive directory
        arch_dir = cf.config['ARCH_DIR']
        log_dir = cf.config['LOG_PATH']

        tmpR0 = "*" * 157

        logging.info(tmpR0)
        tmpR9 = 'Start Time: ' + str(var)
        logging.info(tmpR9)
        logging.info(tmpR0)

        print("Archive Directory:: ", arch_dir)
        print("Log Directory::", log_dir)
        tmpR1 = 'Log Directory::' + log_dir
        logging.info(tmpR1)

        # Query using parameters
        rawQry = str(input('Please enter the name of the train service that you want to find out (Either by Name or by Number): '))

        x1 = ct.clsIndianRailway()
        ret_2 = x1.searchQry(rawQry)

        # Capturing the JSON Payload
        res = json.loads(ret_2)

        # Converting dictionary to Pandas Dataframe
        # df_ret = p.read_json(ret_2, orient='records')

        df_ret = p.io.json.json_normalize(res)
        df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

        # Resetting the column orders as per JSON
        # df_ret = df_ret[list(res[0].keys())]
        column_order = col_list
        df_mod_ret = df_ret.reindex(column_order, axis=1)

        # Sorting the source data for better viewing
        df_mod_resp = df_mod_ret.sort_values(by=['train_from','train_to','train_num'])

        l.logr('1.IndianRailway_' + var + '.csv', debug_ind, df_mod_resp, 'log')

        # Fetching Data for Delhi To Howrah
        df_del_how = df_mod_resp[(df_mod_resp['train_from'] == 'NDLS') & (df_mod_resp['train_to'] == 'HWH')]

        l.logr('2.IndianRailway_Delhi2Howrah_' + var + '.csv', debug_ind, df_del_how, 'log')

        # Splitting Arrive time into two separate fields for better calculation
        df_del_how['arriveTimeM'] = df_del_how.apply(lambda row: getArriveTimeOnly(row), axis=1)
        df_del_how['arriveTimeDayDiff'] = df_del_how.apply(lambda row: getArriveDateDiff(row), axis=1)
        df_del_how['arriveTimeDiff'] = df_del_how.apply(lambda row: getArriveTimeDiff(row), axis=1)

        l.logr('3.IndianRailway_Del2How_Mod_' + var + '.csv', debug_ind, df_del_how, 'log')

        # To fetch the best route which saves time
        lstTimeDayDiff = df_del_how['arriveTimeDayDiff'].values.tolist()
        min_lstTimeDayDiff = int(min(lstTimeDayDiff))

        df_min_timedaydiff = df_del_how[(df_del_how['arriveTimeDayDiff'] == min_lstTimeDayDiff)]

        l.logr('4.IndianRailway_Del2How_TimeCalc_' + var + '.csv', debug_ind, df_min_timedaydiff, 'log')

        # Now application will check the maximum arrivetimediff, this will bring the record
        # which arrives early at Howrah station
        lstTimeDiff = df_min_timedaydiff['arriveTimeDiff'].values.tolist()
        max_lstTimeDiff = int(max(lstTimeDiff))

        df_best_route = df_min_timedaydiff[(df_min_timedaydiff['arriveTimeDiff'] == max_lstTimeDiff)]

        # Dropping unwanted columns
        df_best_route.drop(columns=['arriveTimeM'], inplace=True)
        df_best_route.drop(columns=['arriveTimeDayDiff'], inplace=True)
        df_best_route.drop(columns=['arriveTimeDiff'], inplace=True)

        l.logr('5.IndianRailway_Del2How_BestRoute_' + var + '.csv', debug_ind, df_best_route, 'log')

        print("-" * 60)

        print('Realtime Indian Railway Data:: ')
        logging.info('Realtime Indian Railway Data:: ')
        print(df_mod_resp)
        print()
        print('Best Route from Delhi -> Howrah:: ')
        print(df_best_route)
        print()

        # Checking execution status
        ret_val_2 = df_best_route.shape[0]

        if ret_val_2 == 0:
            print("Indian Railway hasn't returned any rows. Please check your queries!")
            logging.info("Indian Railway hasn't returned any rows. Please check your queries!")
            print("*" * 157)
            logging.info(tmpR0)
        else:
            print("Successfuly row feteched!")
            logging.info("Successfuly row feteched!")
            print("*" * 157)
            logging.info(tmpR0)

        print('Finding Story points..')
        print("*" * 157)
        logging.info('Finding Story points..')
        logging.info(tmpR0)


        tmpR10 = 'End Time: ' + str(var)
        logging.info(tmpR10)
        logging.info(tmpR0)

    except ValueError:
        print("No relevant data to proceed!")
        logging.info("No relevant data to proceed!")

    except Exception as e:
        print("Top level Error: args:{0}, message{1}".format(e.args, e.message))

if __name__ == "__main__":
    main()

Key snippet to explore –

# Query using parameters
rawQry = str(input('Please enter the name of the train service that you want to find out (Either by Name or by Number): '))

In this case, we’ve made it interactive. However, in the actual scenario, you would receive these values from your mobile application.
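
For instance, if you host this logic inside an Azure Function (as mentioned at the start of this post), a minimal HTTP-trigger sketch might look like the one below. The query-string parameter name qry is purely my assumption & not part of the actual app –

import json
import azure.functions as func
import clsIndianRailway as ct

def main(req: func.HttpRequest) -> func.HttpResponse:
    # The mobile app would pass the search term in the query string
    # instead of the interactive input() used above
    rawQry = req.params.get('qry', '')

    x1 = ct.clsIndianRailway()
    ret_2 = x1.searchQry(rawQry)

    # searchQry returns raw JSON text on success & a dict on error
    body = ret_2 if isinstance(ret_2, str) else json.dumps(ret_2)

    return func.HttpResponse(body, mimetype='application/json')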

x1 = ct.clsIndianRailway()
ret_2 = x1.searchQry(rawQry)

# Capturing the JSON Payload
res = json.loads(ret_2)

The above lines invoke the API & then parse the JSON response.

# Converting dictionary to Pandas Dataframe
df_ret = p.io.json.json_normalize(res)
df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])

# Resetting the column orders as per JSON
column_order = col_list
df_mod_ret = df_ret.reindex(column_order, axis=1)

# Sorting the source data for better viewing
df_mod_resp = df_mod_ret.sort_values(by=['train_from','train_to','train_num'])

In these last few lines, our application flattens the JSON response into a pandas dataframe, reorders the columns as per our column list & then sorts it.
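
To illustrate the flattening with a tiny standalone sketch (the nested record below is made up & not the actual API output; also note that in newer pandas versions p.io.json.json_normalize has moved to p.json_normalize) –

import pandas as p

# Hypothetical, simplified response shape
sample = [{"train": {"name": "Rajdhani Exp", "train_num": "12302"},
           "train_from": "NDLS", "train_to": "HWH"}]

df = p.json_normalize(sample)

# Nested keys come out as train.name, train.train_num etc., so we keep only the last part
df.columns = df.columns.map(lambda x: x.split(".")[-1])

print(sorted(df.columns.tolist()))
# ['name', 'train_from', 'train_num', 'train_to']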

The result will look like this –

SerializeJson2PandasDF

This step is critical, as without flattening the nested JSON into columns, the filtering & calculations that follow won’t be possible.

# Fetching Data for Delhi To Howrah
df_del_how = df_mod_resp[(df_mod_resp['train_from'] == 'NDLS') & (df_mod_resp['train_to'] == 'HWH')]

As the line suggests, our application picks up only those records between New Delhi (NDLS) & Howrah (HWH). Thus, we’ve used this filter to eliminate the additional records. And, the data will look like this –

SilteredRecords

Now, we need to identify which of these records takes the minimum time. For that, we’ll be doing some calculations on the arrival time to fetch it.

# Splitting Arrive time into two separate fields for better calculation
df_del_how['arriveTimeM'] = df_del_how.apply(lambda row: getArriveTimeOnly(row), axis=1)
df_del_how['arriveTimeDayDiff'] = df_del_how.apply(lambda row: getArriveDateDiff(row), axis=1)
df_del_how['arriveTimeDiff'] = df_del_how.apply(lambda row: getArriveTimeDiff(row), axis=1)

To do that, we’ll be generating a few derived columns (shown above), which we’ll be using to fetch the shortest duration. And, the data should look like this –

CalculatedFields

These are the fields we’re using for our calculation. First, we’re splitting arriveTime into two separate columns, i.e. arriveTimeM & arriveTimeDayDiff. arriveTimeDiff, however, is a calculated field.
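
As a quick worked example (the arriveTime value here is purely illustrative), a record arriving at “05:55+1 day” would be split & scored like this –

# Hypothetical arriveTime value, in the "<HH:MM>+<n> day" shape the helper functions expect
arriveTime = '05:55+1 day'

arriveTimeM, dayPart = arriveTime.split('+')                           # '05:55' & '1 day'
arriveTimeDayDiff = int(''.join(c for c in dayPart if c.isdigit()))    # 1
arriveTimeDiff = 2400 - int(arriveTimeM.replace(':', ''))              # 2400 - 555 = 1845

# A lower arriveTimeDayDiff means fewer days spent on the train, &
# a higher arriveTimeDiff means an earlier arrival within that day.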

So, our logic to find the best routes –

  • arriveTimeDayDiff = Take the minimum of the records. If you have multiple candidates, then we’ll pick all of them. In this case, we’ll get two records.
  • arriveTimeDiff = (24:00 – <Train’s Arrival Time>); then take the maximum of the value

Note that, in this case, we haven’t considered the departure time. You can add that logic to further improve & refine the result.

The above steps can be seen in the following snippet –

# To fetch the best route which saves time
lstTimeDayDiff = df_del_how['arriveTimeDayDiff'].values.tolist()
min_lstTimeDayDiff = int(min(lstTimeDayDiff))

df_min_timedaydiff = df_del_how[(df_del_how['arriveTimeDayDiff'] == min_lstTimeDayDiff)]

l.logr('4.IndianRailway_Del2How_TimeCalc_' + var + '.csv', debug_ind, df_min_timedaydiff, 'log')

# Now application will check the maximum arrivetimediff, this will bring the record
# which arrives early at Howrah station
lstTimeDiff = df_min_timedaydiff['arriveTimeDiff'].values.tolist()
max_lstTimeDiff = int(max(lstTimeDiff))

df_best_route = df_min_timedaydiff[(df_min_timedaydiff['arriveTimeDiff'] == max_lstTimeDiff)]

Let’s see how it runs –

Output

As you can see, for NDLS (New Delhi) as the source, we have three records, marked in the GREEN square box. However, for HWH (Howrah) as the destination, we have only two records, marked in the RED square box. As part of our calculation, we’ll pick the record marked with the BLUE square box.

Let’s see how the log directory generates all the files –

Log_Dir

Let’s see the final output in our csv file –

BestRoute

So, finally, we’ve achieved it. 😀

Let me know how you like this post. Please share your suggestions & comments.

I’ll be back with another installment from the Python verse.

Till then – Happy Avenging!

Note: All the data posted here are representational data & available over the internet & for educational purpose only.

Building an Azure Databricks cluster, installing desired packages & running a demo (Time stone from Python Verse)

Today, I’ll be showing how to prepare a cluster in Azure Databricks from the command prompt & demonstrate processing a sample csv file using PySpark. This can be useful, especially when you want to customize your environment & need to install specific packages inside the cluster with more options.

This is not like my earlier posts, where my primary attention was on the Python side. At the end of this post, I’ll showcase a PySpark script & how we can execute it inside Azure Databricks.

Let’s roll the dice!

Step -1:

Type Azure Databricks in the search bar inside the Azure portal.

0. Azure Search

As shown in the red box, you have to click these options. This will take you to the new Databricks sign-in page.

Step -2:

The next step would be clicking the “Add” button. For the first time, the application will ask you to create a storage account associated with this workspace.

1. Create Storage

After creation, the screen should look like this –

2.5. Azure-Data-Bricks Options

Now, click the Azure command line & choose Bash as your work environment –

2. After Creation

For security reasons, I’ve masked the details.

After successful creation, this page should look like this –

3. Azure Databricks

Once, you click the launch workspace, it will take you to this next page –

4. Detailed Bricks

As you can see, there are no notebooks or Python scripts under the Recents tab.

Step -3:

Let’s verify it from the command line shell environment.

5. Python-Env

As you can see, the default Python version in Databricks is 3.5.2.

Step -4:

Now, we’ll prepare the environment by creating a local directory in the cloud shell.

The directory that we’ll be creating is “rndBricks.”

6. Creating Directory

Step -5:

Let’s create the virtual environment here –

Using the “virtualenv” command, we’ll create the virtual environment & it should look like this –

7. Creating Python-VM

As you can see, this will create the first Python virtual environment along with pip & wheel, which are essential for your Python environment.

After creating the virtual environment, you need to install the Databricks CLI, which is shown in the next screenshot given below –

8. Installing Databricks CLI in Python-VM

Before you create the cluster, you first need to generate a token, which will be used to configure the CLI against your workspace –

9.1. Generating Token

As shown in the above screen, the “red” marked area is our primary interest. First, click the account image represented by the “green” box & then click “User Settings,” marked in blue. Once you click that, you can see the “purple” area, where you need to click the Generate New Token button if you are doing it for the first time.

Now, we’ll be using this newly generated token to configure the Databricks CLI, as follows –

9.2. Configuring with Token

Make sure you mention the correct region, i.e. westus2/westus or any region as per your geography & convenience.

Once that is done, you can check the cluster list with the following command (in case you have already created any clusters in your subscription) –

10. Checking Clusters List

Since we’re building it from scratch, there is no cluster information showing here.

Step -6:

Let’s create the clusters –

11. Creating-Clusters-From-Command

Please find the command that you will be using, as follows –

databricks clusters create --json '{ "autoscale": {"min_workers": 2, "max_workers": 8}, "cluster_name": "pyRnd", "spark_version": "5.3.x-scala2.11", "spark_conf": {}, "node_type_id": "Standard_DS3_v2", "driver_node_type_id": "Standard_DS3_v2", "ssh_public_keys": [], "custom_tags": {}, "spark_env_vars": {"PYSPARK_PYTHON": "/databricks/python3/bin/python3"}, "autotermination_minutes": 20, "enable_elastic_disk": true, "cluster_source": "UI", "init_scripts": [] }'

As you can see, you need to pass the information in JSON format. For your better understanding, please find the JSON in a proper format –

11.5. JSON

And, the raw version –

{
  "autoscale": {
    "min_workers": 2,
    "max_workers": 8
  },
  "cluster_name": "pyRnd",
  "spark_version": "5.3.x-scala2.11",
  "spark_conf": {},
  "node_type_id": "Standard_DS3_v2",
  "driver_node_type_id": "Standard_DS3_v2",
  "ssh_public_keys": [],
  "custom_tags": {},
  "spark_env_vars": {
    "PYSPARK_PYTHON": "/databricks/python3/bin/python3"
  },
  "autotermination_minutes": 20,
  "enable_elastic_disk": true,
  "cluster_source": "UI",
  "init_scripts": []
}
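
If you’d rather create the cluster from Python instead of the databricks CLI, the same JSON can be posted to the Databricks Clusters REST API (2.0). A sketch only; the workspace URL & token below are placeholders –

import requests

DOMAIN = 'https://westus2.azuredatabricks.net'   # placeholder workspace URL
TOKEN = 'dapiXXXXXXXXXXXXXXXX'                   # the token generated earlier

cluster_spec = {
    "autoscale": {"min_workers": 2, "max_workers": 8},
    "cluster_name": "pyRnd",
    "spark_version": "5.3.x-scala2.11",
    "node_type_id": "Standard_DS3_v2",
    "driver_node_type_id": "Standard_DS3_v2",
    "spark_env_vars": {"PYSPARK_PYTHON": "/databricks/python3/bin/python3"},
    "autotermination_minutes": 20,
    "enable_elastic_disk": True
}

resp = requests.post(DOMAIN + '/api/2.0/clusters/create',
                     headers={'Authorization': 'Bearer ' + TOKEN},
                     json=cluster_spec)

print(resp.json())   # returns the new cluster_id on success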

Initially, the cluster status in the GUI will show as follows –

12. Cluster-Status-In-Progress

After a few minutes, this will show the running state –

13. Cluster-Running Status

Let’s check the detailed configuration once the cluster is created –

14. Initial Cluster Details
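
If you’d like to poll the cluster state from code instead of the GUI, here is a minimal sketch against the same Clusters REST API; again, the workspace URL, token & cluster id are placeholders –

import time
import requests

DOMAIN = 'https://westus2.azuredatabricks.net'   # placeholder workspace URL
TOKEN = 'dapiXXXXXXXXXXXXXXXX'                   # the token generated earlier
CLUSTER_ID = 'XXXX-XXXXX-leech896'               # placeholder cluster id

def waitForCluster():
    headers = {'Authorization': 'Bearer ' + TOKEN}

    while True:
        resp = requests.get(DOMAIN + '/api/2.0/clusters/get',
                            headers=headers,
                            params={'cluster_id': CLUSTER_ID})
        state = resp.json().get('state')
        print('Current state:', state)

        # PENDING -> RUNNING is the normal path; anything else needs a look
        if state in ('RUNNING', 'TERMINATED', 'ERROR'):
            return state

        time.sleep(30)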

Step -7:

We need to check the library section. This is important, as we might need to install many dependent Python packages to run our application on Azure Databricks. And, the initial Libraries tab will look like this –

15. Libraries

You can install libraries into an existing cluster either through the GUI or through the shell command prompt. Let’s explore the GUI option first.

GUI Option:

First, click the Libraries tab under your newly created cluster, as shown in the above picture. Then you need to click the “Install New” button. This will pop up the following window –

16. Installing Libraries

As you can see, you have many options, including the Python-related possibilities (marked in red), for your application as well.

Case 1 (Installing PyPi packages):

19. Installing through GUI

Note: You can either mention a specific version or simply provide the package name.

Case 2 (Installing Wheel packages):

16.5. Installing Wheel Libraries

As you can see, from the upload options, you can upload your local libraries & then click the install button to install the same.

CLI Option:

Here is another way: you can install your Python libraries using the command line, as shown in the below screenshots –

17. Running & Installing Libraries - Alternate Options

A few things to notice: the first command shows the current running cluster list, the second command updates your pip packages & the third command installs your desired PyPI package.

Please find the raw commands –

databricks clusters list

pip install -U pip

databricks libraries install --cluster-id "XXXX-XXXXX-leech896" --pypi-package "pandas" --pypi-repo "https://pypi.org/project/pandas/"
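
The same installation can also be driven from Python through the Databricks Libraries REST API (2.0). Here is a sketch with placeholder workspace URL, token & cluster id –

import requests

DOMAIN = 'https://westus2.azuredatabricks.net'   # placeholder workspace URL
TOKEN = 'dapiXXXXXXXXXXXXXXXX'                   # the token generated earlier
CLUSTER_ID = 'XXXX-XXXXX-leech896'               # placeholder cluster id

payload = {
    "cluster_id": CLUSTER_ID,
    "libraries": [{"pypi": {"package": "pandas"}}]
}

resp = requests.post(DOMAIN + '/api/2.0/libraries/install',
                     headers={'Authorization': 'Bearer ' + TOKEN},
                     json=payload)

print(resp.status_code)   # 200 means the install request was accepted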

After installing, the GUI page under the libraries section will look like this –

18. Installed Libraries

Note that, for any failed case, you can check the log in this way –

20. Installation-In-progress

If you click on the marked red area, it will pop up the detailed error message, which is as follows –

19.5. Error Details

So, we’re done with our initial set-up.

Let’s upload one sample file into this environment & try to parse the data.

Step -8:

You can upload your sample file as follows –

23.1. First Step

First, click “Data” & then click “Add Data,” marked in the red box.

You can import this entire csv data as a table, as shown in the next screenshot –

23.2. Uploading Data Files

Also, you can create a local directory here based on your requirements, as shown below –

24. Creating Local Directory For Process

Step -9:

Let’s run the code.

Please find the following snippet in PySpark for our test –

1. DBFromFile.py (This script will parse the uploaded csv file & process the data to create an SQL-like table for our task.)

###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 10-Feb-2019       ########
####                               ########
#### Objective: Pyspark File to    ########
#### parse the uploaded csv file.  ########
###########################################

# File location and type
file_location = "/FileStore/tables/src_file/customer_addr_20180112.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

# Create a view or table

temp_table_name = "customer_addr_20180112_csv"

df.createOrReplaceTempView(temp_table_name)

%sql

/* Query the created temp table in a SQL cell */

select * from `customer_addr_20180112_csv`

From the above sample snippet, one can see that the application parses the source data by providing all the parsing options & then exposes that csv as a table to SQL.
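
If you’d rather not rely on inferred or string-only parsing, a hedged variant is to pass an explicit schema; the column names below are my own guess at the file’s layout, not the real ones –

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical columns; adjust to the actual csv layout
cust_schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("address_line_1", StringType(), True),
    StructField("city", StringType(), True),
    StructField("postal_code", StringType(), True)
])

df_typed = spark.read.format("csv") \
  .option("header", "true") \
  .option("sep", ",") \
  .schema(cust_schema) \
  .load("/FileStore/tables/src_file/customer_addr_20180112.csv")

display(df_typed)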

Let’s check step by step execution.

25. Working With Uploaded File

So, until this step, you can see that the application has successfully parsed the csv data.

And, finally, you can view the data –

25.1. Second Option

As the highlighted blue box shows, the application is using this csv file as a table. So, you have many options to analyze the information flexibly if you are familiar with SQL.
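
For example, once the temp view exists, you can also query it straight from a Python cell (a minimal sketch; the count is just a sanity check) –

# Querying the temp view created above from a Python cell
row_count = spark.sql(
    "select count(*) as cnt from `customer_addr_20180112_csv`"
).collect()[0]['cnt']

print("Rows parsed from the csv:", row_count)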

After your job runs, make sure you terminate your cluster. Otherwise, you’ll receive a large & expensive usage bill, which you might not want!

So, finally, we’ve done it.

Let me know what you think.

Till then, Happy Avenging! 😀

Note: All the data posted here are representational data & available over the internet & for educational purpose only.