Improvement of Pandas data processing performance using Multi-threading with the Queue (Another crossover of Space Stone, Reality Stone & Power Stone)

Today, we’ll discuss how to improve your panda’s data processing power using Multi-threading. Note that, we are not going to use any third party python package. Also, we’ll be using a couple of python scripts, which we’ve already discussed in our previous posts. Hence, this time, I won’t post them here.

Please refer the following scripts –

a. callClient.py
b. callRunServer.py
c. clsConfigServer.py
d. clsEnDec.py
e. clsFlask.py
f. clsL.py
g. clsParam.py
h. clsSerial.py
i. clsWeb.py

Please find the above scripts described here with details.

So, today, we’ll be looking into how the multi-threading really helps the application to gain some performance over others.

Let’s go through our existing old sample files –

Sample Data

And, we’ve four columns that are applicable for encryption. This file contains 10K records. That means the application will make 40K calls to the server for a different kind of encryption for each column.

Now, if you are going with the serial approach, which I’ve already discussed here, will take significant time for data processing. However, if we could club a few rows as one block & in this way we can create multiple blocks out of our data csv like this –

Data_Blocks

As you can see that blocks are marked with a different color. So, now if you send each block of data in parallel & send the data for encryption. Ideally, you will be able to process data much faster than the usual serial process. And, this what we would be looking for with the help of python’s multi-threading & queue. Without the queue, this program won’t be possible as the queue maintains the data & process integrity.

One more thing we would like to explain here. Whenever this application is sending the block of data. It will be posting that packed into a (key, value) dictionary randomly. Key will be the thread name. The reason, we’re not expecting data after process might arrive in some random order wrapped with the dictionary as well. Once the application received all the dictionary with dataframe with encrypted/decrypted data, the data will be rearranged based on the key & then joined back with the rest of the data.

Let’s see one sample way of sending & receiving random thread –

Data Packing

The left-hand side, the application is splitting the recordset into small chunks of a group. Once, those group created, using python multi-threading the application is now pushing them into the queue for the producer to produce the encrypted/decrypted value. Similar way, after processing the application will push the final product into the queue for consuming the final output.

This is the pictorial representation of dictionary ordering based on the key-value & then the application will extract the entire data to form the target csv file.

Final_Data_Sort

Let’s explore the script –

1. clsParallel.py (This script will consume the split csv files & send the data blocks in the form of the dictionary using multi-threading to the API for encryption in parallel. Hence, the name comes into the picture.)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
import pandas as p
import clsWeb as cw
import datetime
from clsParam import clsParam as cf
import threading
from queue import Queue
import gc
import signal
import time
import os

# Declaring Global Variable
q = Queue()
m = Queue()
tLock = threading.Lock()
threads = []

fin_dict = {}
fin_dict_1 = {}
stopping = threading.Event()

# Disbling Warnings
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn

class clsParallel(object):
    def __init__(self):
        self.path = cf.config['PATH']
        self.EncryptMode = str(cf.config['ENCRYPT_MODE'])
        self.DecryptMode = str(cf.config['DECRYPT_MODE'])
        self.num_worker_threads = int(cf.config['NUM_OF_THREAD'])
        

    # Lookup Methods for Encryption
    def encrypt_acctNbr(self, row):
        # Declaring Local Variable
        en_AcctNbr = ''
        json_source_str = ''

        # Capturing essential values
        EncryptMode = self.EncryptMode
        lkp_acctNbr = row['Acct_Nbr']
        str_acct_nbr = str(lkp_acctNbr)
        fil_acct_nbr = str_acct_nbr.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_nbr + '","dataTemplate":"subGrAcct_Nbr"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_acct_nbr)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            en_AcctNbr = x.getResponse(EncryptMode)
        else:
            en_AcctNbr = ''

        return en_AcctNbr

    def encrypt_Name(self, row):
        # Declaring Local Variable
        en_AcctName = ''

        # Capturing essential values
        EncryptMode = self.EncryptMode
        lkp_acctName = row['Name']
        str_acct_name = str(lkp_acctName)
        fil_acct_name = str_acct_name.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_name + '","dataTemplate":"subGrName"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_acct_name)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            en_AcctName = x.getResponse(EncryptMode)
        else:
            en_AcctName = ''

        return en_AcctName

    def encrypt_Phone(self, row):
        # Declaring Local Variable
        en_Phone = ''

        # Capturing essential values
        EncryptMode = self.EncryptMode
        lkp_phone = row['Phone']
        str_phone = str(lkp_phone)
        fil_phone = str_phone.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_phone + '","dataTemplate":"subGrPhone"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_phone)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            en_Phone = x.getResponse(EncryptMode)
        else:
            en_Phone = ''

        return en_Phone

    def encrypt_Email(self, row):
        # Declaring Local Variable
        en_Email = ''

        # Capturing essential values
        EncryptMode = self.EncryptMode
        lkp_email = row['Email']
        str_email = str(lkp_email)
        fil_email = str_email.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_email + '","dataTemplate":"subGrEmail"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_email)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            en_Email = x.getResponse(EncryptMode)
        else:
            en_Email = ''

        return en_Email

    # Lookup Methods for Decryption
    def decrypt_acctNbr(self, row):
        # Declaring Local Variable
        de_AcctNbr = ''
        json_source_str = ''

        # Capturing essential values
        EncryptMode = self.DecryptMode
        lkp_acctNbr = row['Acct_Nbr']
        str_acct_nbr = str(lkp_acctNbr)
        fil_acct_nbr = str_acct_nbr.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_nbr + '","dataTemplate":"subGrAcct_Nbr"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_acct_nbr)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            de_AcctNbr = x.getResponse(EncryptMode)
        else:
            de_AcctNbr = ''

        return de_AcctNbr

    def decrypt_Name(self, row):
        # Declaring Local Variable
        de_AcctName = ''

        # Capturing essential values
        EncryptMode = self.DecryptMode
        lkp_acctName = row['Name']
        str_acct_name = str(lkp_acctName)
        fil_acct_name = str_acct_name.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_acct_name + '","dataTemplate":"subGrName"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_acct_name)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            de_AcctName = x.getResponse(EncryptMode)
        else:
            de_AcctName = ''

        return de_AcctName

    def decrypt_Phone(self, row):
        # Declaring Local Variable
        de_Phone = ''

        # Capturing essential values
        EncryptMode = self.DecryptMode
        lkp_phone = row['Phone']
        str_phone = str(lkp_phone)
        fil_phone = str_phone.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_phone + '","dataTemplate":"subGrPhone"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_phone)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            de_Phone = x.getResponse(EncryptMode)
        else:
            de_Phone = ''

        return de_Phone

    def decrypt_Email(self, row):
        # Declaring Local Variable
        de_Email = ''

        # Capturing essential values
        EncryptMode = self.DecryptMode
        lkp_email = row['Email']
        str_email = str(lkp_email)
        fil_email = str_email.strip()

        # Forming JSON String for this field
        json_source_str = '{"dataGroup":"GrDet","data":"' + fil_email + '","dataTemplate":"subGrEmail"}'

        # Identifying Length of the field
        len_acct_nbr = len(fil_email)

        # This will trigger the service if it has valid data
        if len_acct_nbr > 0:
            x = cw.clsWeb(json_source_str)
            de_Email = x.getResponse(EncryptMode)
        else:
            de_Email = ''

        return de_Email

    def getEncrypt(self, df_dict):
        try:
            df_input = p.DataFrame()
            df_fin = p.DataFrame()

            # Assigning Target File Basic Name
            for k, v in df_dict.items():
                Thread_Name = k
                df_input = v

            # Checking total count of rows
            count_row = int(df_input.shape[0])
            # print('Part number of records to process:: ', count_row)

            if count_row > 0:

                # Deriving rows
                df_input['Encrypt_Acct_Nbr'] = df_input.apply(lambda row: self.encrypt_acctNbr(row), axis=1)
                df_input['Encrypt_Name'] = df_input.apply(lambda row: self.encrypt_Name(row), axis=1)
                df_input['Encrypt_Phone'] = df_input.apply(lambda row: self.encrypt_Phone(row), axis=1)
                df_input['Encrypt_Email'] = df_input.apply(lambda row: self.encrypt_Email(row), axis=1)

                # Dropping original columns
                df_input.drop(['Acct_Nbr', 'Name', 'Phone', 'Email'], axis=1, inplace=True)

                # Renaming new columns with the old column names
                df_input.rename(columns={'Encrypt_Acct_Nbr':'Acct_Nbr'}, inplace=True)
                df_input.rename(columns={'Encrypt_Name': 'Name'}, inplace=True)
                df_input.rename(columns={'Encrypt_Phone': 'Phone'}, inplace=True)
                df_input.rename(columns={'Encrypt_Email': 'Email'}, inplace=True)

                # New Column List Orders
                column_order = ['Acct_Nbr', 'Name', 'Acct_Addr_1', 'Acct_Addr_2', 'Phone', 'Email', 'Serial_No']
                df_fin = df_input.reindex(column_order, axis=1)

                fin_dict[Thread_Name] = df_fin

            return 0
        except Exception as e:
            df_error = p.DataFrame({'Acct_Nbr':str(e), 'Name':'', 'Acct_Addr_1':'', 'Acct_Addr_2':'', 'Phone':'', 'Email':'', 'Serial_No':''})
            fin_dict[Thread_Name] = df_error

            return 1

    def getEncryptWQ(self):
        item_dict = {}
        item = ''

        while True:
            try:
                #item_dict = q.get()
                item_dict = q.get_nowait()

                for k, v in item_dict.items():
                    # Assigning Target File Basic Name
                    item = str(k)

                if ((item == 'TEND') | (item == '')):
                    break

                if ((item != 'TEND') | (item != '')):
                    self.getEncrypt(item_dict)

                q.task_done()
            except Exception:
                break

    def getEncryptParallel(self, df_payload):
        start_pos = 0
        end_pos = 0
        l_dict = {}
        c_dict = {}
        min_val_list = {}
        cnt = 0
        num_worker_threads = self.num_worker_threads
        split_df = p.DataFrame()
        df_ret = p.DataFrame()

        # Assigning Target File Basic Name
        df_input = df_payload

        # Checking total count of rows
        count_row = df_input.shape[0]
        print('Total number of records to process:: ', count_row)

        interval = int(count_row / num_worker_threads) + 1
        actual_worker_task = int(count_row / interval) + 1

        for i in range(actual_worker_task):
            t = threading.Thread(target=self.getEncryptWQ)
            t.start()
            threads.append(t)
            name = str(t.getName())

            if ((start_pos + interval) < count_row):
                end_pos = start_pos + interval
            else:
                end_pos = start_pos + (count_row - start_pos)

            split_df = df_input.iloc[start_pos:end_pos]
            l_dict[name] = split_df

            if ((start_pos > count_row) | (start_pos == count_row)):
                break
            else:
                start_pos = start_pos + interval

            q.put(l_dict)
            cnt += 1

        # block until all tasks are done
        q.join()

        # stop workers
        for i in range(actual_worker_task):
            c_dict['TEND'] = p.DataFrame()
            q.put(c_dict)

        for t in threads:
            t.join()

        for k, v in fin_dict.items():
            min_val_list[int(k.replace('Thread-',''))] = v

        min_val = min(min_val_list, key=int)

        for k, v in sorted(fin_dict.items(), key=lambda k:int(k[0].replace('Thread-',''))):
            if int(k.replace('Thread-','')) == min_val:
                df_ret = fin_dict[k]
            else:
                d_frames = [df_ret, fin_dict[k]]
                df_ret = p.concat(d_frames)

        # Releasing Memory
        del[[split_df]]
        gc.collect()

        return df_ret

    def getDecrypt(self, df_encrypted_dict):
        try:
            df_input = p.DataFrame()
            df_fin = p.DataFrame()

            # Assigning Target File Basic Name
            for k, v in df_encrypted_dict.items():
                Thread_Name = k
                df_input = v

            # Checking total count of rows
            count_row = int(df_input.shape[0])

            if count_row > 0:

                # Deriving rows
                df_input['Decrypt_Acct_Nbr'] = df_input.apply(lambda row: self.decrypt_acctNbr(row), axis=1)
                df_input['Decrypt_Name'] = df_input.apply(lambda row: self.decrypt_Name(row), axis=1)
                df_input['Decrypt_Phone'] = df_input.apply(lambda row: self.decrypt_Phone(row), axis=1)
                df_input['Decrypt_Email'] = df_input.apply(lambda row: self.decrypt_Email(row), axis=1)

                # Dropping original columns
                df_input.drop(['Acct_Nbr', 'Name', 'Phone', 'Email'], axis=1, inplace=True)

                # Renaming new columns with the old column names
                df_input.rename(columns={'Decrypt_Acct_Nbr':'Acct_Nbr'}, inplace=True)
                df_input.rename(columns={'Decrypt_Name': 'Name'}, inplace=True)
                df_input.rename(columns={'Decrypt_Phone': 'Phone'}, inplace=True)
                df_input.rename(columns={'Decrypt_Email': 'Email'}, inplace=True)

                # New Column List Orders
                column_order = ['Acct_Nbr', 'Name', 'Acct_Addr_1', 'Acct_Addr_2', 'Phone', 'Email']
                df_fin = df_input.reindex(column_order, axis=1)

                fin_dict_1[Thread_Name] = df_fin

            return 0

        except Exception as e:
            df_error = p.DataFrame({'Acct_Nbr': str(e), 'Name': '', 'Acct_Addr_1': '', 'Acct_Addr_2': '', 'Phone': '', 'Email': ''})
            fin_dict_1[Thread_Name] = df_error

            return 1

    def getDecryptWQ(self):
        item_dict = {}
        item = ''

        while True:
            try:
                #item_dict = q.get()
                item_dict = m.get_nowait()

                for k, v in item_dict.items():
                    # Assigning Target File Basic Name
                    item = str(k)

                if ((item == 'TEND') | (item == '')):
                    return True
                    #break

                if ((item != 'TEND') | (item != '')):
                    self.getDecrypt(item_dict)

                m.task_done()
            except Exception:
                break


    def getDecryptParallel(self, df_payload):
        start_pos = 0
        end_pos = 0
        l_dict_1 = {}
        c_dict_1 = {}
        cnt = 0
        num_worker_threads = self.num_worker_threads
        split_df = p.DataFrame()
        df_ret_1 = p.DataFrame()

        min_val_list = {}

        # Assigning Target File Basic Name
        df_input_1 = df_payload

        # Checking total count of rows
        count_row = df_input_1.shape[0]
        print('Total number of records to process:: ', count_row)

        interval = int(count_row / num_worker_threads) + 1
        actual_worker_task = int(count_row / interval) + 1

        for i in range(actual_worker_task):
            t_1 = threading.Thread(target=self.getDecryptWQ)
            t_1.start()
            threads.append(t_1)
            name = str(t_1.getName())

            if ((start_pos + interval) < count_row):
                end_pos = start_pos + interval
            else:
                end_pos = start_pos + (count_row - start_pos)

            split_df = df_input_1.iloc[start_pos:end_pos]
            l_dict_1[name] = split_df

            if ((start_pos > count_row) | (start_pos == count_row)):
                break
            else:
                start_pos = start_pos + interval

            m.put(l_dict_1)
            cnt += 1

        # block until all tasks are done
        m.join()

        # stop workers
        for i in range(actual_worker_task):
            c_dict_1['TEND'] = p.DataFrame()
            m.put(c_dict_1)

        for t_1 in threads:
            t_1.join()

        for k, v in fin_dict_1.items():
            min_val_list[int(k.replace('Thread-',''))] = v

        min_val = min(min_val_list, key=int)

        for k, v in sorted(fin_dict_1.items(), key=lambda k:int(k[0].replace('Thread-',''))):
            if int(k.replace('Thread-','')) == min_val:
                df_ret_1 = fin_dict_1[k]
            else:
                d_frames = [df_ret_1, fin_dict_1[k]]
                df_ret_1 = p.concat(d_frames)

        # Releasing Memory
        del[[split_df]]
        gc.collect()

        return df_ret_1

Let’s explain the key snippet from the code. For your information, we’re not going to describe all the encryption methods such as –

# Encryption Method
encrypt_acctNbr

encrypt_Name
encrypt_Phone
encrypt_Email

# Decryption Method
decrypt_acctNbr
decrypt_Name
decrypt_Phone
decrypt_Email

As we’ve already described the logic of these methods in our previous post.

# Checking total count of rows
count_row = df_input.shape[0]
print('Total number of records to process:: ', count_row)

interval = int(count_row / num_worker_threads) + 1
actual_worker_task = int(count_row / interval) + 1

Fetching the total number of rows from the dataframe. Based on the row count, the application will derive the actual number of threads that will be used for parallelism.

for i in range(actual_worker_task):
    t = threading.Thread(target=self.getEncryptWQ)
    t.start()
    threads.append(t)
    name = str(t.getName())

    if ((start_pos + interval) < count_row):
        end_pos = start_pos + interval
    else:
        end_pos = start_pos + (count_row - start_pos)

    split_df = df_input.iloc[start_pos:end_pos]
    l_dict[name] = split_df

    if ((start_pos > count_row) | (start_pos == count_row)):
        break
    else:
        start_pos = start_pos + interval

    q.put(l_dict)
    cnt += 1

Here, the application is splitting the data into multiple groups of smaller data packs & then combining them into (key, value) dictionary & finally placed them into the individual queue.

# block until all tasks are done
q.join()

This will join the queue process. This will ensure that queues are free after consuming the data.

# stop workers
for i in range(actual_worker_task):
    c_dict['TEND'] = p.DataFrame()
    q.put(c_dict)

for t in threads:
    t.join()

The above lines are essential. As this will help the process to identify that no more data are left to send at the queue. And, the main thread will wait until all the threads are done.

for k, v in fin_dict.items():
    min_val_list[int(k.replace('Thread-',''))] = v

min_val = min(min_val_list, key=int)

Once, all the jobs are done. The application will find the minimum thread value & based on that we can sequence all the data chunks as explained in our previous image & finally clubbed them together to form the complete csv.

for k, v in sorted(fin_dict.items(), key=lambda k:int(k[0].replace('Thread-',''))):
    if int(k.replace('Thread-','')) == min_val:
        df_ret = fin_dict[k]
    else:
        d_frames = [df_ret, fin_dict[k]]
        df_ret = p.concat(d_frames)

As already explained, using the starting point of our data dictionary element, the application is clubbing the data back to the main csv.

Next method, which we’ll be explaining is –

getEncryptWQ

Please find the key lines –

while True:
    try:
        #item_dict = q.get()
        item_dict = q.get_nowait()

        for k, v in item_dict.items():
            # Assigning Target File Basic Name
            item = str(k)

        if ((item == 'TEND') | (item == '')):
            break

        if ((item != 'TEND') | (item != '')):
            self.getEncrypt(item_dict)

        q.task_done()
    except Exception:
        break

This method will consume the data & processing it for encryption or decryption. This will continue to do the work until or unless it receives the key value as TEND or the queue is empty.

Let’s compare the statistics between Windows & MAC.

Let’s see the file structure first –

Windows (16 GB – Core 2) Vs Mac (10 GB – Core 2):

Win_Vs_MAC

Windows (16 GB – Core 2):

Performance_Stats_Windows

Mac (10 GB – Core 2):

Performance_Stats_MAC

Find the complete directory from both the machine.
Windows (16 GB – Core 2):

Win_Files

Mac (10 GB – Core 2):

MAC_Files

Here is the final output –

Sample_OUTPut

So, we’ve achieved our target goal.

Let me know – how do you like this post. Please share your suggestion & comments.

I’ll be back with another installment from the Python verse.

Till then – Happy Avenging!

Pandas, Numpy, JSON & SSL (Crossover of Space Stone & Reality Stone in Python Verse)

In our last installment, we’ve shown pandas & numpy based on a specific situation. If that is our Space Stone installment of Python Verse, then this would be one approach of creating much interesting crossover of Space Stone & Reality Stone of Python verse. Yes. You are right. We’ll be discussing one requirement, where we need many of these in a single task.

Let’s dive into it!

Let’s assume that we have a source csv file which has the following data –

src_csv

Now, the requirement is – we need to use one third party web service to send JSON payload preparing with this data & send them to the 3rd party API to get the City, State & based on that we need to find the total number of item sold against each State & City.

requirement_data

Let’s look into our third-party API site  –

Please find the third-party API Link

website_api

As per the agreement with this website, any developer can test 10 calls per day free. After that, it will send your response with encrypted values, e.g. Classified. But, we don’t need more than 10 calls to test it.

Here, we’ll be dealing with the 4 python scripts. Among them, one scenario I’ve already described in my previous post. So, I’ll be just mentioning the file & post the script.

Please find the directory structure in both the OS –

directory_win_mac

1. clsLpy (This script will create the split csv files or final merge file after the corresponding process. However, this can be used as usual verbose debug logging as well. Hence, the name comes into the picture.)

 

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 20-Jan-2019       ########
###########################################
import pandas as p
import os
import platform as pl
from clsParam import clsParam as cf

class clsL(object):
    def __init__(self):
        self.path = cf.config['PATH']

    def logr(self, Filename, Ind, df, subdir=None):
        try:
            x = p.DataFrame()
            x = df
            sd = subdir

            os_det = pl.system()

            if sd == None:
                if os_det == "Windows":
                    fullFileName = self.path + '\\' + Filename
                else:
                    fullFileName = self.path + '/' + Filename
            else:
                if os_det == "Windows":
                    fullFileName = self.path + '\\' + sd + "\\" + Filename
                else:
                    fullFileName = self.path + '/' + sd + "/" + Filename

            if Ind == 'Y':
                x.to_csv(fullFileName, index=False)

            return 0

        except Exception as e:
            y = str(e)
            print(y)
            return 3

 

2. clsParam.py (This script contains the parameter entries in the form of dictionary & later this can be used in all the relevant python scripts as configuration parameters.)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 20-Jan-2019       ########
###########################################

import os

class clsParam(object):

    config = {
        'MAX_RETRY' : 5,
        'API_KEY' : 'HtWTVS86n8xoGXahyg1tPYH0HwngPqH2YFICzRCtlLbCtfNdya8L1UwRvH90PeMF',
        'PATH' : os.path.dirname(os.path.realpath(__file__)),
        'SUBDIR' : 'data'
    }

As you can see from this script that we’ve declared all the necessary parameters here as dictionary object & later we’ll be referring these parameters in the corresponding python scripts.

'API_KEY' : 'HtWTVS86n8xoGXahyg1tPYH0HwngPqH2YFICzRCtlLbCtfNdya8L1UwRvH90PeMF'

One crucial line, we’ll look into. API_KEY will be used while sending the JSON payload to the third-party web service. We’ll get this API_KEY from the highlighted (In Yellow) picture posted above.

3. clsWeb.py (This is the main script, which will first convert the pandas’ data frames into JSON & and send the API request as per the third party site.  It will capture the response & convert that by normalizing the data & poured it back to the data frame for further process.)

 

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
###########################################
#### Written By: SATYAKI DE        ########
#### Written On: 20-Jan-2019       ########
###########################################

import json
import requests
import datetime
import time
import ssl
from urllib.request import urlopen
import pandas as p
import numpy as np
import os
import gc
from clsParam import clsParam as cp

class clsWeb(object):
    def __init__(self, payload, format, unit):
        self.payload = payload
        self.path = cp.config['PATH']
        # To disable logging info
        self.max_retries = cp.config['MAX_RETRY']
        self.api_key = cp.config['API_KEY']
        self.unit = unit
        self.format =format

    def get_response(self):
        # Assigning Logging Info
        max_retries = self.max_retries
        api_key = self.api_key
        unit = self.unit
        format = self.format
        df_conv = p.DataFrame()
        cnt = 0

        try:
            # Bypassing SSL Authentication
            try:
                _create_unverified_https_context = ssl._create_unverified_context
            except AttributeError:
                # Legacy python that doesn't verify HTTPS certificates by default
                pass
            else:
                # Handle target environment that doesn't support HTTPS verification
                ssl._create_default_https_context = _create_unverified_https_context

            # Capturing the payload
            data_df = self.payload
            temp_df = data_df[['zipcode']]

            list_of_rec = temp_df['zipcode'].values.tolist()

            print(list_of_rec)

            for i in list_of_rec:
                zip = i

                # Providing the url
                url_part = 'http://www.zipcodeapi.com/rest/'
                url = url_part + api_key + '/' + 'info.' + format + '/' + str(zip) + '/' + unit

                headers = {"Content-type": "application/json"}
                param = headers

                var1 = datetime.datetime.now().strftime("%H:%M:%S")
                print('Json Fetch Start Time:', var1)

                retries = 1
                success = False

                while not success:
                    # Getting response from web service
                    response = requests.get(url, params=param, verify=False)
                    # print("Complete Error:: ", str(response.status_code))
                    # print("Error First::", str(response.status_code)[:1])

                    if str(response.status_code)[:1] == '2':
                        # response = s.post(url, params=param, json=json_data, verify=False)
                        success=True
                    else:
                        wait = retries * 2
                        print("Retry fails! Waiting " + str(wait) + " seconds and retrying.")
                        time.sleep(wait)
                        retries += 1

                    # Checking Maximum Retries
                    if retries == max_retries:
                        success=True
                        raise ValueError

                # print(response.text)

                var2 = datetime.datetime.now().strftime("%H:%M:%S")
                print('Json Fetch End Time:', var2)

                print("-" * 90)

                # Capturing the response json from Web Service
                df_response_json = response.text
                string_to_json = json.loads(df_response_json)

                # Converting the response json to Dataframe
                # df_Int_Rec = p.read_json(string_to_json, orient='records')
                df_Int_Rec = p.io.json.json_normalize(string_to_json)
                df_Int_Rec.columns = df_Int_Rec.columns.map(lambda x: x.split(".")[-1])

                if cnt == 0:
                    df_conv = df_Int_Rec
                else:
                    d_frames = [df_conv, df_Int_Rec]
                    df_conv = p.concat(d_frames)

                cnt += 1

            # Deleting temporary dataframes & Releasing memories
            del [[df_Int_Rec]]
            gc.collect()

            # Resetting the Index Value
            df_conv.reset_index(drop=True, inplace=True)

            # Merging two data side ways maintaining the orders
            df_add = p.concat([data_df, df_conv], axis=1)

            del [[df_conv]]
            gc.collect()

            # Dropping unwanted column
            df_add.drop(['acceptable_city_names'], axis=1, inplace=True)

            return df_add

        except ValueError as v:
            print(response.text)
            x = str(v)
            print(x)

            # Return Empty Dataframe
            df = p.DataFrame()
            return df

        except Exception as e:
            print(response.text)
            x = str(e)
            print(x)

            # Return Empty Dataframe
            df = p.DataFrame()
            return df

Let’s look at the key lines to discuss –

def __init__(self, payload, format, unit):

    self.payload = payload
    self.path = cp.config['PATH']

    # To disable logging info
    self.max_retries = cp.config['MAX_RETRY']
    self.api_key = cp.config['API_KEY']
    self.unit = unit
    self.format = format

The first block will be instantiated as soon as you are invoking the class. Note that, we’ve used our parameter class python script here as cp & then we’re referring the corresponding elements as & when requires. Other parameters will be captured from the invoking script, which we’ll be discussed later in this post.

# Bypassing SSL Authentication
try:
    _create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
    # Legacy python that doesn't verify HTTPS certificates by default
    pass
else:
    # Handle target environment that doesn't support HTTPS verification
    ssl._create_default_https_context = _create_unverified_https_context

Sometimes, Your Firewall or Proxy might block your web service request due to a specific certificate error. This snippet will bypass that authentication. However, it is always advised to use proper SSL certification in the Production environment.

# Capturing the payload
data_df = self.payload
temp_df = data_df[['zipcode']]

list_of_rec = temp_df['zipcode'].values.tolist()

In this snippet, we’re capturing the zip code from our source data frame & converting them into a list & this would be our candidate to pass the data as part of our JSON payload.

for i in list_of_rec:
    zip = i

    # Providing the url
    url_part = 'http://www.zipcodeapi.com/rest/'
    url = url_part + api_key + '/' + 'info.' + format + '/' + str(zip) + '/' + unit

    headers = {"Content-type": "application/json"}
    param = headers

Once, we’ve extracted our zip codes, we’re passing it one-by-one & forming our JSON with header & data.

retries = 1
success = False

while not success:
    # Getting response from web service
    response = requests.get(url, params=param, verify=False)
    # print("Complete Error:: ", str(response.status_code))
    # print("Error First::", str(response.status_code)[:1])

    if str(response.status_code)[:1] == '2':
        # response = s.post(url, params=param, json=json_data, verify=False)
        success=True
    else:
        wait = retries * 2
        print("Retry fails! Waiting " + str(wait) + " seconds and retrying.")
        time.sleep(wait)
        retries += 1

    # Checking Maximum Retries
    if retries == max_retries:
        success=True
        raise ValueError

In this section, we’re posting our JSON application & waiting for the response from the third-party API. If we receive the success response (200), we will proceed with the next zip code. However, if we didn’t receive the success response, we’ll retry the post option again until or unless it reaches the maximum limits. In case, if the application still waiting for a valid answer even after the maximum limit, it will exit from the loop & raise an error to the main application.

# Capturing the response json from Web Service
df_response_json = response.text
string_to_json = json.loads(df_response_json)

# Converting the response json to Dataframe
# df_Int_Rec = p.read_json(string_to_json, orient='records')
df_Int_Rec = p.io.json.json_normalize(string_to_json)
df_Int_Rec.columns = df_Int_Rec.columns.map(lambda x: x.split(".")[-1])

This snippet will extract the desired response from the API & convert that back to the Pandas data frame. Last two lines, it is normalizing the data that it has received from the API for further process. This is critical steps as these steps will lead to extract City & State from our API response.

# Merging two data side ways maintaining the orders
df_add = p.concat([data_df, df_conv], axis=1)

Once, we’ll have structured data – we can merge it back to our source data frame for our next step.

4. callWebservice.py (This script will call the API script & also process the data to create an aggregate report for our task.)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#####################################################
### Objective: Purpose of this Library is to call ###
### the Web Service method to capture the city,   ###
### & the state as a json response & update them  ###
### in the dataframe & finally produce the summary###
### of Total Sales & Item Counts based on the City###
### & the State.                                  ###
###                                               ###
### Arguments are as follows:                     ###
### Mentioned the Exception Part. First time dry  ###
### run the program without providing any args.   ###
### It will show all the mandatory params.        ###
###                                               ###
#####################################################
#####################################################
#### Written By: SATYAKI DE                       ###
#### Written On: 20-Jan-2019                      ###
#####################################################

import clsWeb as cw
import sys
import pandas as p
import os
import platform as pl
import clsLog as log
import datetime
import numpy as np
from clsParam import clsParam as cp

# Disbling Warnings
def warn(*args, **kwargs):
    pass
import warnings
warnings.warn = warn

def main():
    print("Calling the custom Package..")

    try:
        if len(sys.argv) == 4:
            inputFile = str(sys.argv[1])
            format = str(sys.argv[2])
            unit = str(sys.argv[3])
        else:
            raise Exception

        # Checking whether the format contains
        # allowable choices or not
        if (format == 'JSON'):
            format = 'json'
        elif (format == 'CSV'):
            format = 'csv'
        elif (format == 'XML'):
            format = 'xml'
        else:
            raise Exception

        # Checking whether the format contains
        # allowable choices or not
        if (unit == 'DEGREE'):
            unit = 'degree'
        elif (unit == 'RADIANS'):
            unit = 'radians'
        else:
            raise Exception

        print("*" * 170)
        print("Reading from " + str(inputFile))
        print("*" * 170)

        # Initiating Logging Instances
        clog = log.clsLog()

        path = cp.config['PATH']
        subdir = cp.config['SUBDIR']

        os_det = pl.system()

        if os_det == "Windows":
            src_path = path + '\\' + 'data\\'
        else:
            src_path = path + '/' + 'data/'

        # Reading source data csv file
        df_Payload = p.read_csv(src_path+inputFile, index_col=False, skipinitialspace=True)

        x = cw.clsWeb(df_Payload, format, unit)
        retDf = x.get_response()

        # Total Number of rows fetched
        count_row = retDf.shape[0]

        if count_row == 0:
            print("Data Processing Issue!")
        else:
            print("Writing to file -> (" + str(inputFile) + "_modified.csv) Status: Success")

        FileName, FileExtn = inputFile.split(".")

        # Writing to the file
        clog.logr(FileName + '_modified.' + FileExtn, 'Y', retDf, subdir)
        print("*" * 170)

        # Performing group by operation to get the desired result
        # State & City-wise total Sales & Item Sales
        df_src = p.DataFrame()
        df_src = retDf[['city', 'state', 'total', 'item_count']]

        # Converting values to Integer
        df_src['city_1'] = retDf['city'].astype(str)
        df_src['state_1'] = retDf['state'].astype(str)
        df_src['total_1'] = retDf['total'].astype(int)
        df_src['item_count_1'] = retDf['item_count'].astype(int)

        # Dropping the old Dtype Columns
        df_src.drop(['city'], axis=1, inplace=True)
        df_src.drop(['state'], axis=1, inplace=True)
        df_src.drop(['total'], axis=1, inplace=True)
        df_src.drop(['item_count'], axis=1, inplace=True)

        # Renaming the new columns to as per Old Column Name
        df_src.rename(columns={'city_1': 'city'}, inplace=True)
        df_src.rename(columns={'state_1': 'state'}, inplace=True)
        df_src.rename(columns={'total_1': 'total'}, inplace=True)
        df_src.rename(columns={'item_count_1': 'item_count'}, inplace=True)

        # Performing Group By Operation
        grouped = df_src.groupby(['state', 'city'])
        res_1 = grouped.aggregate(np.sum)

        print("DF:")
        print(res_1)

        FileName1 = 'StateCityWiseReport'
        # Handling Multiple source files
        var = datetime.datetime.now().strftime(".%H.%M.%S")
        print('Target File Extension will contain the following:: ', var)

        # Writing to the file
        clog.logr(FileName1 + var + '.' + FileExtn, 'Y', df_src, subdir)

        print("*" * 170)
        print("Operation done for " + str(inputFile) + "!")
        print("*" * 170)
    except Exception as e:
        x = str(e)
        print(x)
        print("*" * 170)
        print('Current order would be - <' + str(sys.argv[0]) + '> <Csv File Name> <JSON/CSV/XML> <DEGREE/RADIANS>')
        print('Make sure last two params should be in CAPS only!')
        print("*" * 170)

if __name__ == "__main__":
    main()

Let’s look at some vital code snippet in this main script –

# Reading source data csv file
df_Payload = p.read_csv(src_path+inputFile, index_col=False, skipinitialspace=True)

x = cw.clsWeb(df_Payload, format, unit)
retDf = x.get_response()

In this snippet, we’re getting our data from our source csv & then calling our leading Web API service to get the State & City information.

# Converting values to Integer
df_src['city_1'] = retDf['city'].astype(str)
df_src['state_1'] = retDf['state'].astype(str)
df_src['total_1'] = retDf['total'].astype(int)
df_src['item_count_1'] = retDf['item_count'].astype(int)

Converting individual data type to appropriate data types. In Pandas, it is always advisable to change the data type of frames to avoid unforeseen scenarios.

# Dropping the old Dtype Columns
df_src.drop(['city'], axis=1, inplace=True)
df_src.drop(['state'], axis=1, inplace=True)
df_src.drop(['total'], axis=1, inplace=True)
df_src.drop(['item_count'], axis=1, inplace=True)

# Renaming the new columns to as per Old Column Name
df_src.rename(columns={'city_1': 'city'}, inplace=True)
df_src.rename(columns={'state_1': 'state'}, inplace=True)
df_src.rename(columns={'total_1': 'total'}, inplace=True)
df_src.rename(columns={'item_count_1': 'item_count'}, inplace=True)

Now, dropping the old columns & renaming the new columns to get the same column with correct data types. I personally like this way as it is an immaculate way to do this task. You can also debug it easily.

# Performing Group By Operation
grouped = df_src.groupby(['state', 'city'])
res_1 = grouped.aggregate(np.sum)

And, finally, using Pandas group-by method we’re aggregating the groups & then using numpy to generate the same against each group.

Please check the first consolidated output –

consolidate_data_from_web_api

From this screenshot, you can see how we have the desired intermediate data of City & State to proceed for the next level.

Let’s see how it runs –

Windows (64 bit):

run_windows

Mac (32 bit):

run_mac

So, from the screenshot, we can see our desired output & you can calculate the aggregated value based on our sample provided in the previous screenshot.

Let’s check how the data directory looks like after run –

Windows:

final_data_windows

MAC:

final_data_mac

So, finally, we’ve achieved our target.

I hope this will give you some more idea about more insights into the Python verse. Let me know – how do you think about this post.

Till then – Happy Avenging!