Exploring the new Polars library in Python

Today, I will present a handy Python package named “Polars,” which lets you run fairly complex SQL directly against your CSV files & data frames and can be extremely useful on many occasions.

This will be a short post, as I’m preparing something new on LLMs for the posts coming next month.

Why not view the demo before going through it?


Demo
pip install polars
pip install pandas

Let us understand the key class & snippets.

  • clsConfigClient.py (Key entries that will be discussed later)
################################################
#### Written By: SATYAKI DE                 ####
#### Written On:  15-May-2020               ####
#### Modified On: 28-Oct-2023               ####
####                                        ####
#### Objective: This script is a config     ####
#### file, contains all the keys for        ####
#### personal OpenAI-based MAC-shortcuts    ####
#### enable bot.                            ####
####                                        ####
################################################

import os
import platform as pl

class clsConfigClient(object):
    """Static configuration for the Polars demo.

    Everything is exposed as class attributes, so callers read values as
    ``clsConfigClient.conf['KEY']`` without creating an instance.
    """

    # Directory holding this config module; all directory entries hang off it.
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    # Choose the path separator from the operating system name.
    os_det = pl.system()
    sep = '\\' if os_det == "Windows" else '/'

    # Joining with a trailing '' element yields "<Curr_Path><sep><dir><sep>".
    conf = dict(
        APP_ID=1,
        ARCH_DIR=sep.join([Curr_Path, 'arch', '']),
        LOG_PATH=sep.join([Curr_Path, 'log', '']),
        DATA_PATH=sep.join([Curr_Path, 'data', '']),
        TEMP_PATH=sep.join([Curr_Path, 'temp', '']),
        OUTPUT_DIR='model',
        APP_DESC_1='Polars Demo!',
        DEBUG_IND='Y',
        INIT_PATH=Curr_Path,
        TITLE="Polars Demo!",
        PATH=Curr_Path,
        OUT_DIR='data',
        MERGED_FILE='mergedFile.csv',
        ACCT_FILE='AccountAddress.csv',
        ORDER_FILE='Orders.csv',
        CUSTOMER_FILE='CustomerDetails.csv',
        STATE_CITY_WISE_REPORT_FILE='StateCityWiseReport.csv',
    )
  • clsSQL.py (Main class file that contains how to use the SQL)
#####################################################
#### Written By: SATYAKI DE                      ####
#### Written On: 27-May-2023                     ####
#### Modified On 28-Oct-2023                     ####
####                                             ####
#### Objective: This is the main calling         ####
#### python class that will invoke the           ####
#### Polar class, which will enable SQL          ####
#### capabilitites.                              ####
####                                             ####
#####################################################

import polars as pl
import os
from clsConfigClient import clsConfigClient as cf
import pandas as p

###############################################
###           Global Section                ###
###############################################

# Disabling warnings
def warn(*args, **kwargs):
    """Accept any arguments and do nothing (silent replacement for a warning hook)."""
    return None

import warnings
warnings.warn = warn

###############################################
###    End of Global Section                ###
###############################################

class clsSQL:
    """Run the demo SQL statements against the CSV sources through Polars.

    File names and the data directory are taken from ``clsConfigClient.conf``
    when the object is created.
    """

    def __init__(self):
        self.acctFile = cf.conf['ACCT_FILE']
        self.orderFile = cf.conf['ORDER_FILE']
        self.stateWiseReport = cf.conf['STATE_CITY_WISE_REPORT_FILE']
        self.custFile = cf.conf['CUSTOMER_FILE']
        self.dataPath = cf.conf['DATA_PATH']

    def execSQL(self):
        """Execute both demo queries and print their results.

        Step 1 joins orderMaster, stateMaster & accountMaster; step 2 joins
        that result (registered as tempMaster) with customerMaster and adds
        the windowed aggregates.

        Returns:
            0 when both queries ran, 1 when any exception was raised (the
            exception text is printed, not re-raised).
        """
        try:
            # os.path.join works whether or not DATA_PATH ends with a
            # separator; plain '+' would silently build a wrong file name.
            fullAcctFile = os.path.join(self.dataPath, self.acctFile)
            fullOrderFile = os.path.join(self.dataPath, self.orderFile)
            fullStateWiseReportFile = os.path.join(self.dataPath, self.stateWiseReport)
            fullCustomerFile = os.path.join(self.dataPath, self.custFile)

            # Register the three CSV sources under the table names used in the SQL.
            ctx = pl.SQLContext(
                accountMaster=pl.scan_csv(fullAcctFile),
                orderMaster=pl.scan_csv(fullOrderFile),
                stateMaster=pl.scan_csv(fullStateWiseReportFile),
            )

            querySQL = """
            SELECT orderMaster.order_id,
            orderMaster.total,
            stateMaster.state,
            accountMaster.Acct_Nbr,
            accountMaster.Name,
            accountMaster.Email,
            accountMaster.user_id,
            COUNT(*) TotalCount
            FROM orderMaster
            JOIN stateMaster USING (city)
            JOIN accountMaster USING (user_id)
            ORDER BY stateMaster.state
            """

            res = ctx.execute(querySQL, eager=True)
            res_Pandas = res.to_pandas()

            print('Result:')
            print(res_Pandas)
            print(type(res_Pandas))

            # Second context: one table from CSV, one from the pandas frame
            # produced by the first query.
            ctx_1 = pl.SQLContext(
                customerMaster=pl.scan_csv(fullCustomerFile),
                tempMaster=pl.from_pandas(res_Pandas),
            )

            querySQL_1 = """
            SELECT tempMaster.order_id,
            tempMaster.total,
            tempMaster.state,
            tempMaster.Acct_Nbr,
            tempMaster.Name,
            tempMaster.Email,
            tempMaster.TotalCount,
            tempMaster.user_id,
            COUNT(*) OVER(PARTITION BY tempMaster.state ORDER BY tempMaster.state, tempMaster.Acct_Nbr) StateWiseCount,
            MAX(tempMaster.Acct_Nbr) OVER(PARTITION BY tempMaster.state ORDER BY tempMaster.state, tempMaster.Acct_Nbr) MaxAccountByState,
            MIN(tempMaster.Acct_Nbr) OVER(PARTITION BY tempMaster.state ORDER BY tempMaster.state, tempMaster.Acct_Nbr) MinAccountByState,
            CASE WHEN tempMaster.total < 70 THEN 'SILVER' ELSE 'GOLD' END CategoryStat,
            SUM(customerMaster.Balance) OVER(PARTITION BY tempMaster.state) SumBalance
            FROM tempMaster
            JOIN customerMaster USING (user_id)
            ORDER BY tempMaster.state
            """

            res_1 = ctx_1.execute(querySQL_1, eager=True)

            finDF = res_1.to_pandas()

            print('Result 2:')
            print(finDF)

            return 0
        except Exception as e:
            # Best-effort demo: report the failure and signal it via the return code.
            print('Error: ', str(e))

            return 1

If we go through some of the key lines, we will understand how this entire package works.

But, before that, let us understand the source data –

Let us understand the steps –

  1. Join orderMaster, stateMaster & accountMaster and fetch the selected attributes. Store this in a temporary data frame named tempMaster.
  2. Join tempMaster & customerMaster and fetch the relevant attributes with some more aggregation, which is required for the business KPIs.
ctx = pl.SQLContext(accountMaster = pl.scan_csv(fullAcctFile),
orderMaster = pl.scan_csv(fullOrderFile),
stateMaster = pl.scan_csv(fullStateWiseReportFile))

The above method will create three temporary tables by reading the source files – AccountAddress.csv, Orders.csv & StateCityWiseReport.csv.

And, let us understand the supported SQLs –

SELECT  orderMaster.order_id,
        orderMaster.total,
        stateMaster.state,
        accountMaster.Acct_Nbr,
        accountMaster.Name,
        accountMaster.Email,
        accountMaster.user_id,
        COUNT(*) TotalCount
FROM orderMaster
JOIN stateMaster USING (city)
JOIN accountMaster USING (user_id)
ORDER BY stateMaster.state

In this step, we’re going to store the output of the above query in a temporary data frame named tempMaster.

Since the result is a Polars data frame, we’re converting it to a pandas data frame.

res_Pandas = res.to_pandas()

Finally, let us understand the next part –

ctx_1 = pl.SQLContext(customerMaster = pl.scan_csv(fullCustomerFile),
tempMaster=pl.from_pandas(res_Pandas))

In the above section, one source is getting populated from the CSV file, whereas the other source is feeding from a pandas data frame populated in the previous step.

Now, let us understand the SQL supported by this package, which is impressive –

SELECT  tempMaster.order_id,
        tempMaster.total,
        tempMaster.state,
        tempMaster.Acct_Nbr,
        tempMaster.Name,
        tempMaster.Email,
        tempMaster.TotalCount,
        tempMaster.user_id,
        COUNT(*) OVER(PARTITION BY tempMaster.state ORDER BY tempMaster.state, tempMaster.Acct_Nbr) StateWiseCount,
        MAX(tempMaster.Acct_Nbr) OVER(PARTITION BY tempMaster.state ORDER BY tempMaster.state, tempMaster.Acct_Nbr) MaxAccountByState,
        MIN(tempMaster.Acct_Nbr) OVER(PARTITION BY tempMaster.state ORDER BY tempMaster.state, tempMaster.Acct_Nbr) MinAccountByState,
        CASE WHEN tempMaster.total < 70 THEN 'SILVER' ELSE 'GOLD' END CategoryStat,
        SUM(customerMaster.Balance) OVER(PARTITION BY tempMaster.state) SumBalance
FROM tempMaster
JOIN customerMaster USING (user_id)
ORDER BY tempMaster.state

As you can see, it supports advanced analytic SQL, including window functions with partitions & CASE statements.

The only problem is that COUNT(*) with the partition clause is not working as expected. I’m not sure whether that is related to a version issue.

COUNT(*) OVER(PARTITION BY tempMaster.state ORDER BY tempMaster.state, tempMaster.Acct_Nbr) StateWiseCount

I’m trying to get more information on this. Except for this statement, everything works perfectly.

  • 1_testSQL.py (Main calling script that invokes the clsSQL class)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Oct-2023                         ####
####                                                 ####
#### Objective: This is the main class that invokes  ####
#### advanced analytic SQL in python.                ####
####                                                 ####
#########################################################

from clsConfigClient import clsConfigClient as cf
import clsL as log
import clsSQL as ccl

from datetime import datetime, timedelta

# Disabling warnings
def warn(*args, **kwargs):
    """Accept any arguments and do nothing (silent replacement for a warning hook)."""
    return None

import warnings
warnings.warn = warn

###############################################
###           Global Section                ###
###############################################

#Initiating Logging Instances
clog = log.clsL()
cl = ccl.clsSQL()

var = datetime.now().strftime(".%H.%M.%S")

documents = []

###############################################
###    End of Global Section                ###
###############################################
def main():
    """Run the Polars SQL demo and print a start/end banner around it.

    Calls ``cl.execSQL()`` and reports success when it returns 0, failure
    otherwise. Any exception is printed rather than propagated.
    """
    try:
        banner = '*' * 120

        var = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print(banner)
        print('Start Time: ' + str(var))
        print(banner)

        r1 = cl.execSQL()

        print()
        if r1 == 0:
            print('Successfully SQL-enabled!')
        else:
            # Message previously read "Failed to senable SQL!" (typo).
            print('Failed to enable SQL!')

        print(banner)
        var1 = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        print('Error: ', str(e))

if __name__ == '__main__':
    main()

The above script is extremely easy to understand & self-explanatory.

To learn more about this package, please visit the following link.


So, finally, we’ve done it. I know that this post is relatively smaller than my earlier post. But, I think, you can get a good hack to improve some of your long-running jobs by applying this trick.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging!  🙂

Enable OpenAI chatbot with the selected YouTube video content using LangChain, FAISS & YouTube data-API.

Today, I’m very excited to demonstrate an effortless & new way to extract the transcript from YouTube videos & then answer the questions based on the topics selected by the users. In this post, I plan to deal with the user inputs to consider the case first & then it can summarize the video content through useful advanced analytics with the help of the LangChain & OpenAI-based model.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.
Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Isn’t it very exciting? This will lead to a whole new ballgame, where one can get critical decision-making information from these human sources along with their traditional advanced analytical data.

How will it help?

Let’s say as per your historical data & analytics, the dashboard is recommending prod-A, prod-B & prod-C as the top three products for potential top-performing brands. Whereas, you are getting some alerts from the TV news on prod-B due to the recent incidents. So, in that case, you don’t want to continue with the prod-B investment. You may find a new product named prod-Z. That may reduce the risk of your investment.


What is LangChain?

LangChain is a framework for developing applications powered by language models. We believe that the most powerful and differentiated applications will not only call out to a language model but will also be:

  1. Data-aware: connect a language model to other sources of data
  2. Agentic: allow a language model to interact with its environment

The LangChain framework works around these principles.

To know more about this, please click the following link.

As you can see, this is one of the critical components in our solution, which will bind the OpenAI bot & it will feed the necessary data to provide the correct response.


What is FAISS?

Faiss is a library for efficient similarity search and clustering of dense vectors. It contains algorithms that search in sets of vectors of any size, up to ones that do not fit in RAM. It also has supporting code for evaluation and parameter tuning.

Faiss is written in C++ with complete wrappers for Python. Some of its most useful algorithms are available on the GPU as well as the CPU. It is developed by Facebook AI Research.

To know more about this, please click the following link.


FLOW OF EVENTS:

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

Here are the steps that will follow in sequence –

  • The application will first get the topic on which it needs to look from YouTube & find the top 5 videos using the YouTube data-API.
  • Once the application returns a list of video URLs from the above step, LangChain will drive the application to extract the transcripts from the videos & then split them into smaller chunks to reduce the cost of the OpenAI calls. During this time, it will invoke FAISS to create document DBs.
  • Finally, it will send those chunks to OpenAI for the best response based on your supplied template that performs the final analysis with small data required for your query & gets the appropriate response with fewer costs.

CODE:

Why don’t we go through the code made accessible due to this new library for this particular use case?

  • clsConfigClient.py (This is the main calling Python script for the input parameters.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 28-May-2023 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### personal OpenAI-based video content ####
#### enable bot. ####
#### ####
################################################
import os
import platform as pl
class clsConfigClient(object):
    """Static configuration for the LangChain / YouTube demo.

    Everything is exposed as class attributes, so callers read values as
    ``clsConfigClient.conf['KEY']`` without creating an instance.
    """

    # Directory holding this config module; all directory entries hang off it.
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    # Choose the path separator from the operating system name.
    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'DATA_PATH': Curr_Path + sep + 'data' + sep,
        'MODEL_PATH': Curr_Path + sep + 'model' + sep,
        'TEMP_PATH': Curr_Path + sep + 'temp' + sep,
        'MODEL_DIR': 'model',
        'APP_DESC_1': 'LangChain Demo!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'FILE_NAME': 'Output.csv',
        'MODEL_NAME': 'gpt-3.5-turbo',
        # NOTE(review): API keys are embedded as literals. Confirm these are
        # placeholders; real keys should be supplied outside source control.
        'OPEN_AI_KEY': "sk-kfrjfijdrkidjkfjd9474nbfjfkfjfhfhf84i84hnfhjdbv6Bgvv",
        'YOUTUBE_KEY': "AIjfjfUYGe64hHJ-LOFO5u-mkso9pPOJGFU",
        'TITLE': "LangChain Demo!",
        'TEMP_VAL': 0.2,
        'PATH': Curr_Path,
        'MAX_CNT': 5,
        'OUT_DIR': 'data'
    }

Some of the key entries from the above scripts are as follows –

'MODEL_NAME': 'gpt-3.5-turbo',
'OPEN_AI_KEY': "sk-kfrjfijdrkidjkfjd9474nbfjfkfjfhfhf84i84hnfhjdbv6Bgvv",
'YOUTUBE_KEY': "AIjfjfUYGe64hHJ-LOFO5u-mkso9pPOJGFU",
'TEMP_VAL': 0.2,

From the above code snippet, one can understand that we need API keys for both YouTube & OpenAI. They have separate costs & usage, which I’ll share later in the post. Also, notice that the temperature is set to 0.2 (the range is between 0 and 1). That means our AI bot will be consistent in its responses. And our application will use the GPT-3.5-turbo model for its analytic response.

  • clsTemplate.py (Contains all the templates for OpenAI.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 27-May-2023 ####
#### Modified On: 28-May-2023 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the template for ####
#### OpenAI prompts to get the correct ####
#### response. ####
#### ####
################################################
# Template to use for the system message prompt.
# The {docs} placeholder is substituted with transcript text when the prompt
# is rendered; the remaining lines are instructions sent to the model as-is.
templateVal_1 = """
You are a helpful assistant that that can answer questions about youtube videos
based on the video's transcript: {docs}
Only use the factual information from the transcript to answer the question.
If you feel like you don't have enough information to answer the question, say "I don't know".
Your answers should be verbose and detailed.
"""

view raw

clsTemplate.py

hosted with ❤ by GitHub

The above code is self-explanatory. Here, we’re keeping the correct instructions for our OpenAI to respond within these guidelines.

  • clsVideoContentScrapper.py (Main class to extract the transcript from the YouTube videos & then answer the questions based on the topics selected by the users.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 27-May-2023 ####
#### Modified On 28-May-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python class that will invoke the ####
#### LangChain of package to extract ####
#### the transcript from the YouTube videos & ####
#### then answer the questions based on the ####
#### topics selected by the users. ####
#### ####
#####################################################
from langchain.document_loaders import YoutubeLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.chat_models import ChatOpenAI
from langchain.chains import LLMChain
from langchain.prompts.chat import (
ChatPromptTemplate,
SystemMessagePromptTemplate,
HumanMessagePromptTemplate,
)
from googleapiclient.discovery import build
import clsTemplate as ct
from clsConfigClient import clsConfigClient as cf
import os
###############################################
### Global Section ###
###############################################
open_ai_Key = cf.conf['OPEN_AI_KEY']
os.environ["OPENAI_API_KEY"] = open_ai_Key
embeddings = OpenAIEmbeddings(openai_api_key=open_ai_Key)
YouTube_Key = cf.conf['YOUTUBE_KEY']
youtube = build('youtube', 'v3', developerKey=YouTube_Key)
# Disabling warnings
def warn(*args, **kwargs):
    """Accept any arguments and do nothing (silent replacement for a warning hook)."""
    pass
import warnings
warnings.warn = warn
###############################################
### End of Global Section ###
###############################################
class clsVideoContentScrapper:
    """Find YouTube videos for a topic and answer a question from their transcripts.

    Model name, temperature and the maximum number of videos are read from
    ``clsConfigClient.conf`` when the object is created.
    """

    def __init__(self):
        self.model_name = cf.conf['MODEL_NAME']
        self.temp_val = cf.conf['TEMP_VAL']
        self.max_cnt = int(cf.conf['MAX_CNT'])

    def createDBFromYoutubeVideoUrl(self, video_url):
        """Build a FAISS document store from the transcript of one video.

        Args:
            video_url: URL of the YouTube video to load.

        Returns:
            The FAISS store on success, or an empty string when any step
            fails (the error text is printed).
        """
        try:
            loader = YoutubeLoader.from_youtube_url(video_url)
            transcript = loader.load()

            # Split into ~1000-character chunks with 100 characters of overlap.
            text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
            docs = text_splitter.split_documents(transcript)

            db = FAISS.from_documents(docs, embeddings)
            return db
        except Exception as e:
            print('Error: ', str(e))
            return ''

    def getResponseFromQuery(self, db, query, k=4):
        """Answer ``query`` from the ``k`` chunks in ``db`` most similar to it.

        gpt-3.5-turbo can handle up to 4097 tokens. Setting the chunksize to
        1000 and k to 4 maximizes the number of tokens to analyze.

        Args:
            db: Document store returned by ``createDBFromYoutubeVideoUrl``.
            query: The user's question.
            k: Number of similar chunks to retrieve.

        Returns:
            ``(response, docs)`` on success, or ``('', '')`` when any step
            fails (the error text is printed).
        """
        try:
            mod_name = self.model_name
            temp_val = self.temp_val

            docs = db.similarity_search(query, k=k)
            docs_page_content = " ".join([d.page_content for d in docs])

            chat = ChatOpenAI(model_name=mod_name, temperature=temp_val)

            # Template to use for the system message prompt
            template = ct.templateVal_1
            system_message_prompt = SystemMessagePromptTemplate.from_template(template)

            # Human question prompt
            human_template = "Answer the following question: {question}"
            human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)

            chat_prompt = ChatPromptTemplate.from_messages(
                [system_message_prompt, human_message_prompt]
            )

            chain = LLMChain(llm=chat, prompt=chat_prompt)

            response = chain.run(question=query, docs=docs_page_content)
            response = response.replace("\n", "")
            return response, docs
        except Exception as e:
            print('Error: ', str(e))
            return '', ''

    def topFiveURLFromYouTube(self, service, **kwargs):
        """Search YouTube and collect video URLs with their channel titles.

        Args:
            service: YouTube Data API client exposing ``search()`` and ``channels()``.
            **kwargs: Passed unchanged to ``service.search().list``.

        Returns:
            ``(video_urls, channel_list)``, two lists of equal length. Both
            are empty when any step fails (the error text is printed).
        """
        try:
            video_urls = []
            channel_list = []
            results = service.search().list(**kwargs).execute()

            for item in results['items']:
                print("Title: ", item['snippet']['title'])
                print("Description: ", item['snippet']['description'])
                channel = item['snippet']['channelId']
                print("Channel Id: ", channel)

                # Fetch the channel name using the channel ID
                channel_response = service.channels().list(part='snippet', id=channel).execute()
                channel_title = channel_response['items'][0]['snippet']['title']
                print("Channel Title: ", channel_title)
                channel_list.append(channel_title)

                print("Video Id: ", item['id']['videoId'])
                # The listing had an HTML-escaped quote (&quot;) here, which
                # left the string literal unterminated.
                vidURL = "https://www.youtube.com/watch?v=" + item['id']['videoId']
                print("Video URL: " + vidURL)
                video_urls.append(vidURL)
                print("\n")

            return video_urls, channel_list
        except Exception as e:
            print('Error: ', str(e))
            return [], []

    def extractContentInText(self, topic, query):
        """Answer ``query`` once per video found for ``topic``.

        Args:
            topic: Search phrase used to find videos.
            query: Question asked against each video's transcript.

        Returns:
            A list of answer strings, each prefixed with the channel it came
            from. Videos that produced no answer are skipped. The list is
            empty when an unexpected error occurs (the error text is printed).
        """
        try:
            discussedTopic = []
            max_cnt = self.max_cnt

            urlList, channelList = self.topFiveURLFromYouTube(
                youtube, q=topic, part='id,snippet', maxResults=max_cnt, type='video'
            )
            print('Returned List: ')
            print(urlList)
            print()

            # The two lists are filled in lockstep, so zip pairs each URL
            # with the channel it belongs to.
            for video_url, channel_title in zip(urlList, channelList):
                print('Processing Video: ')
                print(video_url)
                db = self.createDBFromYoutubeVideoUrl(video_url)

                response, docs = self.getResponseFromQuery(db, query)

                if len(response) > 0:
                    strKeyText = 'As per the topic discussed in ' + channel_title + ', '
                    discussedTopic.append(strKeyText + response)

            return discussedTopic
        except Exception as e:
            print('Error: ', str(e))
            return []

Let us understand the key methods step by step in detail –

def topFiveURLFromYouTube(self, service, **kwargs):
    """Search YouTube and collect video URLs with their channel titles.

    ``kwargs`` go unchanged to ``service.search().list``. Returns a pair of
    lists ``(video_urls, channel_titles)``; both are empty if anything fails,
    in which case the error text is printed.
    """
    found_urls, found_channels = [], []
    try:
        search_result = service.search().list(**kwargs).execute()

        for entry in search_result['items']:
            print("Title: ", entry['snippet']['title'])
            print("Description: ", entry['snippet']['description'])
            channel_id = entry['snippet']['channelId']
            print("Channel Id: ", channel_id)

            # Resolve the channel's display name from its ID.
            lookup = service.channels().list(part='snippet', id=channel_id).execute()
            channel_name = lookup['items'][0]['snippet']['title']
            print("Channel Title: ", channel_name)
            found_channels.append(channel_name)

            print("Video Id: ", entry['id']['videoId'])
            link = "https://www.youtube.com/watch?v=" + entry['id']['videoId']
            print("Video URL: " + link)
            found_urls.append(link)
            print("\n")
    except Exception as err:
        # Partial results are discarded on failure.
        print('Error: ', str(err))

        return [], []

    return found_urls, found_channels

The above code will fetch the most relevant YouTube URLs & bind them into a list along with the channel names & then share the lists with the main functions.

def createDBFromYoutubeVideoUrl(self, video_url):
    """Build a FAISS document store from the transcript of ``video_url``.

    Returns the store, or an empty string if any step raises (the error
    text is printed).
    """
    try:
        transcript = YoutubeLoader.from_youtube_url(video_url).load()

        # ~1000-character chunks with 100 characters shared between neighbours.
        splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
        chunks = splitter.split_documents(transcript)

        return FAISS.from_documents(chunks, embeddings)

    except Exception as err:
        print('Error: ', str(err))
        return ''

The provided Python code defines a function createDBFromYoutubeVideoUrl which appears to create a database of text documents from the transcript of a YouTube video. Here’s the explanation in simple English:

  1. The function createDBFromYoutubeVideoUrl is defined with one argument: video_url.
  2. The function uses a try-except block to handle any potential exceptions or errors that may occur.
  3. Inside the try block, the following steps are performed:
  • First, it creates a YoutubeLoader object from the provided video_url. This object is likely responsible for interacting with the YouTube video specified by the URL.
  • The loader object then loads the transcript of the video. This object is the text version of everything spoken in the video.
  • It then creates a RecursiveCharacterTextSplitter object with a specified chunk_size of 1000 and chunk_overlap of 100. This object splits the transcript into smaller chunks (documents) of text for easier processing or analysis. Each piece will be around 1000 characters long, and there will be an overlap of 100 characters between consecutive chunks.
  • The split_documents method of the text_splitter object will split the transcript into smaller documents. These documents are stored in the docs variable.
  • The FAISS.from_documents method is then called with docs and embeddings as arguments to create a FAISS (Facebook AI Similarity Search) index. This index is a database used for efficient similarity search and clustering of high-dimensional vectors, which in this case, are the embeddings of the documents. The FAISS index is stored in the db variable.
  • Finally, the db variable is returned, representing the created database from the video transcript.

4. If an exception occurs during the execution of the try block, the code execution moves to the except block:

  • Here, it first converts the exception e to a string x.
  • Then it prints an error message.
  • Finally, it returns an empty string as an indication of the error.

def getResponseFromQuery(self, db, query, k=4):
    """Answer ``query`` using the ``k`` chunks of ``db`` most similar to it.

    Returns ``(response, docs)``; on any error prints the message and
    returns ``('', '')``.
    """
    try:
        """
        gpt-3.5-turbo can handle up to 4097 tokens. Setting the chunksize to 1000 and k to 4 maximizes
        the number of tokens to analyze.
        """
        model, temperature = self.model_name, self.temp_val

        matches = db.similarity_search(query, k=k)
        context_text = " ".join(doc.page_content for doc in matches)

        llm = ChatOpenAI(model_name=model, temperature=temperature)

        # System prompt comes from the shared template module; the human
        # prompt only wraps the question.
        system_prompt = SystemMessagePromptTemplate.from_template(ct.templateVal_1)
        human_prompt = HumanMessagePromptTemplate.from_template(
            "Answer the following question: {question}"
        )
        prompt = ChatPromptTemplate.from_messages([system_prompt, human_prompt])

        answer = LLMChain(llm=llm, prompt=prompt).run(question=query, docs=context_text)
        return answer.replace("\n", ""), matches

    except Exception as err:
        print('Error: ', str(err))

        return '', ''

The Python function getResponseFromQuery is designed to search a given database (db) for a specific query and then generate a response using a language model (possibly GPT-3.5-turbo). The answer is based on the content found and the particular question. Here is a simple English summary:

  1. The function getResponseFromQuery takes three parameters: db, query, and k. The k parameter is optional and defaults to 4 if not provided. db is the database to search, the query is the question or prompts to analyze, and k is the number of similar items to return.
  2. The function initiates a try-except block for handling any errors that might occur.
  3. Inside the try block:
  • The function retrieves the model name and temperature value from the instance of the class this function is a part of.
  • The function then searches the db database for documents similar to the query and saves these in docs.
  • It concatenates the content of the returned documents into a single string docs_page_content.
  • It creates a ChatOpenAI object with the model name and temperature value.
  • It creates a system message prompt from a predefined template.
  • It creates a human message prompt, which is the query.
  • It combines these two prompts to form a chat prompt.
  • An LLMChain object is then created using the ChatOpenAI object and the chat prompt.
  • This LLMChain object is used to generate a response to the query using the content of the documents found in the database. The answer is then formatted by replacing all newline characters with empty strings.
  • Finally, the function returns this response along with the original documents.
  4. If any error occurs during these operations, the function goes to the except block where:
  • The error message is printed.
  • The function returns two empty strings to indicate an error occurred, and no response or documents could be produced.

def extractContentInText(self, topic, query):
    """Answer ``query`` once per video found for ``topic``.

    Returns a list of answer strings, each prefixed with the channel it came
    from; videos yielding no answer are skipped. On any error the message is
    printed and an empty list is returned.
    """
    try:
        answers = []
        limit = self.max_cnt

        urlList, channelList = self.topFiveURLFromYouTube(
            youtube, q=topic, part='id,snippet', maxResults=limit, type='video'
        )
        print('Returned List: ')
        print(urlList)
        print()

        for position, video_url in enumerate(urlList):
            print('Processing Video: ')
            print(video_url)
            store = self.createDBFromYoutubeVideoUrl(video_url)

            response, _ = self.getResponseFromQuery(store, query)

            if len(response) == 0:
                continue

            prefix = 'As per the topic discussed in ' + channelList[position] + ', '
            answers.append(prefix + response)

        return answers
    except Exception as err:
        print('Error: ', str(err))

        return []

This Python function, extractContentInText, is aimed to extract relevant content from the transcripts of top YouTube videos on a specific topic and generate responses to a given query. Here’s a simple English translation:

  1. The function extractContentInText is defined with topic and query as parameters.
  2. It begins with a try-except block to catch and handle any possible exceptions.
  3. In the try block:
  • It initializes several variables: an empty list discussedTopic to store the extracted information, an empty string strKeyText to keep specific parts of the content, a counter cnt initialized at 0, and max_cnt retrieved from the self-object to specify the maximum number of YouTube videos to consider.
  • It calls the topFiveURLFromYouTube function (defined previously) to get the URLs of the top videos on the given topic from YouTube. It also retrieves the list of channel names associated with these videos.
  • It prints the returned list of URLs.
  • Then, it starts a loop over each URL in the urlList.
    • For each URL, it prints the URL, then creates a database from the transcript of the YouTube video using the function createDBFromYoutubeVideoUrl.
    • It then uses the getResponseFromQuery function to get a response to the query based on the content of the database.
    • If the length of the response is greater than 0 (meaning there is a response), it forms a string strKeyText to indicate the channel that the topic was discussed on and then appends the answer to this string. This entire string is then added to the discussedTopic list.
    • It increments the counter cnt by one after each iteration.
    • Finally, it returns the discussedTopic list, which now contains relevant content extracted from the videos.
  4. If any error occurs during these operations, the function goes into the except block:
  • It first resets discussedTopic to an empty list.
  • Then it converts the exception e to a string and prints the error message.
  • Lastly, it returns the empty discussedTopic list, indicating that no content could be extracted due to the error.
  • testLangChain.py (Main Python script to extract the transcript from the YouTube videos & then answer the questions based on the topics selected by the users.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 27-May-2023 ####
#### Modified On 28-May-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsVideoContentScrapper class to extract ####
#### the transcript from the YouTube videos. ####
#### ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import textwrap
import clsVideoContentScrapper as cvsc
# Disabling Warning
def warn(*args, **kwargs):
    """No-op replacement for ``warnings.warn``; accepts and ignores any arguments."""


import warnings

# Rebind the module-level function so later warnings.warn(...) calls do nothing.
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
# Initiating Logging Instances
clog = cl.clsL()
data_path = cf.conf['DATA_PATH']
data_file_name = cf.conf['FILE_NAME']
cVCScrapper = cvsc.clsVideoContentScrapper()
######################################
#### Global Flag ########
######################################
def main():
    """Ask for a topic and a question, then print the answers returned by the scrapper.

    Both values are read from standard input and handed to
    ``cVCScrapper.extractContentInText``. Every entry of the returned list is
    printed as a numbered paragraph wrapped at 150 columns. Any exception is
    caught and reported as ``Error: <message>`` instead of propagating.
    """
    try:
        startTime = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + startTime)
        print('*'*120)

        #query = "What are they saying about Microsoft?"
        print('Please share your topic!')
        inputTopic = input('User: ')
        print('Please ask your questions?')
        inputQry = input('User: ')
        print()

        retList = cVCScrapper.extractContentInText(inputTopic, inputQry)

        # Number the answers from 1 for display.
        for pos, discussedTopic in enumerate(retList, start=1):
            finText = str(pos) + ') ' + discussedTopic
            print()
            print(textwrap.fill(finText, width=150))

        # An empty result list is reported as a failure.
        if len(retList) > 0:
            print()
            print('Successfully Scrapped!')
        else:
            print()
            print('Failed to Scrappe!')

        print('*'*120)
        endTime = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + endTime)

    except Exception as e:
        # Top-level boundary of the script: report the problem and exit normally.
        print('Error: ', str(e))

if __name__ == "__main__":
    main()

Please find the key snippet –

def main():
    """Ask for a topic and a question, then print the answers returned by the scrapper.

    Both values are read from standard input and handed to
    ``cVCScrapper.extractContentInText``. Every entry of the returned list is
    printed as a numbered paragraph wrapped at 150 columns. Any exception is
    caught and reported as ``Error: <message>`` instead of propagating.
    """
    try:
        startTime = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + startTime)
        print('*'*120)

        #query = "What are they saying about Microsoft?"
        print('Please share your topic!')
        inputTopic = input('User: ')
        print('Please ask your questions?')
        inputQry = input('User: ')
        print()

        retList = cVCScrapper.extractContentInText(inputTopic, inputQry)

        # Number the answers from 1 for display.
        for pos, discussedTopic in enumerate(retList, start=1):
            finText = str(pos) + ') ' + discussedTopic
            print()
            print(textwrap.fill(finText, width=150))

        # An empty result list is reported as a failure.
        if len(retList) > 0:
            print()
            print('Successfully Scrapped!')
        else:
            print()
            print('Failed to Scrappe!')

        print('*'*120)
        endTime = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + endTime)

    except Exception as e:
        # Top-level boundary of the script: report the problem and exit normally.
        print('Error: ', str(e))

if __name__ == "__main__":
    main()

The above main application captures the topic from the user and then gives the user a chance to ask specific questions on that topic. It invokes the main class to extract the transcripts from YouTube, feeds them as a source using LangChain & finally delivers the response. If a video yields no response, that entry is skipped.

USAGE & COST FACTOR:

Please find the OpenAI usage –

Please find the YouTube API usage –


So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement & especially in the prediction quality. Sample video taken from Santrel Media & you would find the link over here.

Demonstration of GPT-3 model tuning using Python for an upcoming PyPi-package

Today, I’m very excited to demonstrate an effortless & new way to fine-tune the GPT-3 model using Python with the help of my new build (unpublished) PyPi package. In this post, I plan to show how the OpenAI-based tuned model can return custom links from this website in response to user queries.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.

Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Isn’t it exciting? Finally, we can efficiently handle your custom website URL using an OpenAI tuned model.


What is ChatGPT?

ChatGPT is an advanced artificial intelligence language model developed by OpenAI based on the GPT-4 architecture. As an AI model, it is designed to understand and generate human-like text based on the input it receives. ChatGPT can engage in various tasks, such as answering questions, providing recommendations, creating content, and simulating conversation. While it is highly advanced and versatile, it’s important to note that ChatGPT’s knowledge is limited to the data it was trained on, with a cutoff date of September 2021.

When to tune GPT model?

Tuning a GPT or any AI model might be necessary for various reasons. Here are some common scenarios when you should consider adjusting or fine-tuning a GPT model:

  1. Domain-specific knowledge: If you need your model to have a deeper understanding of a specific domain or industry, you can fine-tune it with domain-specific data to improve its performance.
  2. New or updated data: If new or updated information is not part of the original training data, you should fine-tune the model to ensure it has the most accurate and up-to-date knowledge.
  3. Customization: If you require the model to have a specific style, tone, or focus, you can fine-tune it with data that reflects those characteristics.
  4. Ethical or safety considerations: To make the model safer and more aligned with human values, you should fine-tune it to reduce biased or harmful outputs.
  5. Improve performance: If the base model’s performance is unsatisfactory for a particular task or application, you can fine-tune it on a dataset more relevant to the job, often leading to better results.

Remember that tuning or fine-tuning a GPT model requires access to appropriate data and computational resources and an understanding of the model’s architecture and training techniques. Additionally, monitoring and evaluating the model’s performance after fine-tuning is essential to ensure that the desired improvements have been achieved.


FLOW OF EVENTS:

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

The initial Python-based client interacts with the tuned OpenAI models. This process enables it to get a precise response with custom data in a convenient way that anyone can understand.


SOURCE DATA:

Let us understand how to feed the source data as it will deal with your website URL link.

The first data that we are going to talk about is the one that contains the hyperlink. Let us explore the sample here.

From the above diagram, one can easily understand that the application will interpret a unique hash number associated with a specific URL. This data will be used to look up the URL after the OpenAI response from the tuned model as a result of any user query.

Now, let us understand the actual source data.

If we closely check, we’ll see the source file contains two columns – prompt & completion. And the website reference is put inside the curly braces as shown – “{Hash Code that represents your URL}.”

During the response, the newly created library replaces the hash value with the correct URL after the successful lookup & presents the complete answer.

CODE:

Why don’t we go through the code made accessible due to this new library for this particular use case?

  • clsConfigClient.py (This is the configuration script that holds the input parameters.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 21-Feb-2023 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### OpenAI fine-tune projects. ####
#### ####
################################################
import os
import platform as pl
class clsConfigClient(object):
    """Static configuration for the OpenAI fine-tune scripts.

    Every setting lives in the class-level ``conf`` dictionary. The directory
    entries are built from the folder containing this file and end with the
    path separator selected below.
    """

    # Folder that contains this configuration file.
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    # Pick the path separator from the detected operating system.
    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'DATA_PATH': Curr_Path + sep + 'data' + sep,
        'TEMP_PATH': Curr_Path + sep + 'temp' + sep,
        'MODEL_DIR': 'model',
        'APP_DESC_1': 'ChatGPT Training!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'FILE_NAME': '2023-4-14-WP.csv',
        'LKP_FILE_NAME': 'HyperDetails.csv',
        'TEMP_FILE_NAME': 'chatGPTData.jsonl',
        'TITLE': "GPT-3 Training!",
        'PATH' : Curr_Path,
        'OUT_DIR': 'data',
        # NOTE(review): sample value; never commit a real API key to source control.
        'OPEN_API_KEY': 'sk-hdhrujfrkfjfjfjfhjfjfisososT&jsdgL6KIxx',
        'MODEL_CD':'davinci',
        # Base URL; the status script appends the fine-tune id to it.
        'URL': 'https://api.openai.com/v1/fine-tunes/',
        'EPOCH': 10,
        'SUFFIX': 'py-saty',
        'EXIT_KEYWORD': 'bye'
    }

Some of the important entries that will be required later are as follows –

'FILE_NAME': '2023-4-14-WP.csv',
'LKP_FILE_NAME': 'HyperDetails.csv',
'OPEN_API_KEY': 'sk-hdhrujfrkfjfjfjfhjfjfisososT&jsdgL6KIxx',
'MODEL_CD':'davinci',
'URL': 'https://api.openai.com/v1/fine-tunes/',
'EXIT_KEYWORD': 'bye'

We’ll discuss these entries later.

  • trainChatGPTModel.py (This is the main calling Python script that will invoke the newly created fine-tune GPT-3 enabler.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 12-Feb-2023 ####
#### Modified On 16-Feb-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### newly created fine-tune GPT-3 enabler. ####
#### ####
#####################################################
import pandas as p
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import clsTrainModel3 as tm
# Disabling Warning
def warn(*args, **kwargs):
    """No-op replacement for ``warnings.warn``; accepts and ignores any arguments."""


import warnings

# Rebind the module-level function so later warnings.warn(...) calls do nothing.
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
#tModel = tm.clsTrainModel()
tModel = tm.clsTrainModel3()
# Initiating Logging Instances
clog = cl.clsL()
data_path = cf.conf['DATA_PATH']
data_file_name = cf.conf['FILE_NAME']
######################################
#### Global Flag ########
######################################
######################################
### Wrapper functions to invoke ###
### the desired class from newly ###
### built class. ###
######################################
######################################
### End of wrapper functions. ###
######################################
def main():
    """Run the fine-tune training on the configured data file and report the outcome.

    Builds the input path from ``data_path`` and ``data_file_name``, passes it
    to ``tModel.trainModel`` and treats a return value of 0 as success. Any
    exception is caught and reported as ``Error: <message>``.
    """
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        FullFileName = data_path + data_file_name

        r1 = tModel.trainModel(FullFileName)

        if r1 == 0:
            print('Successfully Trained!')
        else:
            print('Failed to Train!')

        #clog.logr(OutPutFileName, debug_ind, df, subdir)

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        # Top-level boundary of the script: report the problem and exit normally.
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

The following are the key snippets from the above script –

data_path = cf.conf['DATA_PATH']
data_file_name = cf.conf['FILE_NAME']

And, then –

tModel = tm.clsTrainModel3()
FullFileName = data_path + data_file_name
r1 = tModel.trainModel(FullFileName)

As one can see, the package needs only the source data file to fine-tune the GPT-3 model.

  • checkFineTuneChatGPTModelStat.py (This is the main Python script that will check the status of the tuned process that will happen inside the OpenAI-cloud environment.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 12-Feb-2023 ####
#### Modified On 16-Feb-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### newly created fine-tune job status inside ####
#### the OpenAI environment. ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import clsTestModel3 as tm
# Disabling Warning
def warn(*args, **kwargs):
    """No-op replacement for ``warnings.warn``; accepts and ignores any arguments."""


import warnings

# Rebind the module-level function so later warnings.warn(...) calls do nothing.
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
# Initiating Logging Instances
clog = cl.clsL()
tmodel = tm.clsTestModel3()
url_part = cf.conf['URL']
open_api_key = cf.conf['OPEN_API_KEY']
######################################
#### Global Flag ########
######################################
def main():
    """Ask for a fine-tune id and check the status of that job.

    The id read from standard input is appended to ``url_part``; the resulting
    URL and ``open_api_key`` are passed to ``tmodel.checkStat``. A return value
    of 0 is reported as success. Any exception is caught and reported as
    ``Error: <message>``.
    """
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        # Example usage
        input_text = str(input("Please provide the fine tune Id (Start with ft-*): "))
        url = url_part + input_text
        print('URL: ', url)

        r1 = tmodel.checkStat(url, open_api_key)

        if r1 == 0:
            print('Successfully checked the status of tuned GPT-3 model.')
        else:
            print('Failed to check the status of the tuned GPT-3 model.')

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        # Top-level boundary of the script: report the problem and exit normally.
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

To check the status of the fine-tuned job inside the OpenAI environment, one needs to provide the fine tune id, which generally starts with -> “ft-*.” One would get this value after the train script’s successful run.

Some of the other key snippets are –

tmodel = tm.clsTestModel3()

url_part = cf.conf['URL']
open_api_key = cf.conf['OPEN_API_KEY']

And, then –

input_text = str(input("Please provide the fine tune Id (Start with ft-*): "))
url = url_part + input_text
print('URL: ', url)

r1 = tmodel.checkStat(url, open_api_key)

The above snippet is self-explanatory as one is passing the fine tune id along with the OpenAI API key.

  • testChatGPTModel.py (This is the main testing Python script that will invoke the newly created fine-tune GPT-3 enabler to get a response with custom data.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 12-Feb-2023 ####
#### Modified On 19-Apr-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### newly created class that will test the ####
#### tuned model output. ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import pandas as p
import clsTestModel3 as tm
# Disabling Warning
def warn(*args, **kwargs):
    """No-op replacement for ``warnings.warn``; accepts and ignores any arguments."""


import warnings

# Rebind the module-level function so later warnings.warn(...) calls do nothing.
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
# Initiating Logging Instances
clog = cl.clsL()
tmodel = tm.clsTestModel3()
open_api_key = cf.conf['OPEN_API_KEY']
lkpDataPath = cf.conf['DATA_PATH']
lkpFileName = cf.conf['LKP_FILE_NAME']
######################################
#### Global Flag ########
######################################
def main():
    """Test the tuned model using the configured lookup file and report the outcome.

    Builds the lookup-file path from ``lkpDataPath`` and ``lkpFileName`` and
    passes it with ``open_api_key`` to ``tmodel.testModel``. A return value of
    0 is reported as success. Any exception is caught and reported as
    ``Error: <message>``.
    """
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*' * 120)
        print('Start Time: ' + str(var))
        print('*' * 120)

        LookUpFileName = lkpDataPath + lkpFileName

        r1 = tmodel.testModel(LookUpFileName, open_api_key)

        if r1 == 0:
            print('Successfully tested the tuned GPT-3 model.')
        else:
            print('Failed to test the tuned GPT-3 model.')

        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        # Top-level boundary of the script: report the problem and exit normally.
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

Some of the key entries from the above snippet are as follows –

tmodel = tm.clsTestModel3()

open_api_key = cf.conf['OPEN_API_KEY']
lkpDataPath = cf.conf['DATA_PATH']
lkpFileName = cf.conf['LKP_FILE_NAME']

And, then –

LookUpFileName = lkpDataPath + lkpFileName
r1 = tmodel.testModel(LookUpFileName, open_api_key)

In the above lines, the application gets the correct URL value from the lookup file we’ve prepared for this specific use case.

  • deleteChatGPTModel.py (This is the main Python script that will delete the old intended tuned model, which is no longer needed.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 12-Feb-2023 ####
#### Modified On 21-Feb-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### newly created delete model methods for ####
#### OpenAI. ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import clsTestModel3 as tm
# Disabling Warning
def warn(*args, **kwargs):
    """No-op replacement for ``warnings.warn``; accepts and ignores any arguments."""


import warnings

# Rebind the module-level function so later warnings.warn(...) calls do nothing.
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
# Initiating Logging Instances
clog = cl.clsL()
tmodel = tm.clsTestModel3()
open_api_key = cf.conf['OPEN_API_KEY']
######################################
#### Global Flag ########
######################################
def main():
    """Delete the old tuned model and report the outcome.

    Calls ``tmodel.delOldModel`` with ``open_api_key`` and treats a return
    value of 0 as success. Any exception is caught and reported as
    ``Error: <message>``.
    """
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*' * 120)
        print('Start Time: ' + str(var))
        print('*' * 120)

        r1 = tmodel.delOldModel(open_api_key)

        # The messages describe the delete operation; the earlier wording was
        # copied from the status-check script.
        if r1 == 0:
            print('Successfully deleted the old tuned GPT-3 model.')
        else:
            print('Failed to delete the old tuned GPT-3 model.')

        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        # Top-level boundary of the script: report the problem and exit normally.
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

Some of the key snippets from the above scripts are –

tmodel = tm.clsTestModel3()

open_api_key = cf.conf['OPEN_API_KEY']

And, then –

r1 = tmodel.delOldModel(open_api_key)

We’ve demonstrated that using a straightforward method, one can delete any old tuned model from OpenAI that is no longer required.

KEY FEATURES TO CONSIDER DURING TUNING:

  • Data quality: Ensure that the data used for fine-tuning is clean, relevant, and representative of the domain you want the model to understand. Check for biases, inconsistencies, and errors in the dataset.
  • Overfitting: Be cautious of overfitting, which occurs when the model performs exceptionally well on the training data but poorly on unseen data. You can address overfitting by using regularization techniques, early stopping, or cross-validation.
  • Model size and resource requirements: GPT models can be resource-intensive. Be mindful of the hardware limitations and computational resources available when selecting the model size and the time and cost associated with training.
  • Hyperparameter tuning: Select appropriate hyperparameters for your fine-tuning processes, such as learning rate, batch size, and the number of epochs. Experiment with different combinations to achieve the best results without overfitting.
  • Evaluation metrics: Choose suitable evaluation metrics to assess the performance of your fine-tuned model. Consider using multiple metrics to understand your model’s performance comprehensively.
  • Ethical considerations: Be aware of potential biases in your dataset and how the model’s predictions might impact users. Address ethical concerns during the fine-tuning process and consider using techniques such as data augmentation or adversarial training to mitigate these biases.
  • Monitoring and maintenance: Continuously monitor the model’s performance after deployment, and be prepared to re-tune or update it as needed. Regular maintenance ensures that the model remains relevant and accurate.
  • Documentation: Document your tuning process, including the data used, model architecture, hyperparameters, and evaluation metrics. This factor will facilitate easier collaboration, replication, and model maintenance.
  • Cost: OpenAI fine-tuning can be extremely expensive, even for a small volume of data. Hence, organization-wise, one needs to be extremely careful while using this feature.

COST FACTOR:

Before we discuss the actual spending, let us understand the tested data volume to train & tune the model.

So, we’re talking about a total size of 500 KB (at max). And, we did 10 epochs during the training as you can see from the config file mentioned above.

So, it is pretty expensive. Use it wisely.


So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement & especially in the prediction quality.

Tuning your model using the python-based low-code machine-learning library PyCaret

Today, I’ll discuss another important topic before I share the excellent use case next month, as I still need some time to finish that one. We’ll see how we can leverage the brilliant capability of a low-code machine-learning library named PyCaret.

But before going through the details, why don’t we view the demo & then go through it?

Demo

Architecture:

Let us understand the flow of events –

As one can see, the initial training requests are triggered from the PyCaret-driven training models. And the application can successfully process & identify the best models out of the other combinations.

Python Packages:

Following are the python packages that are necessary to develop this use case –

pip install pandas
pip install pycaret

PyCaret is dependent on a combination of other popular python packages. So, you need to install them successfully to run this package.

CODE:

  • clsConfigClient.py (Main configuration file)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 31-Mar-2023 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### personal AI-driven voice assistant. ####
#### ####
################################################
import os
import platform as pl
class clsConfigClient(object):
    """Static configuration for the PyCaret training and testing scripts.

    Every setting lives in the class-level ``conf`` dictionary. The directory
    entries are built from the folder containing this file and end with the
    path separator selected below.
    """

    # Folder that contains this configuration file.
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    # Pick the path separator from the detected operating system.
    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'DATA_PATH': Curr_Path + sep + 'data' + sep,
        'MODEL_PATH': Curr_Path + sep + 'model' + sep,
        'TEMP_PATH': Curr_Path + sep + 'temp' + sep,
        'MODEL_DIR': 'model',
        'APP_DESC_1': 'PyCaret Training!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'FILE_NAME': 'Titanic.csv',
        'MODEL_NAME': 'PyCaret-ft-personal-2023-03-31-04-29-53',
        'TITLE': "PyCaret Training!",
        'PATH' : Curr_Path,
        'OUT_DIR': 'data'
    }

I’m skipping this section as it is self-explanatory.


  • clsTrainModel.py (This is the main class that contains the core logic of low-code machine-learning library to evaluate the best model for your solutions.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 31-Mar-2023 ####
#### Modified On 31-Mar-2023 ####
#### ####
#### Objective: This is the main class that ####
#### contains the core logic of low-code ####
#### machine-learning library to evaluate the ####
#### best model for your solutions. ####
#### ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
# Import necessary libraries
import pandas as p
from pycaret.classification import *
# Disabling Warning
def warn(*args, **kwargs):
    """No-op replacement for ``warnings.warn``; accepts and ignores any arguments."""


import warnings

# Rebind the module-level function so later warnings.warn(...) calls do nothing.
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
# Initiating Logging Instances
clog = cl.clsL()
###############################################
### End of Global Section ###
###############################################
class clsTrainModel:
    """Train, tune and save a PyCaret classification model from a CSV file."""

    def __init__(self):
        # Output location and file name of the saved model, taken from the config.
        self.model_path = cf.conf['MODEL_PATH']
        self.model_name = cf.conf['MODEL_NAME']

    def trainModel(self, FullFileName):
        """Run the PyCaret workflow on ``FullFileName`` and save the final model.

        Args:
            FullFileName: Path of the CSV file to load; the data must contain
                the ``Survived`` target column used in the setup below.

        Returns:
            0 when every step completes, 1 when any step raises an exception
            (the error message is printed).
        """
        try:
            df = p.read_csv(FullFileName)

            row_count = int(df.shape[0])

            print('Number of rows: ', str(row_count))
            print(df)

            # Initialize the setup in PyCaret
            clf_setup = setup(
                data=df,
                target="Survived",
                train_size=0.8,  # 80% for training, 20% for testing
                categorical_features=["Sex", "Embarked"],
                ordinal_features={"Pclass": ["1", "2", "3"]},
                ignore_features=["Name", "Ticket", "Cabin", "PassengerId"],
                #silent=True, # Set to False for interactive setup
            )

            # Compare various models
            best_model = compare_models()

            # Create a specific model (e.g., Random Forest)
            rf_model = create_model("rf")

            # Hyperparameter tuning
            tuned_rf_model = tune_model(rf_model)

            # Evaluate model performance
            plot_model(tuned_rf_model, plot="confusion_matrix")
            plot_model(tuned_rf_model, plot="auc")

            # Finalize the model (train on the complete dataset)
            final_rf_model = finalize_model(tuned_rf_model)

            # Make predictions on new data
            new_data = df.drop("Survived", axis=1)
            predictions = predict_model(final_rf_model, data=new_data)

            # Writing into the Model
            FullModelName = self.model_path + self.model_name

            print('Model Output @:: ', str(FullModelName))
            print()

            # Save the fine-tuned model
            save_model(final_rf_model, FullModelName)

            return 0
        except Exception as e:
            # Any failure (missing file, setup or training error) is reported
            # through the return code instead of being raised to the caller.
            x = str(e)
            print('Error: ', x)

            return 1

Let us understand the code in simple terms –

  1. Import necessary libraries and load the Titanic dataset.
  2. Initialize the PyCaret setup, specifying the target variable, train-test split, categorical and ordinal features, and features to ignore.
  3. Compare various models to find the best-performing one.
  4. Create a specific model (Random Forest in this case).
  5. Perform hyper-parameter tuning on the Random Forest model.
  6. Evaluate the model’s performance using a confusion matrix and AUC-ROC curve.
  7. Finalize the model by training it on the complete dataset.
  8. Make predictions on new data.
  9. Save the trained model for future use.

  • trainPYCARETModel.py (This is the main calling python script that will invoke the training class of PyCaret package.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 31-Mar-2023 ####
#### Modified On 31-Mar-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### training class of Pycaret package. ####
#### ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import clsTrainModel as tm
# Disabling Warning
def warn(*args, **kwargs):
    """No-op replacement for ``warnings.warn``; accepts and ignores any arguments."""


import warnings

# Rebind the module-level function so later warnings.warn(...) calls do nothing.
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
# Initiating Logging Instances
clog = cl.clsL()
data_path = cf.conf['DATA_PATH']
data_file_name = cf.conf['FILE_NAME']
tModel = tm.clsTrainModel()
######################################
#### Global Flag ########
######################################
def main():
    """Train the PyCaret model on the configured data file and report the outcome.

    Builds the input path from ``data_path`` and ``data_file_name``, passes it
    to ``tModel.trainModel`` and treats a return value of 0 as success. Any
    exception is caught and reported as ``Error: <message>``.
    """
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        FullFileName = data_path + data_file_name

        r1 = tModel.trainModel(FullFileName)

        if r1 == 0:
            print('Successfully Trained!')
        else:
            print('Failed to Train!')

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        # Top-level boundary of the script: report the problem and exit normally.
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

The above code is pretty self-explanatory as well.


  • testPYCARETModel.py (This is the main calling python script that will invoke the testing script for PyCaret package.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 31-Mar-2023 ####
#### Modified On 31-Mar-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### testing script for PyCaret package. ####
#### ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
from pycaret.classification import load_model, predict_model
import pandas as p
# Disabling Warning
def warn(*args, **kwargs):
    """No-op replacement for ``warnings.warn``; accepts and ignores any arguments."""


import warnings

# Rebind the module-level function so later warnings.warn(...) calls do nothing.
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
# Initiating Logging Instances
clog = cl.clsL()
model_path = cf.conf['MODEL_PATH']
model_name = cf.conf['MODEL_NAME']
######################################
#### Global Flag ########
######################################
def main():
    """Load the saved PyCaret model and print its predictions for two sample rows.

    The model path is built from ``model_path`` and ``model_name``. Any
    exception is caught and reported as ``Error: <message>``.
    """
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        FullFileName = model_path + model_name

        # Load the saved model
        loaded_model = load_model(FullFileName)

        # Prepare new data for testing (make sure it has the same columns as the original data)
        new_data = p.DataFrame({
            "Pclass": [3, 1],
            "Sex": ["male", "female"],
            "Age": [22, 38],
            "SibSp": [1, 1],
            "Parch": [0, 0],
            "Fare": [7.25, 71.2833],
            "Embarked": ["S", "C"]
        })

        # Make predictions using the loaded model
        predictions = predict_model(loaded_model, data=new_data)

        # Display the predictions
        print(predictions)

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        # Top-level boundary of the script: report the problem and exit normally.
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

In this code, the application uses the stored model & then forecasts based on the optimized PyCaret model tuning.

Conclusion:

The above code demonstrates an end-to-end binary classification pipeline using the PyCaret library for the Titanic dataset. The goal is to predict whether a passenger survived based on the available features. Here are some conclusions you can draw from the code and data:

  1. Ease of use: The code showcases how PyCaret simplifies the machine learning process, from data preprocessing to model training, evaluation, and deployment. With just a few lines of code, you can perform tasks that would require much more effort using lower-level libraries.
  2. Model selection: The compare_models() function provides a quick and easy way to compare various machine learning algorithms and identify the best-performing one based on the chosen evaluation metric (accuracy by default). This selection helps you select a suitable model for the given problem.
  3. Hyper-parameter tuning: The tune_model() function automates the process of hyper-parameter tuning to improve model performance. We tuned a Random Forest model to optimize its predictive power in the example.
  4. Model evaluation: PyCaret provides several built-in visualization tools for assessing model performance. In the example, we used a confusion matrix and AUC-ROC curve to evaluate the performance of the tuned Random Forest model.
  5. Model deployment: The example demonstrates how to make predictions using the trained model and save the model for future use. This deployment showcases how PyCaret can streamline the process of deploying a machine-learning model in a production environment.

It is important to note that the conclusions drawn from the code and data are specific to the Titanic dataset and the chosen features. Adjust the feature engineering, preprocessing, and model selection steps for different datasets or problems accordingly. However, the general workflow and benefits provided by PyCaret would remain the same.


So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement & especially in the prediction quality.

Handling unique data using the python-based FastDataMask package

Today, I’ll discuss one widespread use case of handling unique & critical data using a new python-based FastDataMask package. But before going through the details, why don’t we view the demo & then go through it?

Demo

Great! Let us understand in detail.

Architecture:

Let us understand the flow of events –

The application first invokes the FastDataMask python package, which accepts individual data in nature & then generates non-recoverable masked data, keeping the data pattern & nature in mind. Hence, anyone can still use the data for their analysis, whereas you can encapsulate the information from unauthorized pairs of eyes. Yet, they can get the essence & close data patterns to decide from any data analysis.

Python Packages:

Following are the python packages that are necessary to develop this brilliant use case –

pip install FastDataMask==0.0.6
pip install imutils==0.5.3
pip install numpy==1.23.2
pip install pandas==1.4.3

CODE:

Let us now understand the code. For this use case, we will only discuss the key python scripts. The application needs a few more scripts than these, but we have already discussed them in some of the earlier posts. Hence, we will skip them here.

  • clsConfigClient.py (Main configuration file)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 15-Feb-2023 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### the PII data-masking demo. ####
#### ####
################################################
import os
import platform as pl
class clsConfigClient(object):
    """Static configuration for the PII-masking demo.

    Everything is a class attribute; callers read the settings through
    ``clsConfigClient.conf``.
    """

    # Directory that contains this configuration module.
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    # Path separator picked from the operating system name.
    os_det = pl.system()
    sep = '\\' if os_det == "Windows" else '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'output' + sep,
        'REPORT_DIR': 'output',
        'SRC_PATH': Curr_Path + sep + 'data' + sep,
        'APP_DESC_1': 'Masking PII Data!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'FILE_NAME': 'PII.csv',
        'TITLE': "Masking PII Data!",
        'PATH' : Curr_Path
    }

Key entries from the above scripts are as follows –

'FILE_NAME': 'PII.csv',

This CSV is a dummy input file, which looks like this –

In the above screenshot, our applications will use critical information like – First Name, Email, Address, Phone, Date Of Birth, SSN & Sal.

  • playPII.py (Main calling python script)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 12-Feb-2023 ####
#### Modified On 16-Feb-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### newly created light data masking class. ####
#### ####
#####################################################
import pandas as p
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
from FastDataMask import clsCircularList as ccl
# Disabling warnings
def warn(*args, **kwargs):
    """Accept any arguments and do nothing.

    Installed below in place of ``warnings.warn`` so that calls to it are ignored.
    """
    return None


import warnings
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
charList = ccl.clsCircularList()
CurrPath = cf.conf['SRC_PATH']
FileName = cf.conf['FILE_NAME']
######################################
#### Global Flag ########
######################################
######################################
### Wrapper functions to invoke ###
### the desired class from newly ###
### built class. ###
######################################
def mask_email(email):
    """Return ``charList.maskEmail(email)``, or '' if that call raises."""
    try:
        return charList.maskEmail(email)
    except Exception:
        # Best-effort: one bad value must not abort the whole column.
        return ''


def mask_phone(phone):
    """Return ``charList.maskPhone(phone)``, or '' if that call raises."""
    try:
        return charList.maskPhone(phone)
    except Exception:
        return ''


def mask_name(flname):
    """Return ``charList.maskFLName(flname)``, or '' if that call raises."""
    try:
        return charList.maskFLName(flname)
    except Exception:
        return ''


def mask_date(dt):
    """Return ``charList.maskDate(dt)``, or '' if that call raises."""
    try:
        return charList.maskDate(dt)
    except Exception:
        return ''


def mask_uniqueid(unqid):
    """Return ``charList.maskSSN(unqid)``, or '' if that call raises."""
    try:
        return charList.maskSSN(unqid)
    except Exception:
        return ''


def mask_sal(sal):
    """Return ``charList.maskSal(sal)``, or '' if that call raises."""
    try:
        return charList.maskSal(sal)
    except Exception:
        return ''
######################################
### End of wrapper functions. ###
######################################
def main():
    """Read the configured CSV, mask its PII columns and print both frames.

    The input path is the module-level ``CurrPath`` joined with ``FileName``.
    Each sensitive column is masked into a temporary ``Masked<name>`` column,
    the originals are dropped, the masked columns take over the original
    names, and the frame is put back into its original column order.
    Any exception is caught and printed, so the function always returns None.
    """
    stamp_format = "%Y-%m-%d_%H-%M-%S"
    banner = '*'*120

    try:
        started = datetime.datetime.now().strftime(stamp_format)
        print(banner)
        print('Start Time: ' + str(started))
        print(banner)

        inputFile = CurrPath + FileName
        print('Input File: ', inputFile)

        df = p.read_csv(inputFile)

        print(banner)
        print('Source Data: ')
        print(df)
        print(banner)

        hdr = list(df.columns.values)
        print('Headers:', hdr)

        # Source column -> wrapper that masks it (applied in this order).
        maskers = {
            'FirstName': mask_name,
            'Email': mask_email,
            'Phone': mask_phone,
            'DOB': mask_date,
            'SSN': mask_uniqueid,
            'Sal': mask_sal,
        }

        for col, masker in maskers.items():
            df["Masked" + col] = df[col].apply(masker)

        # Dropping old columns
        df.drop(list(maskers), axis=1, inplace=True)

        # Renaming columns
        df.rename(columns={"Masked" + col: col for col in maskers}, inplace=True)

        # Repositioning columns of dataframe
        df = df[hdr]

        print('Masked DF: ')
        print(df)

        print(banner)
        finished = datetime.datetime.now().strftime(stamp_format)
        print('End Time: ' + str(finished))
    except Exception as e:
        print('Error: ', str(e))


if __name__ == "__main__":
    main()

view raw

playPII.py

hosted with ❤ by GitHub

Let us understand the key lines in details –

def mask_email(email):
    """Return ``charList.maskEmail(email)``, or '' if that call raises."""
    try:
        return charList.maskEmail(email)
    except Exception:
        # Best-effort: one bad value must not abort the whole column.
        return ''

def mask_phone(phone):
    """Return ``charList.maskPhone(phone)``, or '' if that call raises."""
    try:
        return charList.maskPhone(phone)
    except Exception:
        return ''

def mask_name(flname):
    """Return ``charList.maskFLName(flname)``, or '' if that call raises."""
    try:
        return charList.maskFLName(flname)
    except Exception:
        return ''

def mask_date(dt):
    """Return ``charList.maskDate(dt)``, or '' if that call raises."""
    try:
        return charList.maskDate(dt)
    except Exception:
        return ''

def mask_uniqueid(unqid):
    """Return ``charList.maskSSN(unqid)``, or '' if that call raises."""
    try:
        return charList.maskSSN(unqid)
    except Exception:
        return ''

def mask_sal(sal):
    """Return ``charList.maskSal(sal)``, or '' if that call raises."""
    try:
        return charList.maskSal(sal)
    except Exception:
        return ''

These functions take a value as input and attempt to mask it using the corresponding masking method from the charList module. If the masking is successful, the process will return a masked value per input; otherwise, the application will return an empty string.

More specifically, the functions are:

  • mask_email: masks the email address provided as input
  • mask_phone: masks the phone number provided as input
  • mask_name: masks the first and last name supplied as input
  • mask_date: masks the date provided as input
  • mask_uniqueid: masks the unique ID (e.g., Social Security Number) provided as input
  • mask_sal: masks the salary supplied as input

The functions use a try-except block to handle any exceptions that may arise when calling the corresponding masking method from the charList module. If the masking method raises an exception, the function will return an empty string to handle cases where the input value is invalid, or the masking method fails for another reason.

inputFile = CurrPath + FileName
df = p.read_csv(inputFile)
hdr = list(df.columns.values)


df["MaskedFirstName"] = df["FirstName"].apply(mask_name)
df["MaskedEmail"] = df["Email"].apply(mask_email)
df["MaskedPhone"] = df["Phone"].apply(mask_phone)
df["MaskedDOB"] = df["DOB"].apply(mask_date)
df["MaskedSSN"] = df["SSN"].apply(mask_uniqueid)
df["MaskedSal"] = df["Sal"].apply(mask_sal)

# Dropping old columns
df.drop(['FirstName','Email','Phone','DOB','SSN', 'Sal'], axis=1, inplace=True)

# Renaming columns
df.rename(columns={'MaskedFirstName': 'FirstName'}, inplace=True)
df.rename(columns={'MaskedEmail': 'Email'}, inplace=True)
df.rename(columns={'MaskedPhone': 'Phone'}, inplace=True)
df.rename(columns={'MaskedDOB': 'DOB'}, inplace=True)
df.rename(columns={'MaskedSSN': 'SSN'}, inplace=True)
df.rename(columns={'MaskedSal': 'Sal'}, inplace=True)
  1. The first line inputFile = CurrPath + FileName concatenates the source data directory path (CurrPath, taken from SRC_PATH in the configuration) with the name of the file (FileName) and assigns the resulting file path to a variable inputFile.
  2. The second line df = p.read_csv(inputFile) – reads the file located at inputFile into a Pandas DataFrame object called df.
  3. The following few lines apply certain functions (mask_name, mask_email, mask_phone, mask_date, mask_uniqueid, and mask_sal) to specific columns in the DataFrame to mask sensitive data. These functions likely perform some data masking or obfuscation on the input data.
  4. The following line df.drop([‘FirstName’,’Email’,’Phone’,’DOB’,’SSN’, ‘Sal’], axis=1, inplace=True) drops the original columns that we are supposed to mask (i.e., ‘FirstName’, ‘Email’, ‘Phone’, ‘DOB’, ‘SSN’, and ‘Sal’) from the DataFrame.
  5. The remaining lines rename the masked columns to their original names (i.e., ‘MaskedFirstName’ is renamed to ‘FirstName’, ‘MaskedEmail’ is renamed to ‘Email’, and so on).

Overall, this code reads in a file, masks specific sensitive columns, and then prints the resulting DataFrame with the masked data.

Now, let’s compare the output against the source data –

As you can see the blue highlighted columns are the masked column & you can compare the data pattern against the source.


So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement & especially in the prediction quality.

Personal Virtual Assistant (SJ) implemented using python-based OpenAI, Rev_AI & PyTtSX3.

Today, I will discuss our Virtual personal assistant (SJ) with a combination of AI-driven APIs, which is now operational in Python. We will use the three most potent APIs using OpenAI, Rev-AI & Pyttsx3. Why don’t we see the demo first?

Great! Let us understand how we can leverage this by writing a tiny snippet using this new AI model.

Architecture:

Let us understand the flow of events –

The application first invokes the API to capture the audio spoken through the audio device & then translates that into text, which is later parsed & shared as input to OpenAI for the response to the posted queries. Once OpenAI shares the response, the python-based engine will take the response & use pyttsx3 to convert it to voice.


Python Packages:

Following are the python packages that are necessary to develop this brilliant use case –

pip install openai==0.25.0
pip install PyAudio==0.2.13
pip install playsound==1.3.0
pip install pandas==1.5.2
pip install rev-ai==2.17.1
pip install six==1.16.0
pip install websocket-client==0.59.0

CODE:

Let us now understand the code. For this use case, we will only discuss the key python scripts. The application needs a few more scripts than these, but we have already discussed them in some of the earlier posts. Hence, we will skip them here.

  • clsConfigClient.py (Main configuration file)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 31-Dec-2022 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### personal AI-driven voice assistant. ####
#### ####
################################################
import os
import platform as pl
class clsConfigClient(object):
    """Static configuration for the personal voice assistant (SJ).

    Everything is a class attribute; callers read the settings through
    ``clsConfigClient.conf``.
    """

    # Directory that contains this configuration module.
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    # Path separator picked from the operating system name.
    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'output' + sep,
        'REPORT_DIR': 'output',
        'SRC_PATH': Curr_Path + sep + 'data' + sep,
        'CODE_PATH': Curr_Path + sep + 'Code' + sep,
        'APP_DESC_1': 'Personal Voice Assistant (SJ)!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'TITLE': "Personal Voice Assistant (SJ)!",
        'PATH' : Curr_Path,
        # Secrets should not live in source control: an environment variable
        # of the same name wins; the literal is only a non-working placeholder.
        'OPENAI_API_KEY': os.environ.get('OPENAI_API_KEY', "sk-aapwfMWDuFE5XXXUr2BH"),
        'REVAI_API_KEY': os.environ.get('REVAI_API_KEY', "02ks6kFhEKjdhdure8474JJAJJ945958_h8P_DEKDNkK6DwNNNHU17aRtCw"),
        'MODEL_NAME': "code-davinci-002",
        "speedSpeech": 170,
        "speedPitch": 0.8,
        "soundRate": 44100,
        "contentType": "audio/x-raw",
        "layout": "interleaved",
        "format": "S16LE",
        "channels": 1
    }

A few of the essential entries from the above snippet, which one should be looked for, are –

'OPENAI_API_KEY': "sk-aapwfMWDuFE5XXXUr2BH",
'REVAI_API_KEY': "02ks6kFhEKjdhdure8474JJAJJ945958_h8P_DEKDNkK6DwNNNHU17aRtCw",
'MODEL_NAME': "code-davinci-002",
"speedSpeech": 170,
"speedPitch": 0.8,
"soundRate": 44100,
"contentType": "audio/x-raw",
"layout": "interleaved",
"format": "S16LE",
"channels": 1

Note that none of the API keys shown here are real. You need to generate your own keys.

  • clsText2Voice.py (The python script that will convert text to voice)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 27-Oct-2019 ####
#### Modified On 28-Jan-2023 ####
#### ####
#### Objective: Main class converting ####
#### text to voice using third-party API. ####
###############################################
import pyttsx3
from clsConfigClient import clsConfigClient as cf
class clsText2Voice:
    """Speak text aloud through pyttsx3 using the configured rate and pitch."""

    def __init__(self):
        # Both values come from the shared configuration dictionary.
        self.speedSpeech = cf.conf['speedSpeech']
        self.speedPitch = cf.conf['speedPitch']

    def getAudio(self, srcString):
        """Speak *srcString* and report whether it worked.

        Returns:
            0 once the engine has finished speaking, or 1 if any exception
            was raised (the message is printed).
        """
        try:
            rate = self.speedSpeech
            pitch = self.speedPitch

            engine = pyttsx3.init()

            # Set the speed of the speech (in words per minute)
            engine.setProperty('rate', rate)

            # Set the pitch of the speech (1.0 is default)
            engine.setProperty('pitch', pitch)

            # Speak the text and wait for the speech to finish
            engine.say(srcString)
            engine.runAndWait()

            return 0
        except Exception as e:
            print('Error: ', str(e))
            return 1

Some of the important snippets are as follows –

def getAudio(self, srcString):
    """Speak *srcString*; return 0 when finished, or 1 if anything raised."""
    try:
        speedSpeech = self.speedSpeech
        speedPitch = self.speedPitch

        engine = pyttsx3.init()

        # Set the speed of the speech (in words per minute)
        engine.setProperty('rate', speedSpeech)

        # Set the pitch of the speech (1.0 is default)
        engine.setProperty('pitch', speedPitch)

        # Speak the text and wait for the speech to finish
        engine.say(srcString)
        engine.runAndWait()

        return 0
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

The code is a function that generates speech audio from a given string using the Pyttsx3 library in Python. The function sets the speech rate and pitch using the “speedSpeech” and “speedPitch” properties of the calling object, initializes the Pyttsx3 engine, sets the speech rate and pitch on the engine, speaks the given string, and waits for the speech to finish. The function returns 0 after the speech is finished.


  • clsChatEngine.py (This python script will invoke the ChatGPT OpenAI class to initiate the response of the queries in python.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Dec-2022 ####
#### Modified On 28-Jan-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### ChatGPT OpenAI class to initiate the ####
#### response of the queries in python. ####
#####################################################
import os
import openai
import json
from clsConfigClient import clsConfigClient as cf
import sys
import errno
# Disabling warnings
def warn(*args, **kwargs):
    """Accept any arguments and do nothing.

    Installed below in place of ``warnings.warn`` so that calls to it are ignored.
    """
    return None


import warnings
warnings.warn = warn
###############################################
### Global Section ###
###############################################
CODE_PATH=str(cf.conf['CODE_PATH'])
MODEL_NAME=str(cf.conf['MODEL_NAME'])
###############################################
### End of Global Section ###
###############################################
class clsChatEngine:
    """Send a prompt to the OpenAI Completion endpoint and return the reply."""

    def __init__(self):
        self.OPENAI_API_KEY=str(cf.conf['OPENAI_API_KEY'])

    def findFromSJ(self, text):
        """Return the first completion generated for *text*.

        Args:
            text: Prompt passed unchanged to ``openai.Completion.create``.

        Returns:
            The ``text`` of the first choice on success; an empty string when
            the call fails with a broken pipe (EPIPE); otherwise the string
            form of the exception (also printed).
        """
        try:
            # ChatGPT API_KEY
            openai.api_key = self.OPENAI_API_KEY

            print('22'*60)

            # Getting response from ChatGPT
            # NOTE(review): n=3 requests three completions although only the
            # first one is used below.
            response = openai.Completion.create(
                engine=MODEL_NAME,
                prompt=text,
                max_tokens=64,
                top_p=1.0,
                n=3,
                temperature=0,
                frequency_penalty=0.0,
                presence_penalty=0.0,
                stop=["\"\"\""]
            )

            print('44'*60)

            return response.choices[0].text

        except IOError as e:
            if e.errno == errno.EPIPE:
                # Broken pipe: there is no reply to use, and printing could
                # fail again, so return an empty answer quietly.
                return ''

            x = str(e)
            print(x)
            print('66'*60)
            return x

        except Exception as e:
            x = str(e)
            print(x)
            print('66'*60)
            return x

Key snippets from the above-script are as follows –

def findFromSJ(self, text):
    """Return the first completion generated for *text*.

    Returns the ``text`` of the first choice on success, an empty string
    when the call fails with a broken pipe (EPIPE), and otherwise the string
    form of the exception (also printed).
    """
    try:
        # ChatGPT API_KEY
        openai.api_key = self.OPENAI_API_KEY

        print('22'*60)

        # Getting response from ChatGPT
        response = openai.Completion.create(
            engine=MODEL_NAME,
            prompt=text,
            max_tokens=64,
            top_p=1.0,
            n=3,
            temperature=0,
            frequency_penalty=0.0,
            presence_penalty=0.0,
            stop=["\"\"\""]
        )

        print('44'*60)

        return response.choices[0].text

    except IOError as e:
        if e.errno == errno.EPIPE:
            # Broken pipe: no reply to use; return an empty answer quietly.
            return ''

        x = str(e)
        print(x)
        print('66'*60)
        return x

    except Exception as e:
        x = str(e)
        print(x)

        print('66'*60)

        return x

The code is a function that uses OpenAI’s Completion API (with the model named in MODEL_NAME) to generate text based on a given prompt text. The function takes the text to be completed as input and uses an API key stored in the OPENAI_API_KEY property of the calling object to request OpenAI’s API. If the request is successful, the function returns the top completion generated by the model, as stored in the text field of the first item in the choices list of the API response.

The function includes error handling for IOError and Exception. If an IOError occurs, the function checks if the error number is errno.EPIPE and, if it is, returns without doing anything. If an Exception occurs, the function converts the error message to a string and prints it, then returns the string.


  • clsVoice2Text.py (This python script will invoke the Rev-AI class to initiate the transformation of audio into the text.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Dec-2022 ####
#### Modified On 28-Jan-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### Rev-AI class to initiate the transformation ####
#### of audio into the text. ####
#####################################################
import pyaudio
from rev_ai.models import MediaConfig
from rev_ai.streamingclient import RevAiStreamingClient
from six.moves import queue
import ssl
import json
import pandas as p
import clsMicrophoneStream as ms
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
# Initiating Log class
l = cl.clsL()
# Bypassing SSL Authentication
# NOTE(review): this replaces the ssl module's default HTTPS context factory
# with the unverified one, i.e. certificates are no longer checked. Fine for
# a local demo only; remove it before using real credentials.
try:
    _create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
    # Legacy python that doesn't verify HTTPS certificates by default
    pass
else:
    # Handle target environment that doesn't support HTTPS verification
    ssl._create_default_https_context = _create_unverified_https_context
# Disabling warnings
def warn(*args, **kwargs):
    """Accept any arguments and do nothing.

    Installed below in place of ``warnings.warn`` so that calls to it are ignored.
    """
    return None


import warnings
warnings.warn = warn
######################################
### Insert your access token here ####
######################################
debug_ind = 'Y'
################################################################
### Sampling rate of your microphone and desired chunk size ####
################################################################
class clsVoice2Text:
    """Stream microphone audio to Rev.AI and return the recognised text."""

    def __init__(self):
        self.OPENAI_API_KEY=str(cf.conf['OPENAI_API_KEY'])
        self.rate = cf.conf['soundRate']

    def processVoice(self, var):
        """Listen on the microphone and return the first transcript found.

        Args:
            var: Text inserted into the debug log file name
                (``'1.df_' + var + '.csv'``).

        Returns:
            The joined ``value`` column of the first response whose elements
            carry a ``confidence`` column ('' if none arrived before the
            stream stopped or a response could not be processed); '' if
            streaming itself raised; otherwise the string form of the
            exception raised during set-up.
        """
        # Stays None until the client exists, so the error path below never
        # touches an unbound name.
        streamclient = None

        try:
            accessToken = cf.conf['REVAI_API_KEY']
            rate = self.rate
            chunk = int(rate/10)

            ################################################################
            ### Creates a media config with the settings set for a raw  ####
            ### microphone input. The rate is the one the microphone    ####
            ### stream is opened with, so the two always agree.         ####
            ################################################################
            sampleMC = MediaConfig('audio/x-raw', 'interleaved', rate, 'S16LE', 1)

            streamclient = RevAiStreamingClient(accessToken, sampleMC)

            #####################################################################
            ### Opens microphone input. The input will stop after a keyboard ####
            ### interrupt.                                                   ####
            #####################################################################
            with ms.clsMicrophoneStream(rate, chunk) as stream:
                finalText = ''

                try:
                    # Starts the server connection and the thread sending
                    # microphone audio.
                    response_gen = streamclient.start(stream.generator())

                    # Iterates through responses and prints them; stops at
                    # the first one that carries a confidence column, or at
                    # the first response that cannot be processed.
                    for response in response_gen:
                        try:
                            print('JSON:')
                            print(response)

                            r = json.loads(response)

                            df = p.json_normalize(r["elements"])
                            l.logr('1.df_' + var + '.csv', debug_ind, df, 'log')

                            if "confidence" in df.columns:
                                print('DF:: ')
                                print(df)

                                finalText = "".join(df["value"])
                                print("TEXT:")
                                print(finalText)

                                break
                        except Exception:
                            break

                    streamclient.end()

                    return finalText

                except Exception:
                    # Ends the WebSocket connection.
                    streamclient.end()

                    return ''

        except Exception as e:
            x = str(e)
            print('Error: ', x)

            if streamclient is not None:
                streamclient.end()

            return x

Here is the important snippet from the above code –

def processVoice(self, var):
    """Listen on the microphone and return the first transcript found.

    *var* is inserted into the debug log file name. Returns the joined
    ``value`` column of the first response carrying a ``confidence`` column,
    '' if streaming raised, or the string form of a set-up exception.
    """
    # Stays None until the client exists, so the error path below never
    # touches an unbound name.
    streamclient = None

    try:
        accessToken = cf.conf['REVAI_API_KEY']
        rate = self.rate
        chunk = int(rate/10)

        ################################################################
        ### Creates a media config with the settings set for a raw  ####
        ### microphone input                                        ####
        ################################################################

        sampleMC = MediaConfig('audio/x-raw', 'interleaved', rate, 'S16LE', 1)

        streamclient = RevAiStreamingClient(accessToken, sampleMC)

        #####################################################################
        ### Opens microphone input. The input will stop after a keyboard ####
        ### interrupt.                                                   ####
        #####################################################################

        with ms.clsMicrophoneStream(rate, chunk) as stream:
            finalText = ''

            try:
                ############################################
                ### Starts the server connection        ####
                ### and thread sending microphone audio ####
                ############################################

                response_gen = streamclient.start(stream.generator())

                ###################################################
                ### Iterates through responses and prints them ####
                ###################################################

                for response in response_gen:
                    try:
                        print('JSON:')
                        print(response)

                        r = json.loads(response)

                        df = p.json_normalize(r["elements"])
                        l.logr('1.df_' + var + '.csv', debug_ind, df, 'log')

                        if "confidence" in df.columns:
                            print('DF:: ')
                            print(df)

                            finalText = "".join(df["value"])
                            print("TEXT:")
                            print(finalText)

                            # First transcript found: stop reading responses.
                            break

                    except Exception:
                        break

                streamclient.end()

                return finalText

            except Exception:
                #######################################
                ### Ends the WebSocket connection. ####
                #######################################

                streamclient.end()

                return ''

    except Exception as e:
        x = str(e)
        print('Error: ', x)

        if streamclient is not None:
            streamclient.end()

        return x

The code is a python function called processVoice() that processes a user’s voice input using the Rev.AI API. The function takes in one argument, “var,” which is used only to build the name of the debug log file.

  1. Let us understand the code –
    • First, the function sets several variables, including the Rev.AI API access token, the sample rate, and the chunk size for the audio input.
    • Then, it creates a media configuration object for raw microphone input.
    • A RevAiStreamingClient object is created using the access token and the media configuration.
    • The code opens the microphone input using a with statement and the microphone stream class.
    • Within the statement, the code starts the server connection and a thread that sends microphone audio to the server.
    • The code then iterates through the responses from the server, normalizing the JSON response and storing the values in a pandas data-frame.
    • If the “confidence” column exists in the data-frame, the code joins all the values to form the final text and raises an exception.
      • If there is an exception, the WebSocket connection is ended, and the final text is returned.
      • If there is any error, the WebSocket connection is also ended, and an empty string or the error message is returned.

  • clsMicrophoneStream.py (This python script invokes the rev_ai template to capture the chunked voice data, stream it to the service for text translation & return the response to the app.)


#####################################################
#### Modified By: SATYAKI DE ####
#### Modified On 28-Jan-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### rev_ai template to capture the chunk voice ####
#### data & stream it to the service for text ####
#### translation & return the response to app. ####
#####################################################
import pyaudio
from rev_ai.models import MediaConfig
from six.moves import queue
# Disabling warnings
import warnings


def warn(*args, **kwargs):
    """Accept any arguments and do nothing.

    Drop-in replacement for ``warnings.warn`` that discards the warning.
    """
    pass


# Replace the module-level ``warnings.warn`` so that calls made through
# ``warnings.warn(...)`` after this point are silently ignored. This is a
# process-wide side effect of importing this script.
warnings.warn = warn
class clsMicrophoneStream(object):
    """Context manager that opens a microphone recording stream.

    While the stream is open, every audio chunk delivered by PyAudio is
    pushed onto an internal queue (``self._buff``). Leaving the ``with``
    block closes the stream and pushes a ``None`` sentinel onto the queue.
    """

    def __init__(self, rate, chunk):
        """Store the stream settings; no audio device is opened here.

        Args:
            rate: Value passed to PyAudio as the stream ``rate``.
            chunk: Value passed to PyAudio as ``frames_per_buffer``.
        """
        self._rate = rate
        self._chunk = chunk

        # Thread-safe buffer of audio data.
        self._buff = queue.Queue()

        # The stream counts as closed until __enter__ has opened it.
        self.closed = True

    def __enter__(self):
        """Open the input stream and return this object.

        Raises:
            Exception: Whatever ``PyAudio.open`` raises. The PyAudio
                interface is terminated before the error is re-raised.
        """
        self._audio_interface = pyaudio.PyAudio()

        try:
            self._audio_stream = self._audio_interface.open(
                format=pyaudio.paInt16,
                # The API currently only supports 1-channel (mono) audio.
                channels=1,
                rate=self._rate,
                input=True,
                frames_per_buffer=self._chunk,
                # Run the audio stream asynchronously to fill the buffer
                # object. This is necessary so that the input device's
                # buffer doesn't overflow while the calling thread makes
                # network requests, etc.
                stream_callback=self._fill_buffer,
            )
        except Exception:
            # __exit__ is not invoked when __enter__ fails, so release the
            # PyAudio interface here instead of leaking it.
            self._audio_interface.terminate()
            raise

        self.closed = False

        return self

    def __exit__(self, type, value, traceback):
        """Stop and close the stream, then release the audio interface.

        Returns ``None``, so an exception raised inside the ``with`` block
        is not suppressed.
        """
        try:
            self._audio_stream.stop_stream()
            self._audio_stream.close()
        finally:
            # Run the remaining teardown even if stopping/closing the
            # stream failed, so consumers are never left waiting.
            self.closed = True

            # Signal the generator to terminate so that the client's
            # streaming_recognize method will not block the process
            # termination.
            self._buff.put(None)
            self._audio_interface.terminate()

    def _fill_buffer(self, in_data, frame_count, time_info, status_flags):
        """Stream callback: collect one chunk of audio into the buffer.

        Only ``in_data`` is used; the other callback arguments are ignored.

        Returns:
            The tuple ``(None, pyaudio.paContinue)``.
        """
        self._buff.put(in_data)

        return None, pyaudio.paContinue