RAG implementation of LLMs by using Python, Haystack & React (Part – 1)

Today, I’m sharing the first post in a multi-part series about building an end-to-end LLM application that grounds its answers in source data through a RAG (Retrieval-Augmented Generation) implementation. I’ll use the OpenAI Python SDK together with Haystack embeddings in this case.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.

Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

As you can see, to enable this large & complex solution, we must first establish the capabilities to build applications powered by LLMs, Transformer models, vector search, and more. With a framework such as Haystack, you can use state-of-the-art NLP models to perform question-answering, answer generation, semantic document search, or build tools capable of complex decision-making and query resolution. Hence, steps 1 & 2 show the data being embedded & stored to create that informed repository. We’ll discuss that in the second part.

Once you have the informed repository, the system can interact with the end-users. As part of the query (shown in step 3), the prompt & the question are shared with the process engine, which then searches the informed repository to reduce the volume of data, keeps only the relevant context & returns that tuned context as part of the response (shown in steps 4, 5 & 6).

Then, this tuned context is shared with OpenAI, which returns a better response along with a summary & concluding remarks that are user-friendly & easier for end-users to understand (shown in steps 8 & 9).
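
To make the retrieve-then-prompt flow described above more concrete, here is a minimal, library-free sketch. The word-overlap scoring, the sample passages, and the names retrieve and build_messages are illustrative assumptions only; the actual project relies on Haystack’s dense retriever and the OpenAI SDK instead.

```python
import re

# Toy "informed repository"; in the real project this is a FAISS index.
REPOSITORY = [
    "The museum holds Egyptian art from the Old Kingdom.",
    "Impressionist paintings are displayed in the European wing.",
    "The arms and armor collection includes Japanese swords.",
]


def tokenize(text):
    """Lower-case the text and return its set of words."""
    return set(re.findall(r"[a-z]+", text.lower()))


def retrieve(question, passages, top_k=2):
    """Rank passages by word overlap with the question (a stand-in for vector search)."""
    q_words = tokenize(question)
    ranked = sorted(passages, key=lambda p: len(q_words & tokenize(p)), reverse=True)
    return ranked[:top_k]


def build_messages(question, context_passages):
    """Combine the trimmed context and the question into a chat-style prompt."""
    context = "\n".join(context_passages)
    return [
        {"role": "system", "content": "Answer using only the supplied context."},
        {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"},
    ]


question = "Where are the Impressionist paintings?"
context = retrieve(question, REPOSITORY, top_k=1)
messages = build_messages(question, context)
print(messages[1]["content"])
```

Only the best-matching passage reaches the prompt, which is the point of the flow: the model sees a small, relevant slice of the repository rather than all of it.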

The following are the important packages that are essential to this project –

pip install farm-haystack==1.19.0
pip install Flask==2.2.5
pip install Flask-Cors==4.0.0
pip install Flask-JWT-Extended==4.5.2
pip install Flask-Session==0.5.0
pip install openai==0.27.8
pip install pandas==2.0.3
pip install tensorflow==2.11.1

Note that clsContentScrapper.py, shown later, also imports langchain & googleapiclient (from the google-api-python-client package), which are not part of this list & may need to be installed as well.

We have both a front-end built with React & back-end APIs built with Python Flask, which together with OpenAI create this experience.

Today, we’ll be going in reverse mode. We’ll first discuss the main script & then explain all the other class scripts.

  • flaskServer.py (This is the main calling Python script to invoke the RAG-Server.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### shortcut application created inside MAC         ####
#### environment including MacBook, IPad or IPhone.  ####
####                                                 ####
#########################################################

from flask import Flask, jsonify, request, session
from flask_cors import CORS
from werkzeug.security import check_password_hash, generate_password_hash
from flask_jwt_extended import JWTManager, jwt_required, create_access_token
import pandas as pd
from clsConfigClient import clsConfigClient as cf
import clsL as log
import clsContentScrapper as csc
import clsRAGOpenAI as crao
import csv
from datetime import timedelta
import os
import re
import json

########################################################
################    Global Area   ######################
########################################################
#Initiating Logging Instances
clog = log.clsL()

admin_key = cf.conf['ADMIN_KEY']
secret_key = cf.conf['SECRET_KEY']
session_path = cf.conf['SESSION_PATH']
sessionFile = cf.conf['SESSION_CACHE_FILE']

app = Flask(__name__)
CORS(app)  # This will enable CORS for all routes
app.config['JWT_SECRET_KEY'] = admin_key  # Change this!
app.secret_key = secret_key

jwt = JWTManager(app)

users = cf.conf['USER_NM']
passwd = cf.conf['USER_PWD']

cCScrapper = csc.clsContentScrapper()
cr = crao.clsRAGOpenAI()

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Define the aggregation functions
def join_unique(series):
    unique_vals = series.drop_duplicates().astype(str)
    return ', '.join(filter(lambda x: x != 'nan', unique_vals))

# Building the preaggregate cache
def groupImageWiki():
    try:
        base_path = cf.conf['OUTPUT_PATH']
        inputFile = cf.conf['CLEANED_FILE']
        outputFile = cf.conf['CLEANED_FILE_SHORT']
        subdir = cf.conf['SUBDIR_OUT']
        Ind = cf.conf['DEBUG_IND']

        inputCleanedFileLookUp = base_path + inputFile

        #Opening the file in dataframe
        df = pd.read_csv(inputCleanedFileLookUp)
        hash_values = df['Total_Hash'].unique()

        # Work on an explicit copy so the assignments below do not write to a view of df
        dFin = df[['primaryImage','Wiki_URL','Total_Hash']].copy()

        # Ensure columns are strings and not NaN
        # Convert columns to string and replace 'nan' with an empty string
        dFin['primaryImage'] = dFin['primaryImage'].astype(str).replace('nan', '')
        dFin['Wiki_URL'] = dFin['Wiki_URL'].astype(str).replace('nan', '')

        # drop_duplicates() returns a new data frame, so keep the result
        dFin = dFin.drop_duplicates()

        # Group by 'Total_Hash' and aggregate
        dfAgg = dFin.groupby('Total_Hash').agg({'primaryImage': join_unique,'Wiki_URL': join_unique}).reset_index()

        return dfAgg

    except Exception as e:
        x = str(e)
        print('Error: ', x)

        df = pd.DataFrame()

        return df

resDf = groupImageWiki()

########################################################
################  End  Global Area  ####################
########################################################

def extractRemoveUrls(hash_value):
    image_urls = ''
    wiki_urls = ''
    # Parse the inner message JSON string
    try:

        resDf['Total_Hash'] = resDf['Total_Hash'].astype(int)
        filtered_df = resDf[resDf['Total_Hash'] == int(hash_value)]

        if not filtered_df.empty:
            image_urls = filtered_df['primaryImage'].values[0]
            wiki_urls = filtered_df['Wiki_URL'].values[0]

        return image_urls, wiki_urls

    except Exception as e:
        x = str(e)
        print('extractRemoveUrls Error: ', x)
        return image_urls, wiki_urls

def isIncomplete(line):
    """Check if a line appears to be incomplete."""

    # Check if the line ends with certain patterns indicating it might be incomplete.
    incomplete_patterns = [': [Link](', ': Approximately ', ': ']
    return any(line.endswith(pattern) for pattern in incomplete_patterns)

def filterData(data):
    """Return only the complete lines from the data."""

    lines = data.split('\n')
    complete_lines = [line for line in lines if not isIncomplete(line)]

    return '\n'.join(complete_lines)

def updateCounter(sessionFile):
    try:
        counter = 0

        # Check if the CSV file exists
        if os.path.exists(sessionFile):
            with open(sessionFile, 'r') as f:
                reader = csv.reader(f)
                for row in reader:
                    # Assuming the counter is the first value in the CSV
                    counter = int(row[0])

        # Increment counter
        counter += 1

        # Write counter back to CSV
        with open(sessionFile, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([counter])

        return counter
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

def getPreviousResult():
    try:
        fullFileName = session_path + sessionFile
        newCounterValue = updateCounter(fullFileName)

        return newCounterValue
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

@app.route('/login', methods=['POST'])
def login():
    username = request.json.get('username', None)
    password = request.json.get('password', None)

    print('User Name: ', str(username))
    print('Password: ', str(password))

    #if username not in users or not check_password_hash(users.get(username), password):
    if ((username not in users) or (password not in passwd)):
        return jsonify({'login': False}), 401

    access_token = create_access_token(identity=username)
    return jsonify(access_token=access_token)

@app.route('/chat', methods=['POST'])
def get_chat():
    try:
        #session["key"] = "1D98KI"
        #session_id = session.sid
        #print('Session Id: ', str(session_id))

        cnt = getPreviousResult()
        print('Running Session Count: ', str(cnt))

        username = request.json.get('username', None)
        message = request.json.get('message', None)

        print('User: ', str(username))
        print('Content: ', str(message))

        if cnt == 1:
            retList = cCScrapper.extractCatalog()
        else:
            hashValue, cleanedData = cr.getData(str(message))
            print('Main Hash Value:', str(hashValue))

            imageUrls, wikiUrls = extractRemoveUrls(hashValue)
            print('Image URLs: ', str(imageUrls))
            print('Wiki URLs: ', str(wikiUrls))
            print('Clean Text:')
            print(str(cleanedData))
            retList = '{"records":[{"Id":"' + str(cleanedData) + '", "Image":"' + str(imageUrls) + '", "Wiki": "' + str(wikiUrls) + '"}]}'

        response = {
            'message': retList
        }

        print('JSON: ', str(response))
        return jsonify(response)

    except Exception as e:
        x = str(e)

        response = {
            'message': 'Error: ' + x
        }
        return jsonify(response)

@app.route('/api/data', methods=['GET'])
@jwt_required()
def get_data():
    response = {
        'message': 'Hello from Flask!'
    }
    return jsonify(response)

if __name__ == '__main__':
    app.run(debug=True)

Let us understand some of the important sections of the above script –

Function – login():

The login function retrieves a ‘username’ and ‘password’ from a JSON request and prints them. It returns a failure JSON response with a 401 status if the username is missing from the configured users or the password is missing from the configured passwords. Otherwise, it creates and returns an access token in a JSON response.
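
Because the listing checks the username and the password independently and also prints the password, a hardened version would tie each password to its own user and avoid logging secrets. The commented-out check_password_hash line in the listing hints at the same idea. Below is a minimal sketch using only the standard library; USER_DB, hash_password, and verify_login are hypothetical names and not part of the project.

```python
import hashlib
import hmac
import os


def hash_password(password, salt=None):
    """Return (salt, digest) computed with PBKDF2-HMAC-SHA256."""
    salt = salt or os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, 100_000)
    return salt, digest


# Hypothetical user store: each username maps to its own salt and digest.
USER_DB = {"alice": hash_password("s3cret")}


def verify_login(username, password):
    """Return True only when the password matches the one stored for this user."""
    record = USER_DB.get(username)
    if record is None or password is None:
        return False
    salt, expected = record
    _, candidate = hash_password(password, salt)
    # compare_digest avoids leaking information through timing differences.
    return hmac.compare_digest(candidate, expected)


print(verify_login("alice", "s3cret"))
print(verify_login("alice", "wrong"))
print(verify_login("bob", "s3cret"))
```

In the Flask route, a check like this could replace the two membership tests, with the 401 response returned whenever it yields False.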

Function – get_chat():

The get_chat function retrieves the running session count and user input from a JSON request. On the first call (session count 1), it returns the catalog data; otherwise, it passes the user’s message through the RAG framework, receives the refined response from OpenAI along with the hash value, and looks up the matching image URLs and wiki URLs. If an error arises, the function captures and returns the error as a JSON message.
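
The listing assembles the ‘records’ payload by string concatenation, which produces invalid JSON as soon as the answer contains a double quote. One possible alternative, sketched below under the assumption that the front-end keeps parsing the same three keys, is to let json.dumps do the escaping. The helper name build_records_payload and the sample values are made up for illustration. Note that generateAnswerWithGPT3 already converts line breaks to a literal \n, so that replacement would become unnecessary with this approach.

```python
import json


def build_records_payload(cleaned_text, image_urls, wiki_urls):
    """Serialize one record; json.dumps escapes quotes, backslashes and newlines."""
    record = {
        "Id": str(cleaned_text),
        "Image": str(image_urls),
        "Wiki": str(wiki_urls),
    }
    return json.dumps({"records": [record]})


# A sample answer with double quotes and a line break, both of which would
# break a payload that is glued together with '+'.
answer = 'The vase is labelled "Item A".\nSee the catalog for details.'
payload = build_records_payload(answer, "https://example.com/a.jpg", "https://example.com/wiki")

# The round trip returns the original text unchanged.
parsed = json.loads(payload)
print(parsed["records"][0]["Id"])
```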

Function – updateCounter():

The updateCounter function checks if a given CSV file exists and retrieves its counter value. It then increments the counter and writes it back to the CSV. If any errors occur, an error message is printed, and the function returns a value of 1.
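
The counter logic can be tried in isolation with a temporary file. The sketch below mirrors the read, increment, and write-back steps; the snake_case name update_counter and the file name session.csv are placeholders rather than the project’s configured values.

```python
import csv
import os
import tempfile


def update_counter(path):
    """Read the last counter value from a one-column CSV, add one, write it back."""
    counter = 0
    if os.path.exists(path):
        with open(path, "r", newline="") as f:
            for row in csv.reader(f):
                if row:  # skip blank lines
                    counter = int(row[0])
    counter += 1
    with open(path, "w", newline="") as f:
        csv.writer(f).writerow([counter])
    return counter


with tempfile.TemporaryDirectory() as tmp:
    session_file = os.path.join(tmp, "session.csv")
    first = update_counter(session_file)   # file does not exist yet
    second = update_counter(session_file)  # file now holds the previous value
    print(first, second)
```

Since get_chat treats a count of 1 as the first message and serves the catalog, the error path in the listing (which also returns 1) has a side effect worth knowing: if the file cannot be read or written, every request will look like the first one.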

Function – extractRemoveUrls():

The extractRemoveUrls function attempts to filter a data frame, resDf, based on a provided hash value to extract image and wiki URLs. If the data frame contains matching entries, it retrieves the corresponding URLs. Any errors encountered are printed, but the function always returns the image and wiki URLs, even if they are empty.
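
The grouping done in groupImageWiki and the lookup done in extractRemoveUrls can be pictured without pandas. The following sketch uses plain dictionaries as a stand-in for the groupby and filter steps; build_lookup, extract_urls, and the sample rows are illustrative assumptions.

```python
def build_lookup(rows):
    """Group rows by Total_Hash and join distinct, non-empty URLs with ', '."""
    grouped = {}
    for row in rows:
        entry = grouped.setdefault(int(row["Total_Hash"]),
                                   {"primaryImage": [], "Wiki_URL": []})
        for column in ("primaryImage", "Wiki_URL"):
            value = str(row.get(column) or "")
            if value and value != "nan" and value not in entry[column]:
                entry[column].append(value)
    return {key: {col: ", ".join(vals) for col, vals in entry.items()}
            for key, entry in grouped.items()}


def extract_urls(lookup, hash_value):
    """Return (image_urls, wiki_urls) for a hash, or two empty strings."""
    try:
        entry = lookup.get(int(hash_value))
    except (TypeError, ValueError):
        entry = None  # the hash was missing or not numeric
    if entry is None:
        return "", ""
    return entry["primaryImage"], entry["Wiki_URL"]


# Made-up sample rows; the real data comes from the cleaned CSV file.
rows = [
    {"Total_Hash": "101", "primaryImage": "https://example.com/a.jpg", "Wiki_URL": "https://example.com/w1"},
    {"Total_Hash": "101", "primaryImage": "https://example.com/a.jpg", "Wiki_URL": "https://example.com/w2"},
    {"Total_Hash": "202", "primaryImage": None, "Wiki_URL": "https://example.com/w3"},
]
lookup = build_lookup(rows)
print(extract_urls(lookup, "101"))
print(extract_urls(lookup, None))
```

The comma-joined wiki string is the same shape the React front-end later splits on ‘,’ to render individual links.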

  • clsContentScrapper.py (This is the main class that brings the default options for the users if they agree with the initial prompt by the bot.)
#####################################################
#### Written By: SATYAKI DE                      ####
#### Written On: 27-May-2023                     ####
#### Modified On 28-May-2023                     ####
####                                             ####
#### Objective: This is the main calling         ####
#### python class that will invoke the           ####
#### LangChain of package to extract             ####
#### the transcript from the YouTube videos &    ####
#### then answer the questions based on the      ####
#### topics selected by the users.               ####
####                                             ####
#####################################################

from langchain.document_loaders import YoutubeLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.chat_models import ChatOpenAI
from langchain.chains import LLMChain

from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
)

from googleapiclient.discovery import build

import clsTemplate as ct
from clsConfigClient import clsConfigClient as cf

import os

from flask import jsonify
import requests

###############################################
###           Global Section                ###
###############################################
open_ai_Key = cf.conf['OPEN_AI_KEY']
os.environ["OPENAI_API_KEY"] = open_ai_Key
embeddings = OpenAIEmbeddings(openai_api_key=open_ai_Key)

YouTube_Key = cf.conf['YOUTUBE_KEY']
youtube = build('youtube', 'v3', developerKey=YouTube_Key)

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

###############################################
###    End of Global Section                ###
###############################################

class clsContentScrapper:
    def __init__(self):
        self.model_name = cf.conf['MODEL_NAME']
        self.temp_val = cf.conf['TEMP_VAL']
        self.max_cnt = int(cf.conf['MAX_CNT'])
        self.url = cf.conf['BASE_URL']
        self.header_token = cf.conf['HEADER_TOKEN']

    def extractCatalog(self):
        try:
            base_url = self.url
            header_token = self.header_token

            url = base_url + '/departments'

            print('Full URL: ', str(url))

            payload={}
            headers = {'Cookie': header_token}

            response = requests.request("GET", url, headers=headers, data=payload)

            x = response.text

            return x
        except Exception as e:
            discussedTopic = []
            x = str(e)
            print('Error: ', x)

            return x

Let us understand the core part that we need from this class.

Function – extractCatalog():

The extractCatalog function uses specific headers to make a GET request to a constructed URL. The URL is derived by appending ‘/departments’ to a base_url, and a header token is used in the request headers. If successful, it returns the text of the response; if there’s an exception, it prints the error and returns the error message.

  • clsRAGOpenAI.py (This is the main class that builds the RAG-enabled context that is fed to OpenAI for a more focused response at a lower cost.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### shortcut application created inside MAC         ####
#### environment including MacBook, IPad or IPhone.  ####
####                                                 ####
#########################################################

from haystack.document_stores.faiss import FAISSDocumentStore
from haystack.nodes import DensePassageRetriever
import openai

from clsConfigClient import clsConfigClient as cf
import clsL as log

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

import os
import re
###############################################
###           Global Section                ###
###############################################
Ind = cf.conf['DEBUG_IND']
queryModel = cf.conf['QUERY_MODEL']
passageModel = cf.conf['PASSAGE_MODEL']

#Initiating Logging Instances
clog = log.clsL()

os.environ["TOKENIZERS_PARALLELISM"] = "false"

vectorDBFileName = cf.conf['VECTORDB_FILE_NM']

indexFile = "vectorDB/" + str(vectorDBFileName) + '.faiss'
indexConfig = "vectorDB/" + str(vectorDBFileName) + ".json"

print('File: ', str(indexFile))
print('Config: ', str(indexConfig))

# Also, provide `config_path` parameter if you set it when calling the `save()` method:
new_document_store = FAISSDocumentStore.load(index_path=indexFile, config_path=indexConfig)

# Initialize Retriever
retriever = DensePassageRetriever(document_store=new_document_store,
                                  query_embedding_model=queryModel,
                                  passage_embedding_model=passageModel,
                                  use_gpu=False)


###############################################
###    End of Global Section                ###
###############################################

class clsRAGOpenAI:
    def __init__(self):
        self.basePath = cf.conf['DATA_PATH']
        self.fileName = cf.conf['FILE_NAME']
        self.Ind = cf.conf['DEBUG_IND']
        self.subdir = str(cf.conf['OUT_DIR'])
        self.base_url = cf.conf['BASE_URL']
        self.outputPath = cf.conf['OUTPUT_PATH']
        self.vectorDBPath = cf.conf['VECTORDB_PATH']
        self.openAIKey = cf.conf['OPEN_AI_KEY']
        self.temp = cf.conf['TEMP_VAL']
        self.modelName = cf.conf['MODEL_NAME']
        self.maxToken = cf.conf['MAX_TOKEN']

    def extractHash(self, text):
        try:
            # Regular expression pattern to match Ref: {'<digits>'} and capture the digits
            pattern = r"Ref: \{'(\d+)'\}"
            match = re.search(pattern, text)

            if match:
                return match.group(1)
            else:
                return None
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return None

    def removeSentencesWithNaN(self, text):
        try:
            # Split text into sentences using regular expression
            sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text)
            # Filter out sentences containing 'nan'
            filteredSentences = [sentence for sentence in sentences if 'nan' not in sentence]
            # Rejoin the sentences
            return ' '.join(filteredSentences)
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return ''

    def retrieveDocumentsReader(self, question, top_k=9):
        return retriever.retrieve(question, top_k=top_k)

    def generateAnswerWithGPT3(self, retrieved_docs, question):
        try:
            openai.api_key = self.openAIKey
            temp = self.temp
            modelName = self.modelName
            maxToken = self.maxToken

            documentsText = " ".join([doc.content for doc in retrieved_docs])

            filteredDocs = self.removeSentencesWithNaN(documentsText)
            hashValue = self.extractHash(filteredDocs)

            print('RAG Docs:: ')
            print(filteredDocs)
            #prompt = f"Given the following documents: {documentsText}, answer the question accurately based on the above data with the supplied http urls: {question}"

            # Set up a chat-style prompt with your data
            messages = [
                {"role": "system", "content": "You are a helpful assistant, answer the question accurately based on the above data with the supplied http urls. Only relevant content needs to publish. Please do not provide the facts or the texts that results crossing the max_token limits."},
                {"role": "user", "content": filteredDocs}
            ]

            # Chat style invoking the latest model
            response = openai.ChatCompletion.create(
                model=modelName,
                messages=messages,
                temperature = temp,
                max_tokens=maxToken
            )
            return hashValue, response.choices[0].message['content'].strip().replace('\n','\\n')
        except Exception as e:
            x = str(e)
            print('failed to get from OpenAI: ', x)
            # Return a (hash, answer) pair so that callers unpacking two values keep working
            return None, 'Not Available!'

    def ragAnswerWithHaystackAndGPT3(self, question):
        retrievedDocs = self.retrieveDocumentsReader(question)
        return self.generateAnswerWithGPT3(retrievedDocs, question)

    def getData(self, strVal):
        try:
            print('*'*120)
            print('Index Your Data for Retrieval:')
            print('*'*120)

            print('Response from New Docs: ')
            print()

            hashValue, answer = self.ragAnswerWithHaystackAndGPT3(strVal)

            print('GPT3 Answer::')
            print(answer)
            print('Hash Value:')
            print(str(hashValue))

            print('*'*240)
            print('End Of Use RAG to Generate Answers:')
            print('*'*240)

            return hashValue, answer
        except Exception as e:
            x = str(e)
            print('Error: ', x)
            answer = x
            hashValue = 1

            return hashValue, answer

Let us understand some of the important blocks –

Function – ragAnswerWithHaystackAndGPT3():

The ragAnswerWithHaystackAndGPT3 function retrieves relevant documents for a given question using the retrieveDocumentsReader method. It then generates an answer for the query using GPT-3 with the retrieved documents via the generateAnswerWithGPT3 method. The final response is returned.

Function – generateAnswerWithGPT3():

The generateAnswerWithGPT3 function, given a list of retrieved documents and a question, communicates with OpenAI’s GPT-3 to generate an answer. It first processes the documents, filtering and extracting a hash value. Using a chat-style format, it prompts GPT-3 with the processed documents and captures its response. If an error occurs, an error message is printed, and “Not Available!” is returned.
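
The pre-processing steps can be exercised without calling OpenAI. The sketch below reuses the two regular expressions from the class on a made-up document string. It also shows one possible way to place the user’s question in the chat messages, since the listing accepts the question argument but sends only the retrieved text. The build_messages helper and its prompt wording are assumptions, not the author’s prompt.

```python
import re

# Same patterns as in clsRAGOpenAI, written as raw strings.
HASH_PATTERN = re.compile(r"Ref: \{'(\d+)'\}")
SENTENCE_SPLIT = re.compile(r"(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s")


def extract_hash(text):
    """Return the digits inside Ref: {'...'} or None when no reference is present."""
    match = HASH_PATTERN.search(text)
    return match.group(1) if match else None


def remove_sentences_with_nan(text):
    """Drop every sentence that contains the substring 'nan'."""
    sentences = SENTENCE_SPLIT.split(text)
    return " ".join(s for s in sentences if "nan" not in s)


def build_messages(context, question):
    """Hypothetical prompt layout that passes both the context and the question."""
    return [
        {"role": "system", "content": "Answer the question using only the supplied context."},
        {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"},
    ]


# Made-up retrieved text with one empty ('nan') field.
docs_text = "Title: Bronze vase. Artist: nan. Ref: {'4521'}. Medium: bronze."
filtered = remove_sentences_with_nan(docs_text)
hash_value = extract_hash(filtered)
messages = build_messages(filtered, "Who made the vase?")

print(filtered)
print(hash_value)
```

One caveat of the substring test is that it also removes sentences containing words such as “finance”, so a stricter match on the standalone token may be preferable for free-text data.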

Function – retrieveDocumentsReader():

The retrieveDocumentsReader function takes in a question and an optional parameter, top_k (defaulting to 9), and calls the retriever.retrieve method with them. The retrieval returns at most nine documents from the RAG engine, which are then fed to OpenAI.

  • App.js (This is the main React script that creates the interface & parses the data, apart from handling authentication.)
// App.js
import React, { useState } from 'react';
import axios from 'axios';
import './App.css';

const App = () => {
  const [isLoggedIn, setIsLoggedIn] = useState(false);
  const [username, setUsername] = useState('');
  const [password, setPassword] = useState('');
  const [message, setMessage] = useState('');
  const [chatLog, setChatLog] = useState([{ sender: 'MuBot', message: 'Welcome to MuBot! Please explore the world of History from our brilliant collections! Do you want to proceed to see the catalog?'}]);

  const handleLogin = async (e) => {
    e.preventDefault();
    try {
      const response = await axios.post('http://localhost:5000/login', { username, password });
      if (response.status === 200) {
        setIsLoggedIn(true);
      }
    } catch (error) {
      console.error('Login error:', error);
    }
  };

  const sendMessage = async (username) => {
    if (message.trim() === '') return;

    // Create a new chat entry
    const newChatEntry = {
      sender: 'user',
      message: message.trim(),
    };

    // Clear the input field
    setMessage('');

    try {
      // Make API request to Python-based API
      const response = await axios.post('http://localhost:5000/chat', { message: newChatEntry.message }); // Replace with your API endpoint URL
      const responseData = response.data;

      // Print the response to the console for debugging
      console.log('API Response:', responseData);

      // Parse the nested JSON from the 'message' attribute
      const jsonData = JSON.parse(responseData.message);

      // Check if the data contains 'departments'
      if (jsonData.departments) {

        // Extract the 'departments' attribute from the parsed data
        const departments = jsonData.departments;

        // Extract the department names and create a single string with line breaks
        const botResponseText = departments.reduce((acc, department) => {return acc + department.departmentId + ' ' + department.displayName + '\n';}, '');

        // Update the chat log with the bot's response
        setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message: message }, { sender: 'bot', message: botResponseText },]);
      }
      else if (jsonData.records)
      {
        // Data structure 2: Artwork information
        const records = jsonData.records;

        // Prepare chat entries
        const chatEntries = [];

        // Iterate through records and extract text, image, and wiki information
        records.forEach((record) => {
          const textInfo = Object.entries(record).map(([key, value]) => {
            if (key !== 'Image' && key !== 'Wiki') {
              return `${key}: ${value}`;
            }
            return null;
          }).filter((info) => info !== null).join('\n');

          const imageLink = record.Image;
          //const wikiLinks = JSON.parse(record.Wiki.replace(/'/g, '"'));
          //const wikiLinks = record.Wiki;
          const wikiLinks = record.Wiki.split(',').map(link => link.trim());

          console.log('Wiki:', wikiLinks);

          // Check if there is a valid image link
          const hasValidImage = imageLink && imageLink !== '[]';

          const imageElement = hasValidImage ? (
            <img src={imageLink} alt="Artwork" style={{ maxWidth: '100%' }} />
          ) : null;

          // Create JSX elements for rendering the wiki links (if available)
          const wikiElements = wikiLinks.map((link, index) => (
            <div key={index}>
              <a href={link} target="_blank" rel="noopener noreferrer">
                Wiki Link {index + 1}
              </a>
            </div>
          ));

          if (textInfo) {
            chatEntries.push({ sender: 'bot', message: textInfo });
          }

          if (imageElement) {
            chatEntries.push({ sender: 'bot', message: imageElement });
          }

          if (wikiElements.length > 0) {
            chatEntries.push({ sender: 'bot', message: wikiElements });
          }
        });

        // Update the chat log with the bot's response
        setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message }, ...chatEntries, ]);
      }

    } catch (error) {
      console.error('Error sending message:', error);
    }
  };

  if (!isLoggedIn) {
    return (
      <div className="login-container">
        <h2>Welcome to the MuBot</h2>
        <form onSubmit={handleLogin} className="login-form">
          <input
            type="text"
            placeholder="Enter your name"
            value={username}
            onChange={(e) => setUsername(e.target.value)}
            required
          />
          <input
            type="password"
            placeholder="Enter your password"
            value={password}
            onChange={(e) => setPassword(e.target.value)}
            required
          />
          <button type="submit">Login</button>
        </form>
      </div>
    );
  }

  return (
    <div className="chat-container">
      <div className="chat-header">
        <h2>Hello, {username}</h2>
        <h3>Chat with MuBot</h3>
      </div>
      <div className="chat-log">
        {chatLog.map((chatEntry, index) => (
          <div
            key={index}
            className={`chat-entry ${chatEntry.sender === 'user' ? 'user' : 'bot'}`}
          >
            <span className="user-name">{chatEntry.sender === 'user' ? username : 'MuBot'}</span>
            <p className="chat-message">{chatEntry.message}</p>
          </div>
        ))}
      </div>
      <div className="chat-input">
        <input
          type="text"
          placeholder="Type your message..."
          value={message}
          onChange={(e) => setMessage(e.target.value)}
          onKeyPress={(e) => {
            if (e.key === 'Enter') {
              sendMessage();
            }
          }}
        />
        <button onClick={sendMessage}>Send</button>
      </div>
    </div>
  );
};

export default App;

Please find some of the important logic –

Function – handleLogin():

The handleLogin asynchronous function responds to an event by preventing its default action. It attempts to post a login request with a username and password to a local server endpoint. If the response is successful with a status of 200, it updates a state variable to indicate a successful login; otherwise, it logs any encountered errors.

Function – sendMessage():

The sendMessage asynchronous function is designed to handle the user’s chat interaction:

  1. If the message is empty (after trimming spaces), the function exits without further action.
  2. A chat entry object is created with the sender set as ‘user’ and the trimmed message.
  3. The input field’s message is cleared, and an API request is made to a local server endpoint with the chat message.
  4. If the API responds with a ‘departments’ attribute in its JSON, a bot response is crafted by iterating over department details.
  5. If the API responds with ‘records’ indicating artwork information, the bot crafts responses for each record, extracting text, images, and wiki links, and generating JSX elements for rendering them.
  6. After processing the API response, the chat log state is updated with the user’s original message and the bot’s responses.
  7. Errors, if encountered, are logged to the console.

This function enables interactive chat with bot responses that vary based on the nature of the data received from the API.
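
When testing the back-end without the React app, the same branching can be reproduced in Python. The sketch below mirrors steps 4 and 5 on two made-up ‘message’ strings; summarize_reply is a hypothetical helper, and the sample payloads only imitate the shapes the front-end expects.

```python
import json


def summarize_reply(message_text):
    """Turn the 'message' string returned by /chat into a list of display lines."""
    data = json.loads(message_text)

    # Shape 1: catalog of departments
    if data.get("departments"):
        return [f"{d['departmentId']} {d['displayName']}" for d in data["departments"]]

    # Shape 2: artwork records with text, an optional image and wiki links
    lines = []
    for record in data.get("records", []):
        text = "\n".join(f"{key}: {value}" for key, value in record.items()
                         if key not in ("Image", "Wiki"))
        if text:
            lines.append(text)
        image = record.get("Image")
        if image and image != "[]":
            lines.append(f"[image] {image}")
        for index, link in enumerate(record.get("Wiki", "").split(","), start=1):
            if link.strip():
                lines.append(f"Wiki Link {index}: {link.strip()}")
    return lines


catalog = '{"departments": [{"departmentId": 1, "displayName": "Sample Department"}]}'
artwork = ('{"records": [{"Id": "A bronze vase.", "Image": "", '
           '"Wiki": "https://example.com/w1, https://example.com/w2"}]}')

print(summarize_reply(catalog))
print(summarize_reply(artwork))
```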


The directory structure, from the parent down to some of the important child folders, should look like this –


So, finally, we’ve done it.

I know that this post is relatively longer than my earlier posts. But I think you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Hacking the performance of Python Solutions with a custom-built library

Today, I’m very excited to demonstrate an effortless & new way to hack the performance of Python. This post will be a super short & yet crisp presentation of improving the overall performance.

Why not view the demo before going through it?


Demo

Isn’t it exciting? Let’s understand the steps to improve your code.

pip install cython

Cython is a Python-to-C compiler. It can significantly improve performance for specific tasks, especially those with heavy computation and loops. Also, Cython’s syntax is very similar to Python, which makes it easy to learn.

Let’s consider an example where we calculate the sum of squares for a list of numbers. The code without optimization would look like this:

  • perfTest_1.py (First untuned Python class.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 31-Jul-2023                         ####
#### Modified On 31-Jul-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### first version of acute computation.             ####
####                                                 ####
#########################################################
from clsConfigClient import clsConfigClient as cf

import time
start = time.time()

n_val = cf.conf['INPUT_VAL']

def compute_sum_of_squares(n):
    return sum([i**2 for i in range(n)])

n = n_val

print(compute_sum_of_squares(n))

print(f"Test - 1: Execution time: {time.time() - start} seconds")

Here, n_val contains the value as – “1000000000”.
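With a loop this long, it helps to have an independent way to confirm that both versions print the same, correct number. Here is a minimal sketch that cross-checks the loop against the standard closed-form identity for the sum of squares. The helper name sum_of_squares_closed_form is my own & not part of the original scripts, and the loop uses a generator expression instead of a list so that no billion-element list is built in memory.

```python
def compute_sum_of_squares(n):
    # Same logic as perfTest_1.py, but with a generator expression so
    # that the squares are consumed one by one instead of being stored.
    return sum(i**2 for i in range(n))


def sum_of_squares_closed_form(n):
    # Identity: 0^2 + 1^2 + ... + (n-1)^2 = (n-1) * n * (2n-1) / 6
    return (n - 1) * n * (2 * n - 1) // 6


# Cross-check the loop against the formula on a small input.
small_n = 1000
print(compute_sum_of_squares(small_n) == sum_of_squares_closed_form(small_n))
```

For n = 1000000000, the exact result is far larger than a 64-bit integer can hold. Python integers handle that transparently, but a compiled function that returns a C double can only keep roughly 15 to 16 significant digits, so small differences in the trailing digits between the two versions would not be surprising.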

Now, let’s optimize it using Cython by installing the abovementioned packages. Then, you will have to create a .pyx file, say “compute.pyx”, with the following code:

cpdef double compute_sum_of_squares(int n):
    return sum([i**2 for i in range(n)])

Now, create a setup.py file to compile it:

###########################################################
#### Written By: SATYAKI DE                            ####
#### Written On: 31-Jul-2023                           ####
#### Modified On 31-Jul-2023                           ####
####                                                   ####
#### Objective: This is the main calling               ####
#### python script that will create the                ####
#### compiled library after executing the compute.pyx. ####
####                                                   ####
###########################################################

from setuptools import setup
from Cython.Build import cythonize

setup(
    ext_modules = cythonize("compute.pyx")
)

Compile it using the command:

python setup.py build_ext --inplace

This will look like the following –

Finally, you can import the function from the compiled “.pyx” file inside the improved code.

  • perfTest_2.py (Second, tuned Python script that calls the compiled library.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 31-Jul-2023                         ####
#### Modified On 31-Jul-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### optimized & precompiled custom library, which   ####
#### will significantly improve the performance.     ####
####                                                 ####
#########################################################
from clsConfigClient import clsConfigClient as cf
from compute import compute_sum_of_squares

import time
start = time.time()

n_val = cf.conf['INPUT_VAL']

n = n_val

print(compute_sum_of_squares(n))

print(f"Test - 2: Execution time with the Cython-compiled library: {time.time() - start} seconds")

By compiling to C, Cython can speed up loops and function calls, leading to a significant speedup for CPU-bound tasks.
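To compare the two versions fairly, it helps to time only the function call & to repeat the measurement a few times. The following is a minimal sketch of such a harness; the helper time_call is a name I made up for illustration, and the input is kept small so that the example finishes quickly.

```python
import time


def time_call(func, *args, repeats=3):
    # Run func(*args) several times and keep the best wall-clock time,
    # which reduces noise from other processes on the machine.
    best = float("inf")
    result = None
    for _ in range(repeats):
        start = time.perf_counter()
        result = func(*args)
        best = min(best, time.perf_counter() - start)
    return result, best


def compute_sum_of_squares(n):
    # Pure-Python baseline, same logic as perfTest_1.py.
    return sum(i**2 for i in range(n))


value, seconds = time_call(compute_sum_of_squares, 100_000)
print(f"Pure Python: result={value}, best of 3: {seconds:.4f} seconds")
```

Once the extension is built, the same helper could be pointed at the compiled compute_sum_of_squares imported from compute, assuming the build step above succeeded on your machine.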

Please note that while Cython can dramatically improve performance, it can make the code more complex and harder to debug. Therefore, starting with regular Python and switching to Cython for the performance-critical parts of the code is recommended.
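To find out which parts are performance-critical before moving anything to Cython, the standard-library profiler is a reasonable starting point. Below is a minimal sketch; the functions load_config & main are stand-ins that I invented for illustration, not the scripts from this post.

```python
import cProfile
import io
import pstats


def compute_sum_of_squares(n):
    # CPU-bound loop: a typical candidate for moving into Cython.
    return sum(i**2 for i in range(n))


def load_config():
    # Cheap helper that should not show up as a hotspot.
    return {"INPUT_VAL": 200_000}


def main():
    conf = load_config()
    return compute_sum_of_squares(conf["INPUT_VAL"])


profiler = cProfile.Profile()
profiler.enable()
main()
profiler.disable()

# Sort by cumulative time and print the five most expensive entries.
buffer = io.StringIO()
pstats.Stats(profiler, stream=buffer).sort_stats("cumulative").print_stats(5)
report = buffer.getvalue()
print(report)
```

The functions that dominate the cumulative-time column are the ones worth rewriting first; everything else can usually stay in regular Python.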


So, finally, we’ve done it. I know that this post is relatively smaller than my earlier post. But, I think, you can get a good hack to improve some of your long-running jobs by applying this trick.


Demonstration of GPT-3 model tuning using Python for an upcoming PyPi-package

Today, I’m very excited to demonstrate an effortless & new way to fine-tune the GPT-3 model using Python with the help of my new build (unpublished) PyPi package. In this post, I plan to deal with the custom website link as a response from this website depending upon the user queries with the help of the OpenAI-based tuned model.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.

Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Isn’t it exciting? Finally, we can efficiently handle a custom website URL using an OpenAI tuned model.


What is ChatGPT?

ChatGPT is an advanced artificial intelligence language model developed by OpenAI and built on its GPT series of models (GPT-3.5 and GPT-4 at the time of writing). As an AI model, it is designed to understand and generate human-like text based on the input it receives. ChatGPT can engage in various tasks, such as answering questions, providing recommendations, creating content, and simulating conversation. While it is highly advanced and versatile, it’s important to note that ChatGPT’s knowledge is limited to the data it was trained on, with a cutoff date of September 2021.

When to tune GPT model?

Tuning a GPT or any AI model might be necessary for various reasons. Here are some common scenarios when you should consider adjusting or fine-tuning a GPT model:

  1. Domain-specific knowledge: If you need your model to have a deeper understanding of a specific domain or industry, you can fine-tune it with domain-specific data to improve its performance.
  2. New or updated data: If new or updated information is not part of the original training data, you should fine-tune the model to ensure it has the most accurate and up-to-date knowledge.
  3. Customization: If you require the model to have a specific style, tone, or focus, you can fine-tune it with data that reflects those characteristics.
  4. Ethical or safety considerations: To make the model safer and more aligned with human values, you should fine-tune it to reduce biased or harmful outputs.
  5. Improve performance: If the base model’s performance is unsatisfactory for a particular task or application, you can fine-tune it on a dataset more relevant to the job, often leading to better results.

Remember that tuning or fine-tuning a GPT model requires access to appropriate data and computational resources and an understanding of the model’s architecture and training techniques. Additionally, monitoring and evaluating the model’s performance after fine-tuning is essential to ensure that the desired improvements have been achieved.


FLOW OF EVENTS:

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

The initial Python-based client interacts with the tuned OpenAI models. This process enables it to get a precise response with custom data in a convenient way that anyone can understand.


SOURCE DATA:

Let us understand how to feed the source data as it will deal with your website URL link.

The first data that we are going to talk about is the one that contains the hyperlink. Let us explore the sample here.

From the above diagram, one can easily understand that the application will interpret a unique hash number associated with a specific URL. This data will be used to look up the URL after the OpenAI response from the tuned model as a result of any user query.

Now, let us understand the actual source data.

If we closely check, we’ll see the source file contains two columns – prompt & completion. And the website reference is put inside the curly braces as shown – “{Hash Code that represents your URL}.”
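The legacy OpenAI fine-tuning endpoint expected its training data as JSON Lines, with one prompt/completion object per line, and the config file in this post has a TEMP_FILE_NAME entry ending in .jsonl. Since the package itself is unpublished, the following is only my sketch of how such a conversion could look; the function name & the sample row (including the hash) are invented.

```python
import csv
import io
import json


def csv_to_jsonl(csv_text):
    # Convert CSV text with 'prompt' and 'completion' columns into
    # JSON Lines: one JSON object per source row.
    reader = csv.DictReader(io.StringIO(csv_text))
    lines = []
    for row in reader:
        record = {"prompt": row["prompt"], "completion": row["completion"]}
        lines.append(json.dumps(record, ensure_ascii=False))
    return "\n".join(lines)


# Hypothetical sample row; the hash in curly braces is made up.
sample_csv = (
    "prompt,completion\n"
    '"Where can I read about Cython?","Please visit {a1b2c3} for details."\n'
)
print(csv_to_jsonl(sample_csv))
```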

During the response, the newly created library replaces the hash value with the correct URL after the successful lookup & presents the complete answer.
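The replacement step can be pictured as a simple lookup. This is a minimal sketch under my own assumptions, not the library’s actual code: the lookup table, its hash values, the URLs & the function name resolve_links are all hypothetical.

```python
import re

# Hypothetical lookup table: hash code -> URL (values are invented).
url_lookup = {
    "a1b2c3": "https://example.com/cython-post",
    "d4e5f6": "https://example.com/pycaret-post",
}


def resolve_links(response_text, lookup):
    # Replace every {hash} placeholder with its URL. Unknown hashes are
    # left untouched so a failed lookup stays visible in the output.
    def substitute(match):
        return lookup.get(match.group(1), match.group(0))

    return re.sub(r"\{([^{}]+)\}", substitute, response_text)


print(resolve_links("Please visit {a1b2c3} for details.", url_lookup))
```

Keeping only the hash in the training data means that a URL can change later without re-tuning the model; only the lookup file needs an update.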

CODE:

Why don’t we go through the code made accessible due to this new library for this particular use case?

  • clsConfigClient.py (This is the main calling Python script for the input parameters.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 21-Feb-2023 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### OpenAI fine-tune projects. ####
#### ####
################################################
import os
import platform as pl

class clsConfigClient(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'DATA_PATH': Curr_Path + sep + 'data' + sep,
        'TEMP_PATH': Curr_Path + sep + 'temp' + sep,
        'MODEL_DIR': 'model',
        'APP_DESC_1': 'ChatGPT Training!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'FILE_NAME': '2023-4-14-WP.csv',
        'LKP_FILE_NAME': 'HyperDetails.csv',
        'TEMP_FILE_NAME': 'chatGPTData.jsonl',
        'TITLE': "GPT-3 Training!",
        'PATH' : Curr_Path,
        'OUT_DIR': 'data',
        'OPEN_API_KEY': 'sk-hdhrujfrkfjfjfjfhjfjfisososT&jsdgL6KIxx',
        'MODEL_CD':'davinci',
        'URL': 'https://api.openai.com/v1/fine-tunes/',
        'EPOCH': 10,
        'SUFFIX': 'py-saty',
        'EXIT_KEYWORD': 'bye'
    }

Some of the important entries that we will require later are as follows –

'FILE_NAME': '2023-4-14-WP.csv',
'LKP_FILE_NAME': 'HyperDetails.csv',
'OPEN_API_KEY': 'sk-hdhrujfrkfjfjfjfhjfjfisososT&jsdgL6KIxx',
'MODEL_CD':'davinci',
'URL': 'https://api.openai.com/v1/fine-tunes/',
'EXIT_KEYWORD': 'bye'

We’ll discuss these entries later.
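As a side note on the config design, the same entries could be built with os.path.join & an environment variable for the key, which avoids the manual separator check & keeps the secret out of the source file. This is only a sketch under my own assumptions: the function build_conf & the variable name OPENAI_API_KEY are choices I made for illustration, not part of the original config class.

```python
import os


def build_conf(base_dir, environ=os.environ):
    # os.path.join picks the right separator for the current OS, so no
    # manual Windows/non-Windows branch is needed. The trailing empty
    # string keeps the separator at the end of the directory path.
    data_path = os.path.join(base_dir, "data", "")
    return {
        "DATA_PATH": data_path,
        "FILE_NAME": "2023-4-14-WP.csv",
        "LKP_FILE_NAME": "HyperDetails.csv",
        # Assumption: the key is exported as OPENAI_API_KEY beforehand;
        # an empty string signals that it is missing.
        "OPEN_API_KEY": environ.get("OPENAI_API_KEY", ""),
        "MODEL_CD": "davinci",
        "URL": "https://api.openai.com/v1/fine-tunes/",
        "EXIT_KEYWORD": "bye",
    }


conf = build_conf(os.getcwd())
print(conf["DATA_PATH"] + conf["FILE_NAME"])
```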

  • trainChatGPTModel.py (This is the main calling Python script that will invoke the newly created fine-tune GPT-3 enabler.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 12-Feb-2023 ####
#### Modified On 16-Feb-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### newly created fine-tune GPT-3 enabler. ####
#### ####
#####################################################
import pandas as p
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import clsTrainModel3 as tm

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

######################################
### Get your global values ####
######################################
debug_ind = 'Y'

#tModel = tm.clsTrainModel()
tModel = tm.clsTrainModel3()

# Initiating Logging Instances
clog = cl.clsL()

data_path = cf.conf['DATA_PATH']
data_file_name = cf.conf['FILE_NAME']

######################################
#### Global Flag ########
######################################

######################################
### Wrapper functions to invoke ###
### the desired class from newly ###
### built class. ###
######################################

######################################
### End of wrapper functions. ###
######################################

def main():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        FullFileName = data_path + data_file_name

        r1 = tModel.trainModel(FullFileName)

        if r1 == 0:
            print('Successfully Trained!')
        else:
            print('Failed to Train!')

        #clog.logr(OutPutFileName, debug_ind, df, subdir)

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

Following are the key snippets from the above script –

data_path = cf.conf['DATA_PATH']
data_file_name = cf.conf['FILE_NAME']

And, then –

tModel = tm.clsTrainModel3()
FullFileName = data_path + data_file_name
r1 = tModel.trainModel(FullFileName)

As one can see, the package needs only the source data file to fine-tune the GPT-3 model.

  • checkFineTuneChatGPTModelStat.py (This is the main Python script that will check the status of the tuned process that will happen inside the OpenAI-cloud environment.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 12-Feb-2023 ####
#### Modified On 16-Feb-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### newly created fine-tune job status inside ####
#### the OpenAI environment. ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import clsTestModel3 as tm

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

######################################
### Get your global values ####
######################################
debug_ind = 'Y'

# Initiating Logging Instances
clog = cl.clsL()

tmodel = tm.clsTestModel3()

url_part = cf.conf['URL']
open_api_key = cf.conf['OPEN_API_KEY']

######################################
#### Global Flag ########
######################################

def main():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        # Example usage
        input_text = str(input("Please provide the fine tune Id (Start with ft-*): "))
        url = url_part + input_text
        print('URL: ', url)

        r1 = tmodel.checkStat(url, open_api_key)

        if r1 == 0:
            print('Successfully checked the status of tuned GPT-3 model.')
        else:
            print('Failed to check the status of the tuned GPT-3 model.')

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

To check the status of the fine-tune job inside the OpenAI environment, one needs to provide the fine-tune id, which generally starts with “ft-”. One gets this value after the training script’s successful run.

Some of the other key snippets are –

tmodel = tm.clsTestModel3()

url_part = cf.conf['URL']
open_api_key = cf.conf['OPEN_API_KEY']

And, then –

input_text = str(input("Please provide the fine tune Id (Start with ft-*): "))
url = url_part + input_text
print('URL: ', url)

r1 = tmodel.checkStat(url, open_api_key)

The above snippet is self-explanatory as one is passing the fine tune id along with the OpenAI API key.
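What checkStat does internally is hidden inside the unpublished package. As a rough illustration only, a status check against a REST endpoint usually boils down to a GET request that carries the API key as a Bearer token in the Authorization header. The sketch below builds such a request without sending it; build_status_request, the id & the key are placeholders I made up.

```python
import urllib.request


def build_status_request(url_part, fine_tune_id, api_key):
    # Build (but do not send) a GET request for one fine-tune job.
    url = url_part + fine_tune_id
    headers = {"Authorization": f"Bearer {api_key}"}
    return urllib.request.Request(url, headers=headers, method="GET")


# Placeholder id and key for illustration only.
request = build_status_request(
    "https://api.openai.com/v1/fine-tunes/", "ft-EXAMPLE", "sk-placeholder"
)
print(request.get_method(), request.full_url)
```

Passing the request object to urllib.request.urlopen would send it; that step is left out here because it needs a valid key & a real fine-tune id.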

  • testChatGPTModel.py (This is the main testing Python script that will invoke the newly created fine-tune GPT-3 enabler to get a response with custom data.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 12-Feb-2023 ####
#### Modified On 19-Apr-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### newly created class that will test the ####
#### tuned model output. ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import pandas as p
import clsTestModel3 as tm

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

######################################
### Get your global values ####
######################################
debug_ind = 'Y'

# Initiating Logging Instances
clog = cl.clsL()

tmodel = tm.clsTestModel3()

open_api_key = cf.conf['OPEN_API_KEY']
lkpDataPath = cf.conf['DATA_PATH']
lkpFileName = cf.conf['LKP_FILE_NAME']

######################################
#### Global Flag ########
######################################

def main():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*' * 120)
        print('Start Time: ' + str(var))
        print('*' * 120)

        LookUpFileName = lkpDataPath + lkpFileName

        r1 = tmodel.testModel(LookUpFileName, open_api_key)

        if r1 == 0:
            print('Successfully tested the tuned GPT-3 model.')
        else:
            print('Failed to test the tuned GPT-3 model.')

        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

Some of the key entries from the above snippet are as follows –

tmodel = tm.clsTestModel3()

open_api_key = cf.conf['OPEN_API_KEY']
lkpDataPath = cf.conf['DATA_PATH']
lkpFileName = cf.conf['LKP_FILE_NAME']

And, then –

LookUpFileName = lkpDataPath + lkpFileName
r1 = tmodel.testModel(LookUpFileName, open_api_key)

In the above lines, the application gets the correct URL value from the lookup file we’ve prepared for this specific use case.

  • deleteChatGPTModel.py (This is the main Python script that will delete an old tuned model that is no longer needed.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 12-Feb-2023 ####
#### Modified On 21-Feb-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### newly created delete model methods for ####
#### OpenAI. ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import clsTestModel3 as tm

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

######################################
### Get your global values ####
######################################
debug_ind = 'Y'

# Initiating Logging Instances
clog = cl.clsL()

tmodel = tm.clsTestModel3()

open_api_key = cf.conf['OPEN_API_KEY']

######################################
#### Global Flag ########
######################################

def main():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*' * 120)
        print('Start Time: ' + str(var))
        print('*' * 120)

        r1 = tmodel.delOldModel(open_api_key)

        if r1 == 0:
            print('Successfully deleted the old tuned GPT-3 model.')
        else:
            print('Failed to delete the old tuned GPT-3 model.')

        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

Some of the key snippets from the above scripts are –

tmodel = tm.clsTestModel3()

open_api_key = cf.conf['OPEN_API_KEY']

And, then –

r1 = tmodel.delOldModel(open_api_key)

We’ve demonstrated that using a straightforward method, one can delete any old tuned model from OpenAI that is no longer required.

KEY FEATURES TO CONSIDER DURING TUNING:

  • Data quality: Ensure that the data used for fine-tuning is clean, relevant, and representative of the domain you want the model to understand. Check for biases, inconsistencies, and errors in the dataset.
  • Overfitting: Be cautious of overfitting, which occurs when the model performs exceptionally well on the training data but poorly on unseen data. You can address overfitting by using regularization techniques, early stopping, or cross-validation.
  • Model size and resource requirements: GPT models can be resource-intensive. Be mindful of the hardware limitations and computational resources available when selecting the model size and the time and cost associated with training.
  • Hyperparameter tuning: Select appropriate hyperparameters for your fine-tuning processes, such as learning rate, batch size, and the number of epochs. Experiment with different combinations to achieve the best results without overfitting.
  • Evaluation metrics: Choose suitable evaluation metrics to assess the performance of your fine-tuned model. Consider using multiple metrics to understand your model’s performance comprehensively.
  • Ethical considerations: Be aware of potential biases in your dataset and how the model’s predictions might impact users. Address ethical concerns during the fine-tuning process and consider using techniques such as data augmentation or adversarial training to mitigate these biases.
  • Monitoring and maintenance: Continuously monitor the model’s performance after deployment, and be prepared to re-tune or update it as needed. Regular maintenance ensures that the model remains relevant and accurate.
  • Documentation: Document your tuning process, including the data used, model architecture, hyperparameters, and evaluation metrics. This factor will facilitate easier collaboration, replication, and model maintenance.
  • Cost: OpenAI fine-tuning can be extremely expensive, even for a small volume of data. Hence, any organization needs to be careful while using this feature.
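One way to act on the overfitting & evaluation points above is to hold back part of the prompt/completion rows before tuning & to check the tuned model against them afterwards. Here is a minimal sketch; the function split_rows, the 20% hold-out & the sample rows are my own assumptions for illustration.

```python
import random


def split_rows(rows, validation_fraction=0.2, seed=42):
    # Shuffle a copy with a fixed seed so the split is reproducible,
    # then hold out the last part for validation.
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    holdout = round(len(shuffled) * validation_fraction)
    cut = len(shuffled) - holdout
    return shuffled[:cut], shuffled[cut:]


# Hypothetical prompt/completion pairs.
rows = [{"prompt": f"question {i}", "completion": f"answer {i}"} for i in range(10)]
train_rows, validation_rows = split_rows(rows)
print(len(train_rows), len(validation_rows))
```

If the tuned model answers the training prompts well but does poorly on the held-out ones, that is a sign of overfitting, and fewer epochs or more varied data may help.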

COST FACTOR:

Before we discuss the actual spending, let us understand the tested data volume to train & tune the model.

So, we’re talking about a total size of 500 KB (at max). And, we did 10 epochs during the training as you can see from the config file mentioned above.
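To get a feel for how the file size & the epochs drive the bill, here is a back-of-the-envelope sketch. It assumes that billing is based on trained tokens (tokens in the file multiplied by epochs) & that one token is roughly four characters of English text. The price passed in below is a placeholder, not a quoted rate, so please check the current OpenAI pricing page before relying on any figure.

```python
def estimate_training_cost(file_size_bytes, epochs, price_per_1k_tokens,
                           chars_per_token=4):
    # Rough token count: file size divided by an average token length.
    tokens = file_size_bytes / chars_per_token
    # Every epoch is a full pass over the data, so billed tokens scale
    # linearly with the number of epochs.
    trained_tokens = tokens * epochs
    return trained_tokens / 1000 * price_per_1k_tokens


# 500 KB and 10 epochs come from the post; the price is a placeholder.
cost = estimate_training_cost(500 * 1024, epochs=10, price_per_1k_tokens=0.03)
print(f"Estimated training cost: ${cost:.2f}")
```

The estimate covers training only. Usage of the tuned model afterwards is billed separately, so the total spend can be noticeably higher.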

So, it is pretty expensive. Use it wisely.


So, finally, we’ve done it.


Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement & especially in the prediction quality.

Tuning your model using the python-based low-code machine-learning library PyCaret

Today, I’ll discuss another important topic before I share the excellent use case next month, as I still need some time to finish that one. We’ll see how we can leverage the brilliant capability of a low-code machine-learning library named PyCaret.

But before going through the details, why don’t we view the demo & then go through it?

Demo

Architecture:

Let us understand the flow of events –

As one can see, the initial training requests are triggered from the PyCaret-driven training models. And the application can successfully process & identify the best models out of the other combinations.

Python Packages:

Following are the python packages that are necessary to develop this use case –

pip install pandas
pip install pycaret

PyCaret is dependent on a combination of other popular python packages. So, you need to install them successfully to run this package.

CODE:

  • clsConfigClient.py (Main configuration file)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 31-Mar-2023 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### personal AI-driven voice assistant. ####
#### ####
################################################
import os
import platform as pl

class clsConfigClient(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'DATA_PATH': Curr_Path + sep + 'data' + sep,
        'MODEL_PATH': Curr_Path + sep + 'model' + sep,
        'TEMP_PATH': Curr_Path + sep + 'temp' + sep,
        'MODEL_DIR': 'model',
        'APP_DESC_1': 'PyCaret Training!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'FILE_NAME': 'Titanic.csv',
        'MODEL_NAME': 'PyCaret-ft-personal-2023-03-31-04-29-53',
        'TITLE': "PyCaret Training!",
        'PATH' : Curr_Path,
        'OUT_DIR': 'data'
    }

I’m skipping this section as it is self-explanatory.


  • clsTrainModel.py (This is the main class that contains the core logic of low-code machine-learning library to evaluate the best model for your solutions.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 31-Mar-2023 ####
#### Modified On 31-Mar-2023 ####
#### ####
#### Objective: This is the main class that ####
#### contains the core logic of low-code ####
#### machine-learning library to evaluate the ####
#### best model for your solutions. ####
#### ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime

# Import necessary libraries
import pandas as p
from pycaret.classification import *

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

######################################
### Get your global values ####
######################################
debug_ind = 'Y'

# Initiating Logging Instances
clog = cl.clsL()

###############################################
### End of Global Section ###
###############################################

class clsTrainModel:
    def __init__(self):
        self.model_path = cf.conf['MODEL_PATH']
        self.model_name = cf.conf['MODEL_NAME']

    def trainModel(self, FullFileName):
        try:
            df = p.read_csv(FullFileName)
            row_count = int(df.shape[0])

            print('Number of rows: ', str(row_count))
            print(df)

            # Initialize the setup in PyCaret
            clf_setup = setup(
                data=df,
                target="Survived",
                train_size=0.8, # 80% for training, 20% for testing
                categorical_features=["Sex", "Embarked"],
                ordinal_features={"Pclass": ["1", "2", "3"]},
                ignore_features=["Name", "Ticket", "Cabin", "PassengerId"],
                #silent=True, # Set to False for interactive setup
            )

            # Compare various models
            best_model = compare_models()

            # Create a specific model (e.g., Random Forest)
            rf_model = create_model("rf")

            # Hyperparameter tuning
            tuned_rf_model = tune_model(rf_model)

            # Evaluate model performance
            plot_model(tuned_rf_model, plot="confusion_matrix")
            plot_model(tuned_rf_model, plot="auc")

            # Finalize the model (train on the complete dataset)
            final_rf_model = finalize_model(tuned_rf_model)

            # Make predictions on new data
            new_data = df.drop("Survived", axis=1)
            predictions = predict_model(final_rf_model, data=new_data)

            # Writing into the Model
            FullModelName = self.model_path + self.model_name

            print('Model Output @:: ', str(FullModelName))
            print()

            # Save the fine-tuned model
            save_model(final_rf_model, FullModelName)

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

Let us understand the code in simple terms –

  1. Import necessary libraries and load the Titanic dataset.
  2. Initialize the PyCaret setup, specifying the target variable, train-test split, categorical and ordinal features, and features to ignore.
  3. Compare various models to find the best-performing one.
  4. Create a specific model (Random Forest in this case).
  5. Perform hyper-parameter tuning on the Random Forest model.
  6. Evaluate the model’s performance using a confusion matrix and AUC-ROC curve.
  7. Finalize the model by training it on the complete dataset.
  8. Make predictions on new data.
  9. Save the trained model for future use.

  • trainPYCARETModel.py (This is the main calling python script that will invoke the training class of PyCaret package.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 31-Mar-2023 ####
#### Modified On 31-Mar-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### training class of Pycaret package. ####
#### ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import clsTrainModel as tm

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

######################################
### Get your global values ####
######################################
debug_ind = 'Y'

# Initiating Logging Instances
clog = cl.clsL()

data_path = cf.conf['DATA_PATH']
data_file_name = cf.conf['FILE_NAME']

tModel = tm.clsTrainModel()

######################################
#### Global Flag ########
######################################

def main():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        FullFileName = data_path + data_file_name

        r1 = tModel.trainModel(FullFileName)

        if r1 == 0:
            print('Successfully Trained!')
        else:
            print('Failed to Train!')

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

The above code is pretty self-explanatory as well.


  • testPYCARETModel.py (This is the main calling python script that will invoke the testing script for PyCaret package.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 31-Mar-2023 ####
#### Modified On 31-Mar-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### testing script for PyCaret package. ####
#### ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
from pycaret.classification import load_model, predict_model
import pandas as p

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

######################################
### Get your global values ####
######################################
debug_ind = 'Y'

# Initiating Logging Instances
clog = cl.clsL()

model_path = cf.conf['MODEL_PATH']
model_name = cf.conf['MODEL_NAME']

######################################
#### Global Flag ########
######################################

def main():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        FullFileName = model_path + model_name

        # Load the saved model
        loaded_model = load_model(FullFileName)

        # Prepare new data for testing (make sure it has the same columns as the original data)
        new_data = p.DataFrame({
            "Pclass": [3, 1],
            "Sex": ["male", "female"],
            "Age": [22, 38],
            "SibSp": [1, 1],
            "Parch": [0, 0],
            "Fare": [7.25, 71.2833],
            "Embarked": ["S", "C"]
        })

        # Make predictions using the loaded model
        predictions = predict_model(loaded_model, data=new_data)

        # Display the predictions
        print(predictions)

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

In this code, the application loads the stored model & then makes predictions on new data using the tuned PyCaret model.
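Since the new data must carry the same columns that the model was trained on, a small check before calling the model can catch mistakes early. Below is a minimal sketch that works on plain dictionaries; the expected column list is my assumption, derived from the Titanic columns that remain after the ignored features & the target are removed, and check_columns is a made-up helper name.

```python
# Assumption: these are the features left after the ignore_features list
# in the training setup removed Name, Ticket, Cabin and PassengerId.
EXPECTED_COLUMNS = ["Pclass", "Sex", "Age", "SibSp", "Parch", "Fare", "Embarked"]


def check_columns(records, expected=EXPECTED_COLUMNS):
    # Return one (row index, missing, unexpected) tuple per bad record;
    # an empty list means every record matches the expected columns.
    problems = []
    for index, record in enumerate(records):
        missing = [name for name in expected if name not in record]
        unexpected = sorted(set(record) - set(expected))
        if missing or unexpected:
            problems.append((index, missing, unexpected))
    return problems


new_rows = [
    {"Pclass": 3, "Sex": "male", "Age": 22, "SibSp": 1, "Parch": 0,
     "Fare": 7.25, "Embarked": "S"},
]
print(check_columns(new_rows))
```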

Conclusion:

The above code demonstrates an end-to-end binary classification pipeline using the PyCaret library for the Titanic dataset. The goal is to predict whether a passenger survived based on the available features. Here are some conclusions you can draw from the code and data:

  1. Ease of use: The code showcases how PyCaret simplifies the machine learning process, from data preprocessing to model training, evaluation, and deployment. With just a few lines of code, you can perform tasks that would require much more effort using lower-level libraries.
  2. Model selection: The compare_models() function provides a quick and easy way to compare various machine learning algorithms and identify the best-performing one based on the chosen evaluation metric (accuracy by default). This selection helps you select a suitable model for the given problem.
  3. Hyper-parameter tuning: The tune_model() function automates the process of hyper-parameter tuning to improve model performance. We tuned a Random Forest model to optimize its predictive power in the example.
  4. Model evaluation: PyCaret provides several built-in visualization tools for assessing model performance. In the example, we used a confusion matrix and AUC-ROC curve to evaluate the performance of the tuned Random Forest model.
  5. Model deployment: The example demonstrates how to make predictions using the trained model and save the model for future use. This deployment showcases how PyCaret can streamline the process of deploying a machine-learning model in a production environment.

It is important to note that the conclusions drawn from the code and data are specific to the Titanic dataset and the chosen features. Adjust the feature engineering, preprocessing, and model selection steps for different datasets or problems accordingly. However, the general workflow and benefits provided by PyCaret would remain the same.


So, finally, we’ve done it.


Handling unique data using the Python-based FastDataMask package

Today, I’ll discuss one widespread use case of handling unique & critical data using a new python-based FastDataMask package. But before going through the details, why don’t we view the demo & then go through it?

Demo

Great! Let us understand in detail.

Architecture:

Let us understand the flow of events –

The application first invokes the FastDataMask Python package, which accepts individual data values & generates non-recoverable masked data while keeping the pattern & nature of the data in mind. Hence, anyone can still use the data for their analysis, while the actual information stays hidden from unauthorized pairs of eyes. Analysts still get the essence & close data patterns they need to make decisions from any data analysis.
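
To make the idea of pattern-preserving masking concrete, here is a minimal sketch. It is not how FastDataMask works internally (this post does not show that); the function name mask_preserving_pattern is hypothetical. It swaps every digit for a random digit & every ASCII letter for a random letter of the same case, leaving everything else untouched, so the shape of the value survives while the content does not.

```python
import random
import string


def mask_preserving_pattern(value, rng=None):
    """Replace letters & digits with random ones, keeping case & separators.

    Hypothetical helper for illustration only; not part of FastDataMask.
    """
    rng = rng or random.Random()
    masked = []
    for ch in str(value):
        if ch.isdigit():
            masked.append(rng.choice(string.digits))
        elif ch.isalpha() and ch.isascii():
            pool = string.ascii_uppercase if ch.isupper() else string.ascii_lowercase
            masked.append(rng.choice(pool))
        else:
            # Keep everything else (separators such as '-', '@', '.') as it is
            masked.append(ch)
    return "".join(masked)


if __name__ == "__main__":
    print(mask_preserving_pattern("123-45-6789"))
    print(mask_preserving_pattern("John.Doe@example.com"))
```

Because the replacement characters are drawn at random & nothing about the original is stored, the masked value cannot be mapped back to its source, yet its length & layout still look like the original.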

Python Packages:

Following are the python packages that are necessary to develop this brilliant use case –

pip install FastDataMask==0.0.6
pip install imutils==0.5.3
pip install numpy==1.23.2
pip install pandas==1.4.3

CODE:

Let us now understand the code. For this use case, we will discuss only the key Python scripts. The application needs a few more than these, but we have already discussed them in some of the earlier posts. Hence, we will skip them here.

  • clsConfigClient.py (Main configuration file)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 15-Feb-2023 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### personal AI-driven voice assistant. ####
#### ####
################################################
import os
import platform as pl

class clsConfigClient(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'output' + sep,
        'REPORT_DIR': 'output',
        'SRC_PATH': Curr_Path + sep + 'data' + sep,
        'APP_DESC_1': 'Masking PII Data!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'FILE_NAME': 'PII.csv',
        'TITLE': "Masking PII Data!",
        'PATH' : Curr_Path
    }

Key entries from the above scripts are as follows –

'FILE_NAME': 'PII.csv',

This CSV is a dummy input file, which looks like this –

In the above screenshot, our application will use critical information like – First Name, Email, Address, Phone, Date Of Birth, SSN & Sal.
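
As a side note on the configuration file above: the manual Windows/non-Windows separator check can be avoided by letting os.path.join pick the separator. A minimal sketch, assuming the same folder names as the config; build_paths is a hypothetical helper & not part of the original script.

```python
import os


def build_paths(base_dir):
    """Return the directory entries of the config, built with os.path.join.

    Hypothetical helper; the keys mirror a subset of clsConfigClient.conf.
    """
    def folder(name):
        # Joining with '' appends the platform's trailing separator
        return os.path.join(base_dir, name, '')

    return {
        'ARCH_DIR': folder('arch'),
        'PROFILE_PATH': folder('profile'),
        'LOG_PATH': folder('log'),
        'REPORT_PATH': folder('output'),
        'SRC_PATH': folder('data'),
    }


if __name__ == "__main__":
    paths = build_paths(os.getcwd())
    print(paths['SRC_PATH'] + 'PII.csv')
```

Since each entry still ends with a separator, code such as CurrPath + FileName keeps working unchanged.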

  • playPII.py (Main calling python script)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 12-Feb-2023 ####
#### Modified On 16-Feb-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### newly created light data masking class. ####
#### ####
#####################################################
import pandas as p
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
from FastDataMask import clsCircularList as ccl

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

######################################
### Get your global values ####
######################################
debug_ind = 'Y'
charList = ccl.clsCircularList()
CurrPath = cf.conf['SRC_PATH']
FileName = cf.conf['FILE_NAME']
######################################
#### Global Flag ########
######################################

######################################
### Wrapper functions to invoke ###
### the desired class from newly ###
### built class. ###
######################################
def mask_email(email):
    try:
        maskedEmail = charList.maskEmail(email)
        return maskedEmail
    except:
        return ''

def mask_phone(phone):
    try:
        maskedPhone = charList.maskPhone(phone)
        return maskedPhone
    except:
        return ''

def mask_name(flname):
    try:
        maskedFLName = charList.maskFLName(flname)
        return maskedFLName
    except:
        return ''

def mask_date(dt):
    try:
        maskedDate = charList.maskDate(dt)
        return maskedDate
    except:
        return ''

def mask_uniqueid(unqid):
    try:
        maskedUnqId = charList.maskSSN(unqid)
        return maskedUnqId
    except:
        return ''

def mask_sal(sal):
    try:
        maskedSal = charList.maskSal(sal)
        return maskedSal
    except:
        return ''
######################################
### End of wrapper functions. ###
######################################

def main():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        inputFile = CurrPath + FileName
        print('Input File: ', inputFile)

        df = p.read_csv(inputFile)

        print('*'*120)
        print('Source Data: ')
        print(df)
        print('*'*120)

        hdr = list(df.columns.values)
        print('Headers:', hdr)

        df["MaskedFirstName"] = df["FirstName"].apply(mask_name)
        df["MaskedEmail"] = df["Email"].apply(mask_email)
        df["MaskedPhone"] = df["Phone"].apply(mask_phone)
        df["MaskedDOB"] = df["DOB"].apply(mask_date)
        df["MaskedSSN"] = df["SSN"].apply(mask_uniqueid)
        df["MaskedSal"] = df["Sal"].apply(mask_sal)

        # Dropping old columns
        df.drop(['FirstName','Email','Phone','DOB','SSN', 'Sal'], axis=1, inplace=True)

        # Renaming columns
        df.rename(columns={'MaskedFirstName': 'FirstName'}, inplace=True)
        df.rename(columns={'MaskedEmail': 'Email'}, inplace=True)
        df.rename(columns={'MaskedPhone': 'Phone'}, inplace=True)
        df.rename(columns={'MaskedDOB': 'DOB'}, inplace=True)
        df.rename(columns={'MaskedSSN': 'SSN'}, inplace=True)
        df.rename(columns={'MaskedSal': 'Sal'}, inplace=True)

        # Repositioning columns of dataframe
        df = df[hdr]

        print('Masked DF: ')
        print(df)
        print('*'*120)

        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

Let us understand the key lines in detail –

def mask_email(email):
    try:
        maskedEmail = charList.maskEmail(email)
        return maskedEmail
    except:
        return ''

def mask_phone(phone):
    try:
        maskedPhone = charList.maskPhone(phone)
        return maskedPhone
    except:
        return ''

def mask_name(flname):
    try:
        maskedFLName = charList.maskFLName(flname)
        return maskedFLName
    except:
        return ''

def mask_date(dt):
    try:
        maskedDate = charList.maskDate(dt)
        return maskedDate
    except:
        return ''

def mask_uniqueid(unqid):
    try:
        maskedUnqId = charList.maskSSN(unqid)
        return maskedUnqId
    except:
        return ''

def mask_sal(sal):
    try:
        maskedSal = charList.maskSal(sal)
        return maskedSal
    except:
        return ''

Each of these functions takes a value as input and attempts to mask it using the corresponding masking method of the charList object (an instance of clsCircularList from the FastDataMask package). If the masking succeeds, the function returns the masked value; otherwise, it returns an empty string.

More specifically, the functions are:

  • mask_email: masks the email address provided as input
  • mask_phone: masks the phone number provided as input
  • mask_name: masks the first and last name supplied as input
  • mask_date: masks the date provided as input
  • mask_uniqueid: masks the unique ID (e.g., Social Security Number) provided as input
  • mask_sal: masks the salary supplied as input

The functions use a try-except block to handle any exception raised by the corresponding masking method of the charList object. If the masking method raises an exception, the function returns an empty string, which covers cases where the input value is invalid or the masking method fails for another reason. Keep in mind that a bare except also hides the cause of the failure, so an unexpectedly empty column is worth investigating.
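
The six wrappers differ only in the method they call, so the pattern can be expressed once. A minimal sketch, assuming nothing about FastDataMask beyond the method names used above; safe_mask & the stand-in fake_mask_phone are hypothetical. It catches Exception rather than using a bare except, so that KeyboardInterrupt & SystemExit are not swallowed.

```python
def safe_mask(mask_fn, default=''):
    """Wrap a masking callable so that any failure yields `default`."""
    def wrapper(value):
        try:
            return mask_fn(value)
        except Exception:
            # Invalid input or a failing masker: fall back to the default
            return default
    return wrapper


def fake_mask_phone(phone):
    """Stand-in masker for illustration; NOT the FastDataMask implementation."""
    digits = [c for c in str(phone) if c.isdigit()]
    if len(digits) < 4:
        raise ValueError('not a phone number')
    return '*' * (len(digits) - 4) + ''.join(digits[-4:])


mask_phone = safe_mask(fake_mask_phone)

if __name__ == "__main__":
    print(mask_phone('555-123-4567'))
    print(repr(mask_phone('n/a')))
```

With the real package, the same factory would be applied to the bound methods, for example mask_phone = safe_mask(charList.maskPhone).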

inputFile = CurrPath + FileName
df = p.read_csv(inputFile)
hdr = list(df.columns.values)


df["MaskedFirstName"] = df["FirstName"].apply(mask_name)
df["MaskedEmail"] = df["Email"].apply(mask_email)
df["MaskedPhone"] = df["Phone"].apply(mask_phone)
df["MaskedDOB"] = df["DOB"].apply(mask_date)
df["MaskedSSN"] = df["SSN"].apply(mask_uniqueid)
df["MaskedSal"] = df["Sal"].apply(mask_sal)

# Dropping old columns
df.drop(['FirstName','Email','Phone','DOB','SSN', 'Sal'], axis=1, inplace=True)

# Renaming columns
df.rename(columns={'MaskedFirstName': 'FirstName'}, inplace=True)
df.rename(columns={'MaskedEmail': 'Email'}, inplace=True)
df.rename(columns={'MaskedPhone': 'Phone'}, inplace=True)
df.rename(columns={'MaskedDOB': 'DOB'}, inplace=True)
df.rename(columns={'MaskedSSN': 'SSN'}, inplace=True)
df.rename(columns={'MaskedSal': 'Sal'}, inplace=True)

  1. The first line inputFile = CurrPath + FileName concatenates the source data directory (CurrPath, taken from SRC_PATH in the config) with the name of the file (FileName) and assigns the resulting file path to the variable inputFile.
  2. The second line df = p.read_csv(inputFile) reads the file located at inputFile into a Pandas DataFrame object called df, and hdr = list(df.columns.values) records the original column order.
  3. The following few lines apply the wrapper functions (mask_name, mask_email, mask_phone, mask_date, mask_uniqueid, and mask_sal) to specific columns in the DataFrame and store the masked values in new columns.
  4. The following line df.drop(['FirstName','Email','Phone','DOB','SSN', 'Sal'], axis=1, inplace=True) drops the original columns that we are supposed to mask (i.e., 'FirstName', 'Email', 'Phone', 'DOB', 'SSN', and 'Sal') from the DataFrame.
  5. The remaining lines rename the masked columns to their original names (i.e., 'MaskedFirstName' is renamed to 'FirstName', 'MaskedEmail' is renamed to 'Email', and so on).

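Overall, this code reads in a CSV file, masks specific sensitive columns & prints the masked DataFrame. In the full script, df = df[hdr] restores the original column order before printing; the script does not write a new file.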
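
The mask, drop & rename sequence can also be written as a single loop that overwrites each column in place, which keeps the column order without any extra bookkeeping. A minimal sketch, assuming pandas is installed; mask_columns is a hypothetical helper & str.upper merely stands in for the real masking wrappers.

```python
import pandas as pd


def mask_columns(df, maskers):
    """Return a copy of df with each listed column replaced by its masked form.

    maskers maps a column name to a one-argument masking callable.
    """
    out = df.copy()
    for column, mask_fn in maskers.items():
        out[column] = out[column].apply(mask_fn)
    return out


if __name__ == "__main__":
    sample = pd.DataFrame({'FirstName': ['ann', 'bob'], 'City': ['Pune', 'Oslo']})
    # str.upper is only a placeholder for a real masker such as mask_name
    print(mask_columns(sample, {'FirstName': str.upper}))
```

In the script above, the mapping would look like {'FirstName': mask_name, 'Email': mask_email, ...}, and the source DataFrame stays untouched because the helper works on a copy.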

Now, let’s compare the output against the source data –

As you can see, the blue-highlighted columns are the masked columns & you can compare their data pattern against the source.


So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂


Personal Virtual Assistant (SJ) implemented using Python-based OpenAI, Rev.AI & pyttsx3.

Today, I will discuss our virtual personal assistant (SJ), built with a combination of AI-driven APIs & now operational in Python. We will use three potent tools: OpenAI, Rev.AI & pyttsx3. Why don’t we see the demo first?

Great! Let us understand how we can leverage this by writing a tiny snippet using this new AI model.

Architecture:

Let us understand the flow of events –

The application first invokes the Rev.AI API to capture the audio spoken through the audio device & translate it into text, which is then parsed & passed to OpenAI as the input for the posted queries. Once OpenAI shares the response, the Python-based engine takes the response & uses pyttsx3 to convert it to voice.


Python Packages:

Following are the Python packages that are necessary to develop this brilliant use case (the scripts below also import pyttsx3, which this list does not pin to a version) –

pip install openai==0.25.0
pip install PyAudio==0.2.13
pip install playsound==1.3.0
pip install pandas==1.5.2
pip install rev-ai==2.17.1
pip install six==1.16.0
pip install websocket-client==0.59.0

CODE:

Let us now understand the code. For this use case, we will discuss only the key Python scripts. The application needs a few more than these, but we have already discussed them in some of the earlier posts. Hence, we will skip them here.

  • clsConfigClient.py (Main configuration file)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 31-Dec-2022 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### personal AI-driven voice assistant. ####
#### ####
################################################
import os
import platform as pl

class clsConfigClient(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'output' + sep,
        'REPORT_DIR': 'output',
        'SRC_PATH': Curr_Path + sep + 'data' + sep,
        'CODE_PATH': Curr_Path + sep + 'Code' + sep,
        'APP_DESC_1': 'Personal Voice Assistant (SJ)!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'TITLE': "Personal Voice Assistant (SJ)!",
        'PATH' : Curr_Path,
        'OPENAI_API_KEY': "sk-aapwfMWDuFE5XXXUr2BH",
        'REVAI_API_KEY': "02ks6kFhEKjdhdure8474JJAJJ945958_h8P_DEKDNkK6DwNNNHU17aRtCw",
        'MODEL_NAME': "code-davinci-002",
        "speedSpeech": 170,
        "speedPitch": 0.8,
        "soundRate": 44100,
        "contentType": "audio/x-raw",
        "layout": "interleaved",
        "format": "S16LE",
        "channels": 1
    }

A few of the essential entries from the above snippet that you should look at are –

'OPENAI_API_KEY': "sk-aapwfMWDuFE5XXXUr2BH",
'REVAI_API_KEY': "02ks6kFhEKjdhdure8474JJAJJ945958_h8P_DEKDNkK6DwNNNHU17aRtCw",
'MODEL_NAME': "code-davinci-002",
"speedSpeech": 170,
"speedPitch": 0.8,
"soundRate": 44100,
"contentType": "audio/x-raw",
"layout": "interleaved",
"format": "S16LE",
"channels": 1

Note that none of these API keys are real. You need to generate your own keys.
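
Rather than hard-coding keys in clsConfigClient.py, one common option is to read them from environment variables. A minimal sketch; the environment variable names OPENAI_API_KEY & REVAI_API_KEY and the helper get_api_key are assumptions, not something the original project defines.

```python
import os


def get_api_key(name):
    """Fetch an API key from the environment, failing loudly if it is missing."""
    value = os.environ.get(name, '').strip()
    if not value:
        raise RuntimeError('Environment variable ' + name + ' is not set')
    return value


if __name__ == "__main__":
    try:
        conf = {
            'OPENAI_API_KEY': get_api_key('OPENAI_API_KEY'),
            'REVAI_API_KEY': get_api_key('REVAI_API_KEY'),
        }
        print('Loaded keys:', sorted(conf))
    except RuntimeError as e:
        print('Error: ', e)
```

This keeps real keys out of the codebase shared on GitHub, while the rest of the code can keep reading them from the conf dictionary.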

  • clsText2Voice.py (The python script that will convert text to voice)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 27-Oct-2019 ####
#### Modified On 28-Jan-2023 ####
#### ####
#### Objective: Main class converting ####
#### text to voice using third-party API. ####
###############################################
import pyttsx3
from clsConfigClient import clsConfigClient as cf

class clsText2Voice:
    def __init__(self):
        self.speedSpeech = cf.conf['speedSpeech']
        self.speedPitch = cf.conf['speedPitch']

    def getAudio(self, srcString):
        try:
            speedSpeech = self.speedSpeech
            speedPitch = self.speedPitch

            engine = pyttsx3.init()

            # Set the speed of the speech (in words per minute)
            engine.setProperty('rate', speedSpeech)

            # Set the pitch of the speech (1.0 is default)
            engine.setProperty('pitch', speedPitch)

            # Speak the text aloud & block until speech finishes
            engine.say(srcString)
            engine.runAndWait()

            return 0

        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

The important snippet is as follows –

def getAudio(self, srcString):
    try:
        speedSpeech = self.speedSpeech
        speedPitch = self.speedPitch
        
        engine = pyttsx3.init()

        # Set the speed of the speech (in words per minute)
        engine.setProperty('rate', speedSpeech)

        # Set the pitch of the speech (1.0 is default)
        engine.setProperty('pitch', speedPitch)

        # Speak the text aloud & block until speech finishes
        engine.say(srcString)
        engine.runAndWait()

        return 0

    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

The code is a function that generates speech from a given string using the pyttsx3 library in Python. The function reads the speech rate and pitch from the “speedSpeech” and “speedPitch” properties of the calling object, initializes the pyttsx3 engine, sets the rate and pitch on the engine, speaks the given string, and waits for the speech to finish. It returns 0 once the speech is finished. Note that engine.say() plays the speech through the audio device rather than producing an MP3 file, and whether the pitch property has any effect may depend on the speech driver in use.


  • clsChatEngine.py (This python script will invoke the ChatGPT OpenAI class to initiate the response of the queries in python.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Dec-2022 ####
#### Modified On 28-Jan-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### ChatGPT OpenAI class to initiate the ####
#### response of the queries in python. ####
#####################################################
import os
import openai
import json
from clsConfigClient import clsConfigClient as cf
import sys
import errno

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

###############################################
### Global Section ###
###############################################
CODE_PATH=str(cf.conf['CODE_PATH'])
MODEL_NAME=str(cf.conf['MODEL_NAME'])
###############################################
### End of Global Section ###
###############################################

class clsChatEngine:
    def __init__(self):
        self.OPENAI_API_KEY=str(cf.conf['OPENAI_API_KEY'])

    def findFromSJ(self, text):
        try:
            OPENAI_API_KEY = self.OPENAI_API_KEY

            # ChatGPT API_KEY
            openai.api_key = OPENAI_API_KEY

            print('22'*60)

            try:
                # Getting response from ChatGPT
                response = openai.Completion.create(
                    engine=MODEL_NAME,
                    prompt=text,
                    max_tokens=64,
                    top_p=1.0,
                    n=3,
                    temperature=0,
                    frequency_penalty=0.0,
                    presence_penalty=0.0,
                    stop=["\"\"\""]
                )
            except IOError as e:
                if e.errno == errno.EPIPE:
                    pass

            print('44'*60)
            res = response.choices[0].text

            return res

        except IOError as e:
            if e.errno == errno.EPIPE:
                pass

        except Exception as e:
            x = str(e)
            print(x)

            print('66'*60)

            return x

Key snippets from the above-script are as follows –

def findFromSJ(self, text):
      try:
          OPENAI_API_KEY = self.OPENAI_API_KEY

          # ChatGPT API_KEY
          openai.api_key = OPENAI_API_KEY

          print('22'*60)

          try:
              # Getting response from ChatGPT
              response = openai.Completion.create(
              engine=MODEL_NAME,
              prompt=text,
              max_tokens=64,
              top_p=1.0,
              n=3,
              temperature=0,
              frequency_penalty=0.0,
              presence_penalty=0.0,
              stop=["\"\"\""]
              )
          except IOError as e:
              if e.errno == errno.EPIPE:
                  pass

          print('44'*60)
          res = response.choices[0].text

          return res

      except IOError as e:
          if e.errno == errno.EPIPE:
              pass

      except Exception as e:
          x = str(e)
          print(x)

          print('66'*60)

          return x

The code is a function that uses OpenAI’s Completion API to generate text based on a given prompt. The model comes from MODEL_NAME in the configuration (code-davinci-002 here), so despite the comments in the code, the request does not go to the ChatGPT chat model itself. The function takes the text to be completed as input and uses the API key stored in the OPENAI_API_KEY property of the calling object to call OpenAI’s API. Although the request asks for three completions (n=3), the function returns only the first one, as stored in the text field of the first item in the choices list of the API response.

The function includes error handling for IOError and Exception. An IOError raised by the request itself is swallowed by the inner handler; since response is then never assigned, reading response.choices fails and the outer Exception handler takes over. If an IOError reaches the outer handler, the function does nothing further and implicitly returns None. If any other Exception occurs, the function converts the error message to a string, prints it, and then returns the string.
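
In findFromSJ, if the request fails inside the inner try block, response is never assigned before response.choices[0] is read. One way to avoid that is to return early when the call fails. A minimal sketch, assuming a callable that stands in for the API request (in the script above, that would be openai.Completion.create with its keyword arguments); first_completion_text, fake_create & failing_create are hypothetical names.

```python
from types import SimpleNamespace


def first_completion_text(create_fn, prompt, default=''):
    """Call create_fn(prompt) & return the first choice's text, or default."""
    try:
        response = create_fn(prompt)
    except Exception as e:
        # The request failed, so there is no response to read from
        print('Error: ', str(e))
        return default

    choices = getattr(response, 'choices', None) or []
    if not choices:
        return default
    return choices[0].text


def fake_create(prompt):
    """Stand-in for the real API call; returns an object shaped like its response."""
    return SimpleNamespace(choices=[SimpleNamespace(text='echo: ' + prompt)])


def failing_create(prompt):
    """Stand-in that simulates a failed request."""
    raise IOError('broken pipe')


if __name__ == "__main__":
    print(first_completion_text(fake_create, 'hello'))
    print(repr(first_completion_text(failing_create, 'hello')))
```

Returning a fixed default on failure also means the caller never receives an error message where it expects an answer to be spoken.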


  • clsVoice2Text.py (This python script will invoke the Rev-AI class to initiate the transformation of audio into the text.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Dec-2022 ####
#### Modified On 28-Jan-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### Rev-AI class to initiate the transformation ####
#### of audio into the text. ####
#####################################################
import pyaudio
from rev_ai.models import MediaConfig
from rev_ai.streamingclient import RevAiStreamingClient
from six.moves import queue
import ssl
import json
import pandas as p
import clsMicrophoneStream as ms
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime

# Initiating Log class
l = cl.clsL()

# Bypassing SSL Authentication
try:
    _create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
    # Legacy python that doesn't verify HTTPS certificates by default
    pass
else:
    # Handle target environment that doesn't support HTTPS verification
    ssl._create_default_https_context = _create_unverified_https_context

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

######################################
### Insert your access token here ####
######################################
debug_ind = 'Y'

################################################################
### Sampling rate of your microphone and desired chunk size ####
################################################################
class clsVoice2Text:
    def __init__(self):
        self.OPENAI_API_KEY=str(cf.conf['OPENAI_API_KEY'])
        self.rate = cf.conf['soundRate']

    def processVoice(self, var):
        try:
            OPENAI_API_KEY = self.OPENAI_API_KEY
            accessToken = cf.conf['REVAI_API_KEY']
            rate = self.rate
            chunk = int(rate/10)

            ################################################################
            ### Creates a media config with the settings set for a raw  ####
            ### microphone input                                        ####
            ################################################################
            sampleMC = MediaConfig('audio/x-raw', 'interleaved', 44100, 'S16LE', 1)

            streamclient = RevAiStreamingClient(accessToken, sampleMC)

            #####################################################################
            ### Opens microphone input. The input will stop after a keyboard ####
            ### interrupt.                                                   ####
            #####################################################################
            with ms.clsMicrophoneStream(rate, chunk) as stream:

                #####################################################################
                ### Uses try method to enable users to manually close the stream ####
                #####################################################################
                try:
                    response_gen = ''
                    response = ''
                    finalText = ''

                    #########################################################################
                    ### Starts the server connection and thread sending microphone audio ####
                    #########################################################################
                    response_gen = streamclient.start(stream.generator())

                    ###################################################
                    ### Iterates through responses and prints them ####
                    ###################################################
                    for response in response_gen:
                        try:
                            print('JSON:')
                            print(response)

                            r = json.loads(response)

                            df = p.json_normalize(r["elements"])
                            l.logr('1.df_' + var + '.csv', debug_ind, df, 'log')
                            column_name = "confidence"

                            if column_name in df.columns:
                                print('DF:: ')
                                print(df)

                                finalText = "".join(df["value"])
                                print("TEXT:")
                                print(finalText)

                                df = p.DataFrame()

                                raise Exception

                        except Exception as e:
                            x = str(e)
                            break

                    streamclient.end()

                    return finalText

                except Exception as e:
                    x = str(e)
                    #######################################
                    ### Ends the WebSocket connection. ####
                    #######################################
                    streamclient.end()

                    return ''

        except Exception as e:
            x = str(e)
            print('Error: ', x)

            streamclient.end()

            return x

Here is the important snippet from the above code –

def processVoice(self, var):
      try:
          OPENAI_API_KEY = self.OPENAI_API_KEY
          accessToken = cf.conf['REVAI_API_KEY']
          rate = self.rate
          chunk = int(rate/10)

          ################################################################
          ### Creates a media config with the settings set for a raw  ####
          ### microphone input                                        ####
          ################################################################

          sampleMC = MediaConfig('audio/x-raw', 'interleaved', 44100, 'S16LE', 1)

          streamclient = RevAiStreamingClient(accessToken, sampleMC)

          #####################################################################
          ### Opens microphone input. The input will stop after a keyboard ####
          ### interrupt.                                                   ####
          #####################################################################

          with ms.clsMicrophoneStream(rate, chunk) as stream:

              #####################################################################
              ### Uses try method to enable users to manually close the stream ####
              #####################################################################

              try:
                  response_gen = ''
                  response = ''
                  finalText = ''
                  
                  ############################################
                  ### Starts the server connection        ####
                  ### and thread sending microphone audio #### 
                  ############################################

                  response_gen = streamclient.start(stream.generator())

                  ###################################################
                  ### Iterates through responses and prints them ####
                  ###################################################

                  for response in response_gen:
                      try:
                          print('JSON:')
                          print(response)

                          r = json.loads(response)

                          df = p.json_normalize(r["elements"])
                          l.logr('1.df_' + var + '.csv', debug_ind, df, 'log')
                          column_name = "confidence"

                          if column_name in df.columns:
                              print('DF:: ')
                              print(df)

                              finalText = "".join(df["value"])
                              print("TEXT:")
                              print(finalText)

                              df = p.DataFrame()

                              raise Exception

                      except Exception as e:
                          x = str(e)
                          break

                  streamclient.end()

                  return finalText

              except Exception as e:
                  x = str(e)
                  #######################################
                  ### Ends the WebSocket connection. ####
                  #######################################

                  streamclient.end()

                  return ''

      except Exception as e:
          x = str(e)
          print('Error: ', x)

          streamclient.end()

          return x

The code is a Python function called processVoice() that processes a user’s voice input using the Rev.AI API. The function takes one argument, “var,” which is used only to build the name of the debug log file ('1.df_' + var + '.csv').

Let us understand the code –

  • First, the function sets several variables, including the Rev.AI API access token, the sample rate, and the chunk size for the audio input.
  • Then, it creates a media configuration object for raw microphone input.
  • A RevAiStreamingClient object is created using the access token and the media configuration.
  • The code opens the microphone input using a with statement and the microphone stream class.
  • Within the with statement, the code starts the server connection and a thread that sends microphone audio to the server.
  • The code then iterates through the responses from the server, normalizing the JSON response and storing the values in a pandas data-frame, which is also written to the log.
  • If the “confidence” column exists in the data-frame, the code joins all the values to form the final text and raises an exception purely to break out of the loop.
  • After the loop ends, the WebSocket connection is ended, and the final text is returned.
  • If any other error occurs, the WebSocket connection is also ended, and an empty string or the error message is returned.
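
The loop in processVoice raises a bare Exception only to leave the for loop once a response with confidence scores arrives. The same result can be sketched without using an exception for control flow. This is a minimal sketch using only the json module; first_final_text is a hypothetical name, & the sample messages are made up to mirror the fields the script reads (elements, value, confidence).

```python
import json


def first_final_text(responses):
    """Return the joined text of the first response whose elements carry confidence.

    responses is any iterable of JSON strings shaped like the messages the
    script above parses; returns '' if no such response is found.
    """
    for raw in responses:
        try:
            elements = json.loads(raw).get('elements', [])
        except (ValueError, AttributeError):
            # Skip anything that is not a JSON object
            continue
        if any('confidence' in el for el in elements):
            return ''.join(el.get('value', '') for el in elements)
    return ''


if __name__ == "__main__":
    # Made-up sample messages for illustration only
    sample = [
        '{"type": "partial", "elements": [{"type": "text", "value": "hel"}]}',
        '{"type": "final", "elements": ['
        '{"type": "text", "value": "Hello", "confidence": 0.98},'
        '{"type": "punct", "value": "."}]}',
    ]
    print(first_final_text(sample))
```

In the script, this would be called as finalText = first_final_text(response_gen), followed by streamclient.end().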

  • clsMicrophoneStream.py (This Python script invokes the rev_ai template to capture the chunked voice data, stream it to the service for text translation & return the response to the app.)


#####################################################
#### Modified By: SATYAKI DE ####
#### Modified On 28-Jan-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### rev_ai template to capture the chunk voice ####
#### data & stream it to the service for text ####
#### translation & return the response to app. ####
#####################################################
import pyaudio
from rev_ai.models import MediaConfig
from six.moves import queue
# Disabling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
class clsMicrophoneStream(object):
#############################################
### Opens a recording stream as a ####
### generator yielding the audio chunks. ####
#############################################
def __init__(self, rate, chunk):
self._rate = rate
self._chunk = chunk
##################################################
### Create a thread-safe buffer of audio data ####
##################################################
self._buff = queue.Queue()
self.closed = True
def __enter__(self):
self._audio_interface = pyaudio.PyAudio()
self._audio_stream = self._audio_interface.open(
format=pyaudio.paInt16,
#########################################################
### The API currently only supports 1-channel (mono) ####
### audio. ####
#########################################################
channels=1, rate=self._rate,
input=True, frames_per_buffer=self._chunk,
####################################################################
### Run the audio stream asynchronously to fill the buffer ####
### object. This is necessary so that the input device's ####
### buffer doesn't overflow while the calling thread makes ####
### network requests, etc. ####
####################################################################
stream_callback=self._fill_buffer,
)
self.closed = False
return self
def __exit__(self, type, value, traceback):
self._audio_stream.stop_stream()
self._audio_stream.close()
self.closed = True
###############################################################
### Signal the generator to terminate so that the client's ####
### streaming_recognize method will not block the process ####
### termination. ####
###############################################################
self._buff.put(None)
self._audio_interface.terminate()
def _fill_buffer(self, in_data, frame_count, time_info, status_flags):
##############################################################
### Continuously collect data from the audio stream, into ####
### the buffer. ####
##############################################################
self._buff.put(in_data)
return None, pyaudio.paContinue
def generator(self):
while not self.closed:
######################################################################
### Use a blocking get() to ensure there's at least one chunk of ####
### data, and stop iteration if the chunk is None, indicating the ####
### end of the audio stream. ####
######################################################################
chunk = self._buff.get()
if chunk is None: