RAG implementation of LLMs by using Python, Haystack & React (Part – 1)

Today, I will share a new post in a part series about creating end-end LLMs that feed source data with RAG implementation. I’ll also use OpenAI python-based SDK and Haystack embeddings in this case.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.

Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

As you can see, to enable this large & complex solution, we must first establish the capabilities to build applications powered by LLMs, Transformer models, vector search, and more. You can use state-of-the-art NLP models to perform question-answering, answer generation, semantic document search, or build tools capable of complex decision-making and query resolution. Hence, steps no. 1 & 2 showcased the data embedding & creating that informed repository. We’ll be discussing that in our second part.

Once you have the informed repository, the system can interact with the end-users. As part of the query (shown in step 3), the prompt & the question are shared with the process engine, which then turned to reduce the volume & get relevant context from our informed repository & get the tuned context as part of the response (Shown in steps 4, 5 & 6).

Then, this tuned context is shared with the OpenAI for better response & summary & concluding remarks that are very user-friendly & easier to understand for end-users (Shown in steps 8 & 9).

The following are the important packages that are essential to this project –

pip install farm-haystack==1.19.0
pip install Flask==2.2.5
pip install Flask-Cors==4.0.0
pip install Flask-JWT-Extended==4.5.2
pip install Flask-Session==0.5.0
pip install openai==0.27.8
pip install pandas==2.0.3
pip install tensorflow==2.11.1

We’ve both the front-end using react & back-end APIs with Python-flask and the Open AI to create this experience.

Today, we’ll be going in reverse mode. We first discuss the main script & then explain all the other class scripts.

  • flaskServer.py (This is the main calling Python script to invoke the RAG-Server.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### shortcut application created inside MAC         ####
#### enviornment including MacBook, IPad or IPhone.  ####
####                                                 ####
#########################################################

from flask import Flask, jsonify, request, session
from flask_cors import CORS
from werkzeug.security import check_password_hash, generate_password_hash
from flask_jwt_extended import JWTManager, jwt_required, create_access_token
import pandas as pd
from clsConfigClient import clsConfigClient as cf
import clsL as log
import clsContentScrapper as csc
import clsRAGOpenAI as crao
import csv
from datetime import timedelta
import os
import re
import json

########################################################
################    Global Area   ######################
########################################################
#Initiating Logging Instances
clog = log.clsL()

admin_key = cf.conf['ADMIN_KEY']
secret_key = cf.conf['SECRET_KEY']
session_path = cf.conf['SESSION_PATH']
sessionFile = cf.conf['SESSION_CACHE_FILE']

app = Flask(__name__)
CORS(app)  # This will enable CORS for all routes
app.config['JWT_SECRET_KEY'] = admin_key  # Change this!
app.secret_key = secret_key

jwt = JWTManager(app)

users = cf.conf['USER_NM']
passwd = cf.conf['USER_PWD']

cCScrapper = csc.clsContentScrapper()
cr = crao.clsRAGOpenAI()

# Disbling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Define the aggregation functions
def join_unique(series):
    unique_vals = series.drop_duplicates().astype(str)
    return ', '.join(filter(lambda x: x != 'nan', unique_vals))

# Building the preaggregate cache
def groupImageWiki():
    try:
        base_path = cf.conf['OUTPUT_PATH']
        inputFile = cf.conf['CLEANED_FILE']
        outputFile = cf.conf['CLEANED_FILE_SHORT']
        subdir = cf.conf['SUBDIR_OUT']
        Ind = cf.conf['DEBUG_IND']

        inputCleanedFileLookUp = base_path + inputFile

        #Opening the file in dataframe
        df = pd.read_csv(inputCleanedFileLookUp)
        hash_values = df['Total_Hash'].unique()

        dFin = df[['primaryImage','Wiki_URL','Total_Hash']]

        # Ensure columns are strings and not NaN
        # Convert columns to string and replace 'nan' with an empty string
        dFin['primaryImage'] = dFin['primaryImage'].astype(str).replace('nan', '')
        dFin['Wiki_URL'] = dFin['Wiki_URL'].astype(str).replace('nan', '')

        dFin.drop_duplicates()

        # Group by 'Total_Hash' and aggregate
        dfAgg = dFin.groupby('Total_Hash').agg({'primaryImage': join_unique,'Wiki_URL': join_unique}).reset_index()

        return dfAgg

    except Exception as e:
        x = str(e)
        print('Error: ', x)

        df = pd.DataFrame()

        return df

resDf = groupImageWiki()

########################################################
################  End  Global Area  ####################
########################################################

def extractRemoveUrls(hash_value):
    image_urls = ''
    wiki_urls = ''
    # Parse the inner message JSON string
    try:

        resDf['Total_Hash'] = resDf['Total_Hash'].astype(int)
        filtered_df = resDf[resDf['Total_Hash'] == int(hash_value)]

        if not filtered_df.empty:
            image_urls = filtered_df['primaryImage'].values[0]
            wiki_urls = filtered_df['Wiki_URL'].values[0]

        return image_urls, wiki_urls

    except Exception as e:
        x = str(e)
        print('extractRemoveUrls Error: ', x)
        return image_urls, wiki_urls

def isIncomplete(line):
    """Check if a line appears to be incomplete."""

    # Check if the line ends with certain patterns indicating it might be incomplete.
    incomplete_patterns = [': [Link](', ': Approximately ', ': ']
    return any(line.endswith(pattern) for pattern in incomplete_patterns)

def filterData(data):
    """Return only the complete lines from the data."""

    lines = data.split('\n')
    complete_lines = [line for line in lines if not isIncomplete(line)]

    return '\n'.join(complete_lines)

def updateCounter(sessionFile):
    try:
        counter = 0

        # Check if the CSV file exists
        if os.path.exists(sessionFile):
            with open(sessionFile, 'r') as f:
                reader = csv.reader(f)
                for row in reader:
                    # Assuming the counter is the first value in the CSV
                    counter = int(row[0])

        # Increment counter
        counter += 1

        # Write counter back to CSV
        with open(sessionFile, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([counter])

        return counter
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

def getPreviousResult():
    try:
        fullFileName = session_path + sessionFile
        newCounterValue = updateCounter(fullFileName)

        return newCounterValue
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

@app.route('/login', methods=['POST'])
def login():
    username = request.json.get('username', None)
    password = request.json.get('password', None)

    print('User Name: ', str(username))
    print('Password: ', str(password))

    #if username not in users or not check_password_hash(users.get(username), password):
    if ((username not in users) or (password not in passwd)):
        return jsonify({'login': False}), 401

    access_token = create_access_token(identity=username)
    return jsonify(access_token=access_token)

@app.route('/chat', methods=['POST'])
def get_chat():
    try:
        #session["key"] = "1D98KI"
        #session_id = session.sid
        #print('Session Id: ', str(session_id))

        cnt = getPreviousResult()
        print('Running Session Count: ', str(cnt))

        username = request.json.get('username', None)
        message = request.json.get('message', None)

        print('User: ', str(username))
        print('Content: ', str(message))

        if cnt == 1:
            retList = cCScrapper.extractCatalog()
        else:
            hashValue, cleanedData = cr.getData(str(message))
            print('Main Hash Value:', str(hashValue))

            imageUrls, wikiUrls = extractRemoveUrls(hashValue)
            print('Image URLs: ', str(imageUrls))
            print('Wiki URLs: ', str(wikiUrls))
            print('Clean Text:')
            print(str(cleanedData))
            retList = '{"records":[{"Id":"' + str(cleanedData) + '", "Image":"' + str(imageUrls) + '", "Wiki": "' + str(wikiUrls) + '"}]}'

        response = {
            'message': retList
        }

        print('JSON: ', str(response))
        return jsonify(response)

    except Exception as e:
        x = str(e)

        response = {
            'message': 'Error: ' + x
        }
        return jsonify(response)

@app.route('/api/data', methods=['GET'])
@jwt_required()
def get_data():
    response = {
        'message': 'Hello from Flask!'
    }
    return jsonify(response)

if __name__ == '__main__':
    app.run(debug=True)

Let us understand some of the important sections of the above script –

Function – login():

The login function retrieves a ‘username’ and ‘password’ from a JSON request and prints them. It checks if the provided credentials are missing from users or password lists, returning a failure JSON response if so. It creates and returns an access token in a JSON response if valid.

Function – get_chat():

The get_chat function retrieves the running session count and user input from a JSON request. Based on the session count, it extracts catalog data or processes the user’s message from the RAG framework that finally receives the refined response from the OpenAI, extracting hash values, image URLs, and wiki URLs. If an error arises, the function captures and returns the error as a JSON message.

Function – updateCounter():

The updateCounter function checks if a given CSV file exists and retrieves its counter value. It then increments the counter and writes it back to the CSV. If any errors occur, an error message is printed, and the function returns a value of 1.

Function – extractRemoveUrls():

The extractRemoveUrls function attempts to filter a data frame, resDf, based on a provided hash value to extract image and wiki URLs. If the data frame contains matching entries, it retrieves the corresponding URLs. Any errors encountered are printed, but the function always returns the image and wiki URLs, even if they are empty.

  • clsContentScrapper.py (This is the main class that brings the default options for the users if they agree with the initial prompt by the bot.)
#####################################################
#### Written By: SATYAKI DE                      ####
#### Written On: 27-May-2023                     ####
#### Modified On 28-May-2023                     ####
####                                             ####
#### Objective: This is the main calling         ####
#### python class that will invoke the           ####
#### LangChain of package to extract             ####
#### the transcript from the YouTube videos &    ####
#### then answer the questions based on the      ####
#### topics selected by the users.               ####
####                                             ####
#####################################################

from langchain.document_loaders import YoutubeLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.chat_models import ChatOpenAI
from langchain.chains import LLMChain

from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
)

from googleapiclient.discovery import build

import clsTemplate as ct
from clsConfigClient import clsConfigClient as cf

import os

from flask import jsonify
import requests

###############################################
###           Global Section                ###
###############################################
open_ai_Key = cf.conf['OPEN_AI_KEY']
os.environ["OPENAI_API_KEY"] = open_ai_Key
embeddings = OpenAIEmbeddings(openai_api_key=open_ai_Key)

YouTube_Key = cf.conf['YOUTUBE_KEY']
youtube = build('youtube', 'v3', developerKey=YouTube_Key)

# Disbling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

###############################################
###    End of Global Section                ###
###############################################

class clsContentScrapper:
    def __init__(self):
        self.model_name = cf.conf['MODEL_NAME']
        self.temp_val = cf.conf['TEMP_VAL']
        self.max_cnt = int(cf.conf['MAX_CNT'])
        self.url = cf.conf['BASE_URL']
        self.header_token = cf.conf['HEADER_TOKEN']

    def extractCatalog(self):
        try:
            base_url = self.url
            header_token = self.header_token

            url = base_url + '/departments'

            print('Full URL: ', str(url))

            payload={}
            headers = {'Cookie': header_token}

            response = requests.request("GET", url, headers=headers, data=payload)

            x = response.text

            return x
        except Exception as e:
            discussedTopic = []
            x = str(e)
            print('Error: ', x)

            return x

Let us understand the the core part that require from this class.

Function – extractCatalog():

The extractCatalog function uses specific headers to make a GET request to a constructed URL. The URL is derived by appending ‘/departments’ to a base_url, and a header token is used in the request headers. If successful, it returns the text of the response; if there’s an exception, it prints the error and returns the error message.

  • clsRAGOpenAI.py (This is the main class that brings the RAG-enabled context that is fed to OpenAI for fine-tuned response with less cost.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### shortcut application created inside MAC         ####
#### enviornment including MacBook, IPad or IPhone.  ####
####                                                 ####
#########################################################

from haystack.document_stores.faiss import FAISSDocumentStore
from haystack.nodes import DensePassageRetriever
import openai

from clsConfigClient import clsConfigClient as cf
import clsL as log

# Disbling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

import os
import re
###############################################
###           Global Section                ###
###############################################
Ind = cf.conf['DEBUG_IND']
queryModel = cf.conf['QUERY_MODEL']
passageModel = cf.conf['PASSAGE_MODEL']

#Initiating Logging Instances
clog = log.clsL()

os.environ["TOKENIZERS_PARALLELISM"] = "false"

vectorDBFileName = cf.conf['VECTORDB_FILE_NM']

indexFile = "vectorDB/" + str(vectorDBFileName) + '.faiss'
indexConfig = "vectorDB/" + str(vectorDBFileName) + ".json"

print('File: ', str(indexFile))
print('Config: ', str(indexConfig))

# Also, provide `config_path` parameter if you set it when calling the `save()` method:
new_document_store = FAISSDocumentStore.load(index_path=indexFile, config_path=indexConfig)

# Initialize Retriever
retriever = DensePassageRetriever(document_store=new_document_store,
                                  query_embedding_model=queryModel,
                                  passage_embedding_model=passageModel,
                                  use_gpu=False)


###############################################
###    End of Global Section                ###
###############################################

class clsRAGOpenAI:
    def __init__(self):
        self.basePath = cf.conf['DATA_PATH']
        self.fileName = cf.conf['FILE_NAME']
        self.Ind = cf.conf['DEBUG_IND']
        self.subdir = str(cf.conf['OUT_DIR'])
        self.base_url = cf.conf['BASE_URL']
        self.outputPath = cf.conf['OUTPUT_PATH']
        self.vectorDBPath = cf.conf['VECTORDB_PATH']
        self.openAIKey = cf.conf['OPEN_AI_KEY']
        self.temp = cf.conf['TEMP_VAL']
        self.modelName = cf.conf['MODEL_NAME']
        self.maxToken = cf.conf['MAX_TOKEN']

    def extractHash(self, text):
        try:
            # Regular expression pattern to match 'Ref: {' followed by a number and then '}'
            pattern = r"Ref: \{'(\d+)'\}"
            match = re.search(pattern, text)

            if match:
                return match.group(1)
            else:
                return None
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return None

    def removeSentencesWithNaN(self, text):
        try:
            # Split text into sentences using regular expression
            sentences = re.split('(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text)
            # Filter out sentences containing 'nan'
            filteredSentences = [sentence for sentence in sentences if 'nan' not in sentence]
            # Rejoin the sentences
            return ' '.join(filteredSentences)
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return ''

    def retrieveDocumentsReader(self, question, top_k=9):
        return retriever.retrieve(question, top_k=top_k)

    def generateAnswerWithGPT3(self, retrieved_docs, question):
        try:
            openai.api_key = self.openAIKey
            temp = self.temp
            modelName = self.modelName
            maxToken = self.maxToken

            documentsText = " ".join([doc.content for doc in retrieved_docs])

            filteredDocs = self.removeSentencesWithNaN(documentsText)
            hashValue = self.extractHash(filteredDocs)

            print('RAG Docs:: ')
            print(filteredDocs)
            #prompt = f"Given the following documents: {documentsText}, answer the question accurately based on the above data with the supplied http urls: {question}"

            # Set up a chat-style prompt with your data
            messages = [
                {"role": "system", "content": "You are a helpful assistant, answer the question accurately based on the above data with the supplied http urls. Only relevant content needs to publish. Please do not provide the facts or the texts that results crossing the max_token limits."},
                {"role": "user", "content": filteredDocs}
            ]

            # Chat style invoking the latest model
            response = openai.ChatCompletion.create(
                model=modelName,
                messages=messages,
                temperature = temp,
                max_tokens=maxToken
            )
            return hashValue, response.choices[0].message['content'].strip().replace('\n','\\n')
        except Exception as e:
            x = str(e)
            print('failed to get from OpenAI: ', x)
            return 'Not Available!'

    def ragAnswerWithHaystackAndGPT3(self, question):
        retrievedDocs = self.retrieveDocumentsReader(question)
        return self.generateAnswerWithGPT3(retrievedDocs, question)

    def getData(self, strVal):
        try:
            print('*'*120)
            print('Index Your Data for Retrieval:')
            print('*'*120)

            print('Response from New Docs: ')
            print()

            hashValue, answer = self.ragAnswerWithHaystackAndGPT3(strVal)

            print('GPT3 Answer::')
            print(answer)
            print('Hash Value:')
            print(str(hashValue))

            print('*'*240)
            print('End Of Use RAG to Generate Answers:')
            print('*'*240)

            return hashValue, answer
        except Exception as e:
            x = str(e)
            print('Error: ', x)
            answer = x
            hashValue = 1

            return hashValue, answer

Let us understand some of the important block –

Function – ragAnswerWithHaystackAndGPT3():

The ragAnswerWithHaystackAndGPT3 function retrieves relevant documents for a given question using the retrieveDocumentsReader method. It then generates an answer for the query using GPT-3 with the retrieved documents via the generateAnswerWithGPT3 method. The final response is returned.

Function – generateAnswerWithGPT3():

The generateAnswerWithGPT3 function, given a list of retrieved documents and a question, communicates with OpenAI’s GPT-3 to generate an answer. It first processes the documents, filtering and extracting a hash value. Using a chat-style format, it prompts GPT-3 with the processed documents and captures its response. If an error occurs, an error message is printed, and “Not Available!” is returned.

Function – retrieveDocumentsReader():

The retrieveDocumentsReader function takes in a question and an optional parameter, top_k (defaulted to 9). It is called the retriever.retrieve method with the given parameters. The result of the retrieval will generate at max nine responses from the RAG engine, which will be fed to OpenAI.

  • App.js (This is the main react script, that will create the interface & parse the data apart from the authentication)
// App.js
import React, { useState } from 'react';
import axios from 'axios';
import './App.css';

const App = () => {
  const [isLoggedIn, setIsLoggedIn] = useState(false);
  const [username, setUsername] = useState('');
  const [password, setPassword] = useState('');
  const [message, setMessage] = useState('');
  const [chatLog, setChatLog] = useState([{ sender: 'MuBot', message: 'Welcome to MuBot! Please explore the world of History from our brilliant collections! Do you want to proceed to see the catalog?'}]);

  const handleLogin = async (e) => {
    e.preventDefault();
    try {
      const response = await axios.post('http://localhost:5000/login', { username, password });
      if (response.status === 200) {
        setIsLoggedIn(true);
      }
    } catch (error) {
      console.error('Login error:', error);
    }
  };

  const sendMessage = async (username) => {
    if (message.trim() === '') return;

    // Create a new chat entry
    const newChatEntry = {
      sender: 'user',
      message: message.trim(),
    };

    // Clear the input field
    setMessage('');

    try {
      // Make API request to Python-based API
      const response = await axios.post('http://localhost:5000/chat', { message: newChatEntry.message }); // Replace with your API endpoint URL
      const responseData = response.data;

      // Print the response to the console for debugging
      console.log('API Response:', responseData);

      // Parse the nested JSON from the 'message' attribute
      const jsonData = JSON.parse(responseData.message);

      // Check if the data contains 'departments'
      if (jsonData.departments) {

        // Extract the 'departments' attribute from the parsed data
        const departments = jsonData.departments;

        // Extract the department names and create a single string with line breaks
        const botResponseText = departments.reduce((acc, department) => {return acc + department.departmentId + ' ' + department.displayName + '\n';}, '');

        // Update the chat log with the bot's response
        setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message: message }, { sender: 'bot', message: botResponseText },]);
      }
      else if (jsonData.records)
      {
        // Data structure 2: Artwork information
        const records = jsonData.records;

        // Prepare chat entries
        const chatEntries = [];

        // Iterate through records and extract text, image, and wiki information
        records.forEach((record) => {
          const textInfo = Object.entries(record).map(([key, value]) => {
            if (key !== 'Image' && key !== 'Wiki') {
              return `${key}: ${value}`;
            }
            return null;
          }).filter((info) => info !== null).join('\n');

          const imageLink = record.Image;
          //const wikiLinks = JSON.parse(record.Wiki.replace(/'/g, '"'));
          //const wikiLinks = record.Wiki;
          const wikiLinks = record.Wiki.split(',').map(link => link.trim());

          console.log('Wiki:', wikiLinks);

          // Check if there is a valid image link
          const hasValidImage = imageLink && imageLink !== '[]';

          const imageElement = hasValidImage ? (
            <img src={imageLink} alt="Artwork" style={{ maxWidth: '100%' }} />
          ) : null;

          // Create JSX elements for rendering the wiki links (if available)
          const wikiElements = wikiLinks.map((link, index) => (
            <div key={index}>
              <a href={link} target="_blank" rel="noopener noreferrer">
                Wiki Link {index + 1}
              </a>
            </div>
          ));

          if (textInfo) {
            chatEntries.push({ sender: 'bot', message: textInfo });
          }

          if (imageElement) {
            chatEntries.push({ sender: 'bot', message: imageElement });
          }

          if (wikiElements.length > 0) {
            chatEntries.push({ sender: 'bot', message: wikiElements });
          }
        });

        // Update the chat log with the bot's response
        setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message }, ...chatEntries, ]);
      }

    } catch (error) {
      console.error('Error sending message:', error);
    }
  };

  if (!isLoggedIn) {
    return (
      <div className="login-container">
        <h2>Welcome to the MuBot</h2>
        <form onSubmit={handleLogin} className="login-form">
          <input
            type="text"
            placeholder="Enter your name"
            value={username}
            onChange={(e) => setUsername(e.target.value)}
            required
          />
          <input
            type="password"
            placeholder="Enter your password"
            value={password}
            onChange={(e) => setPassword(e.target.value)}
            required
          />
          <button type="submit">Login</button>
        </form>
      </div>
    );
  }

  return (
    <div className="chat-container">
      <div className="chat-header">
        <h2>Hello, {username}</h2>
        <h3>Chat with MuBot</h3>
      </div>
      <div className="chat-log">
        {chatLog.map((chatEntry, index) => (
          <div
            key={index}
            className={`chat-entry ${chatEntry.sender === 'user' ? 'user' : 'bot'}`}
          >
            <span className="user-name">{chatEntry.sender === 'user' ? username : 'MuBot'}</span>
            <p className="chat-message">{chatEntry.message}</p>
          </div>
        ))}
      </div>
      <div className="chat-input">
        <input
          type="text"
          placeholder="Type your message..."
          value={message}
          onChange={(e) => setMessage(e.target.value)}
          onKeyPress={(e) => {
            if (e.key === 'Enter') {
              sendMessage();
            }
          }}
        />
        <button onClick={sendMessage}>Send</button>
      </div>
    </div>
  );
};

export default App;

Please find some of the important logic –

Function – handleLogin():

The handleLogin asynchronous function responds to an event by preventing its default action. It attempts to post a login request with a username and password to a local server endpoint. If the response is successful with a status of 200, it updates a state variable to indicate a successful login; otherwise, it logs any encountered errors.

Function – sendMessage():

The sendMessage asynchronous function is designed to handle the user’s chat interaction:

  1. If the message is empty (after trimming spaces), the function exits without further action.
  2. A chat entry object is created with the sender set as ‘user’ and the trimmed message.
  3. The input field’s message is cleared, and an API request is made to a local server endpoint with the chat message.
  4. If the API responds with a ‘departments’ attribute in its JSON, a bot response is crafted by iterating over department details.
  5. If the API responds with ‘records’ indicating artwork information, the bot crafts responses for each record, extracting text, images, and wiki links, and generating JSX elements for rendering them.
  6. After processing the API response, the chat log state is updated with the user’s original message and the bot’s responses.
  7. Errors, if encountered, are logged to the console.

This function enables interactive chat with bot responses that vary based on the nature of the data received from the API.


Let us explore the directory structure starting from the parent to some of the important child folder should look like this –


So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Enabling OpenAI-based NLP engine with SIRI (MacBook/iPad/iPhone) through a proxy-driven restricted API using Python.

Today, I’m very excited to demonstrate an effortless & new way to integrate SIRI with a controlled Open-AI exposed through a proxy API. So, why this is important; this will give you options to control your ChatGPT environment as per your principles & then you can use a load-balancer (if you want) & exposed that through proxy.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.


Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Isn’t it fascinating? This approach will lead to a whole new ballgame, where you can add SIRI with an entirely new world of knowledge as per your requirements & expose them in a controlled way.

FLOW OF EVENTS:

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

As you can see, Apple Shortcuts triggered the requests through its voice app, which then translates the question to text & then it will invoke the ngrok proxy API, which will eventually trigger the controlled custom API built using Flask & Python to start the Open AI API.


CODE:

Why don’t we go through the code made accessible due to this new library for this particular use case?

  • clsConfigClient.py (This is the main calling Python script for the input parameters.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 27-Jun-2023 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### personal OpenAI-based MAC-shortcuts ####
#### enable bot. ####
#### ####
################################################
import os
import platform as pl
class clsConfigClient(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'DATA_PATH': Curr_Path + sep + 'data' + sep,
'MODEL_PATH': Curr_Path + sep + 'model' + sep,
'TEMP_PATH': Curr_Path + sep + 'temp' + sep,
'MODEL_DIR': 'model',
'APP_DESC_1': 'LangChain Demo!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'FILE_NAME': 'Output.csv',
'MODEL_NAME': 'gpt-3.5-turbo',
'OPEN_AI_KEY': "sk-Jdhfdyruru9383474HHFJFJFJO6jrlxPKbv6Bgvv",
'TITLE': "LangChain Demo!",
'TEMP_VAL': 0.2,
'PATH' : Curr_Path,
'MAX_TOKEN' : 60,
'OUT_DIR': 'data'
}

Some of the important entries from the above snippet are as follows –

        'MODEL_NAME': 'gpt-3.5-turbo',
        'OPEN_AI_KEY': "sk-Jdhfdyruru9383474HHFJFJFJO6jrlxPKbv6Bgvv",
        'TEMP_VAL': 0.2,

TEMP_VAL will help you to control the response in a more authentic manner. It varies between 0 to 1.

  • clsJarvis.py (This is the main calling Python script for the input parameters.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 27-Jun-2023 ####
#### Modified On 28-Jun-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python class that will invoke the ####
#### Flask framework to expose the OpenAI ####
#### API with more control & encapsulate the ####
#### server IPs with proxy layers. ####
#### ####
#####################################################
import openai
from flask import request, jsonify
from clsConfigClient import clsConfigClient as cf
import os
import clsTemplate as ct
###############################################
### Global Section ###
###############################################
open_ai_Key = cf.conf['OPEN_AI_KEY']
openai.api_key = open_ai_Key
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
###############################################
### End of Global Section ###
###############################################
class clsJarvis:
def __init__(self):
self.model_name = cf.conf['MODEL_NAME']
self.max_token = cf.conf['MAX_TOKEN']
self.temp_val = cf.conf['TEMP_VAL']
def extractContentInText(self, query):
try:
model_name = self.model_name
max_token = self.max_token
temp_val = self.temp_val
template = ct.templateVal_1
response = openai.ChatCompletion.create(model=model_name, temperature=temp_val, messages=[{"role": "system", "content": template},{"role": "user", "content": query}])
inputJson = {"text": response['choices'][0]['message']['content']}
return jsonify(inputJson)
except Exception as e:
discussedTopic = []
x = str(e)
print('Error: ', x)
template = ct.templateVal_2
inputJson = {"text": template}
return jsonify(inputJson)

view raw

clsJarvis.py

hosted with ❤ by GitHub

The key snippets from the above script are as follows –

def extractContentInText(self, query):
    try:
        model_name = self.model_name
        max_token = self.max_token
        temp_val = self.temp_val

        template = ct.templateVal_1

        response = openai.ChatCompletion.create(model=model_name, temperature=temp_val, messages=[{"role": "system", "content": template},{"role": "user", "content": query}])
        inputJson = {"text": response['choices'][0]['message']['content']}

        return jsonify(inputJson)
    except Exception as e:
        discussedTopic = []
        x = str(e)
        print('Error: ', x)
        template = ct.templateVal_2

        inputJson = {"text": template}

        return jsonify(inputJson)

The provided Python code snippet defines a method extractContentInText, which interacts with OpenAI’s API to generate a response from OpenAI’s chat model to a user’s query. Here’s a summary of what it does:

  1. It fetches some predefined model configurations (model_name, max_token, temp_val). These are class attributes defined elsewhere.
  2. It sets a system message template (initial instruction for the AI model) using ct.templateVal_1. The ct object isn’t defined within this snippet but is likely another predefined object or module in the more extensive program.
  3. It then calls openai.ChatCompletion.create() to send messages to the AI model and generate a response. The statements include an initial system message and a user’s query.
  4. The model’s response is extracted and formatted into a JSON object inputJson where the ‘text’ field holds the AI’s response.
  5. The input JSON object returns a JSON response.

If an error occurs at any stage of this process (caught in the except block), it prints the error, sets a fallback message template using ct.templateVal_2, formats this into a JSON object, and returns it as a JSON response.

Note: The max_token variable is fetched but not used within the function; it might be a remnant of previous code or meant to be used in further development. The code also assumes a predefined ct object and a method called jsonify(), possibly from Flask, for formatting Python dictionaries into JSON format.

  • testJarvis.py (This is the main calling Python script.)


#########################################################
#### Written By: SATYAKI DE ####
#### Written On: 27-Jun-2023 ####
#### Modified On 28-Jun-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### shortcut application created inside MAC ####
#### enviornment including MacBook, IPad or IPhone. ####
#### ####
#########################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import clsJarvis as jv
import datetime
from flask import Flask, request, jsonify
app = Flask(__name__)
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
# Initiating Logging Instances
clog = cl.clsL()
cJarvis = jv.clsJarvis()
######################################
#### Global Flag ########
######################################
@app.route('/openai', methods=['POST'])
def openai_call():
try:
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*'*120)
print('Start Time: ' + str(var))
print('*'*120)
data = request.get_json()
print('Data::')
print(data)
prompt = data.get('prompt', '')
print('Prompt::')
print(prompt)
res = cJarvis.extractContentInText(str(prompt))
return res
print('*'*120)
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
app.run(host='0.0.0.0')

view raw

testJarvis.py

hosted with ❤ by GitHub

Please find the key snippets –

@app.route('/openai', methods=['POST'])
def openai_call():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        data = request.get_json()
        print('Data::')
        print(data)
        prompt = data.get('prompt', '')

        print('Prompt::')
        print(prompt)

        res = cJarvis.extractContentInText(str(prompt))

        return res

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

The provided Python code defines a route in a Flask web server that listens for POST requests at the ‘/openai’ endpoint. Here’s what it does in detail:

  1. It records and prints the current time, marking the start of the request handling.
  2. It retrieves the incoming data from the POST request as JSON with the request.get_json().
  3. It then extracts the ‘prompt’ from the JSON data. The request defaults to an empty string if no ‘prompt’ is provided in the request.
  4. The prompt is passed as an argument to the method extractContentInText() object cJarvis. This method is expected to use OpenAI’s API to generate a response from a model given the prompt (as discussed in your previous question). The result of this method call is stored in the variable res.
  5. The res variable (the model’s response) returns the answer to the client requesting the POST.
  6. It prints the current time again, marking the end of the request handling (However, this part of the code will never be executed as it places after a return statement).
  7. If an error occurs during this process, it catches the exception, converts it to a string, and prints the error message.

The cJarvis object used in the cJarvis.extractContentInText(str(prompt)) call is not defined within this code snippet. It is a global object likely defined elsewhere in the more extensive program. The extractContentInText method is the one you shared in your previous question.

Apple Shortcuts:

Now, let us understand the steps in Apple Shortcuts.

You can now set up a Siri Shortcut to call the URL provided by ngrok:

  1. Open the Shortcuts app on your iPhone.
  2. Tap the ‘+’ to create a new Shortcut.
  3. Add an action, search for “URL,” and select the URL action. Enter your ngrok URL here, with the /openai endpoint.
  4. Add another action, search for “Get Contents of URL.” This step will send a POST request to the URL from the previous activity. Set the method to POST and add a request body with type ‘JSON,’ containing a key ‘prompt’ and a value being the input you want to send to your OpenAI model.
  5. Optionally, you can add another action, “Show Result” or “Speak Text” to see/hear the result returned from your server.
  6. Save your Shortcut and give it a name.

You should now be able to activate Siri and say the name of your Shortcut to have it send a request to your server, which will then send a prompt to the OpenAI API and return the response.

Let us understand the “Get contents of” with easy postman screenshots –

As you can see that the newly exposed proxy-API will receive an input named prompt, which will be passed from “Dictate Text.”


So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement & especially in the prediction quality.

Demonstration of GPT-3 model tuning using Python for an upcoming PyPi-package

Today, I’m very excited to demonstrate an effortless & new way to fine-tune the GPT-3 model using Python with the help of my new build (unpublished) PyPi package. In this post, I plan to deal with the custom website link as a response from this website depending upon the user queries with the help of the OpenAI-based tuned model.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.

Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Isn’t it exciting? Finally, we can efficiently handle your custom website URL using OpenAI tuned model.


What is ChatGPT?

ChatGPT is an advanced artificial intelligence language model developed by OpenAI based on the GPT-4 architecture. As an AI model, it is designed to understand and generate human-like text-based on the input it receives. ChatGPT can engage in various tasks, such as answering questions, providing recommendations, creating content, and simulating conversation. While it is highly advanced and versatile, it’s important to note that ChatGPT’s knowledge is limited to the data it was trained on, with a cutoff date of September 2021.

When to tune GPT model?

Tuning a GPT or any AI model might be necessary for various reasons. Here are some common scenarios when you should consider adjusting or fine-tuning a GPT model:

  1. Domain-specific knowledge: If you need your model to have a deeper understanding of a specific domain or industry, you can fine-tune it with domain-specific data to improve its performance.
  2. New or updated data: If new or updated information is not part of the original training data, you should fine-tune the model to ensure it has the most accurate and up-to-date knowledge.
  3. Customization: If you require the model to have a specific style, tone, or focus, you can fine-tune it with data that reflects those characteristics.
  4. Ethical or safety considerations: To make the model safer and more aligned with human values, you should fine-tune it to reduce biased or harmful outputs.
  5. Improve performance: If the base model’s performance is unsatisfactory for a particular task or application, you can fine-tune it on a dataset more relevant to the job, often leading to better results.

Remember that tuning or fine-tuning a GPT model requires access to appropriate data and computational resources and an understanding of the model’s architecture and training techniques. Additionally, monitoring and evaluating the model’s performance after fine-tuning is essential to ensure that the desired improvements have been achieved.


FLOW OF EVENTS:

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

The initial Python-based client interacts with the tuned OpenAI models. This process enables it to get a precise response with custom data in a very convenient way. So that anyone can understand.


SOURCE DATA:

Let us understand how to feed the source data as it will deal with your website URL link.

The first data that we are going to talk about is the one that contains the hyperlink. Let us explore the sample here.

From the above diagram, one can easily understand that the application will interpret a unique hash number associated with a specific URL. This data will be used to look up the URL after the OpenAI response from the tuned model as a result of any user query.

Now, let us understand the actual source data.

If we closely check, we’ll see the source file contains two columns – prompt & completion. And the website reference is put inside the curly braces as shown – “{Hash Code that represents your URL}.”

During the response, the newly created library replaces the hash value with the correct URL after the successful lookup & presents the complete answer.

CODE:

Why don’t we go through the code made accessible due to this new library for this particular use case?

  • clsConfigClient.py (This is the main calling Python script for the input parameters.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 21-Feb-2023 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### OpenAI fine-tune projects. ####
#### ####
################################################
import os
import platform as pl
class clsConfigClient(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'DATA_PATH': Curr_Path + sep + 'data' + sep,
'TEMP_PATH': Curr_Path + sep + 'temp' + sep,
'MODEL_DIR': 'model',
'APP_DESC_1': 'ChatGPT Training!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'FILE_NAME': '2023-4-14-WP.csv',
'LKP_FILE_NAME': 'HyperDetails.csv',
'TEMP_FILE_NAME': 'chatGPTData.jsonl',
'TITLE': "GPT-3 Training!",
'PATH' : Curr_Path,
'OUT_DIR': 'data',
'OPEN_API_KEY': 'sk-hdhrujfrkfjfjfjfhjfjfisososT&jsdgL6KIxx',
'MODEL_CD':'davinci',
'URL': 'https://api.openai.com/v1/fine-tunes/&#39;,
'EPOCH': 10,
'SUFFIX': 'py-saty',
'EXIT_KEYWORD': 'bye'
}

Some of the important entries that will require later are as follows –

'FILE_NAME': '2023-4-14-WP.csv',
'LKP_FILE_NAME': 'HyperDetails.csv',
'OPEN_API_KEY': 'sk-hdhrujfrkfjfjfjfhjfjfisososT&jsdgL6KIxx',
'MODEL_CD':'davinci',
'URL': 'https://api.openai.com/v1/fine-tunes/',
'EXIT_KEYWORD': 'bye'

We’ll discuss these entries later.

  • trainChatGPTModel.py (This is the main calling Python script that will invoke the newly created fine-tune GPT-3 enabler.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 12-Feb-2023 ####
#### Modified On 16-Feb-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### newly created fine-tune GPT-3 enabler. ####
#### ####
#####################################################
import pandas as p
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import clsTrainModel3 as tm
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
#tModel = tm.clsTrainModel()
tModel = tm.clsTrainModel3()
# Initiating Logging Instances
clog = cl.clsL()
data_path = cf.conf['DATA_PATH']
data_file_name = cf.conf['FILE_NAME']
######################################
#### Global Flag ########
######################################
######################################
### Wrapper functions to invoke ###
### the desired class from newly ###
### built class. ###
######################################
######################################
### End of wrapper functions. ###
######################################
def main():
try:
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*'*120)
print('Start Time: ' + str(var))
print('*'*120)
FullFileName = data_path + data_file_name
r1 = tModel.trainModel(FullFileName)
if r1 == 0:
print('Successfully Trained!')
else:
print('Failed to Train!')
#clog.logr(OutPutFileName, debug_ind, df, subdir)
print('*'*120)
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

Following are the key snippet from the above script –

data_path = cf.conf['DATA_PATH']
data_file_name = cf.conf['FILE_NAME']

And, then –

tModel = tm.clsTrainModel3()
FullFileName = data_path + data_file_name
r1 = tModel.trainModel(FullFileName)

As one can see, the package needs only the source data file to fine-tune GPT-3 model.

  • checkFineTuneChatGPTModelStat.py (This is the main Python script that will check the status of the tuned process that will happen inside the OpenAI-cloud environment.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 12-Feb-2023 ####
#### Modified On 16-Feb-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### newly created fine-tune job status inside ####
#### the OpenAI environment. ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import clsTestModel3 as tm
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
# Initiating Logging Instances
clog = cl.clsL()
tmodel = tm.clsTestModel3()
url_part = cf.conf['URL']
open_api_key = cf.conf['OPEN_API_KEY']
######################################
#### Global Flag ########
######################################
def main():
try:
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*'*120)
print('Start Time: ' + str(var))
print('*'*120)
# Example usage
input_text = str(input("Please provide the fine tune Id (Start with ft-*): "))
url = url_part + input_text
print('URL: ', url)
r1 = tmodel.checkStat(url, open_api_key)
if r1 == 0:
print('Successfully checked the status of tuned GPT-3 model.')
else:
print('Failed to check the status of the tuned GPT-3 model.')
print('*'*120)
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

To check the status of the fine-tuned job inside the OpenAI environment, one needs to provide the fine tune id, which generally starts with -> “ft-*.” One would get this value after the train script’s successful run.

Some of the other key snippets are –

tmodel = tm.clsTestModel3()

url_part = cf.conf['URL']
open_api_key = cf.conf['OPEN_API_KEY']

And, then –

input_text = str(input("Please provide the fine tune Id (Start with ft-*): "))
url = url_part + input_text
print('URL: ', url)

r1 = tmodel.checkStat(url, open_api_key)

The above snippet is self-explanatory as one is passing the fine tune id along with the OpenAI API key.

  • testChatGPTModel.py (This is the main testing Python script that will invoke the newly created fine-tune GPT-3 enabler to get a response with custom data.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 12-Feb-2023 ####
#### Modified On 19-Apr-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### newly created class that will test the ####
#### tuned model output. ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import pandas as p
import clsTestModel3 as tm
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
# Initiating Logging Instances
clog = cl.clsL()
tmodel = tm.clsTestModel3()
open_api_key = cf.conf['OPEN_API_KEY']
lkpDataPath = cf.conf['DATA_PATH']
lkpFileName = cf.conf['LKP_FILE_NAME']
######################################
#### Global Flag ########
######################################
def main():
try:
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' * 120)
print('Start Time: ' + str(var))
print('*' * 120)
LookUpFileName = lkpDataPath + lkpFileName
r1 = tmodel.testModel(LookUpFileName, open_api_key)
if r1 == 0:
print('Successfully tested the tuned GPT-3 model.')
else:
print('Failed to test the tuned GPT-3 model.')
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

Some of the key entries from the above snippet are as follows –

tmodel = tm.clsTestModel3()

open_api_key = cf.conf['OPEN_API_KEY']
lkpDataPath = cf.conf['DATA_PATH']
lkpFileName = cf.conf['LKP_FILE_NAME']

And, then –

LookUpFileName = lkpDataPath + lkpFileName
r1 = tmodel.testModel(LookUpFileName, open_api_key)

In the above lines, the application gets the correct URL value from the look file we’ve prepared for this specific use case.

  • deleteChatGPTModel.py (This is the main Python script that will delete the old intended tuned model, which is no longer needed.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 12-Feb-2023 ####
#### Modified On 21-Feb-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### newly created delete model methods for ####
#### OpenAI. ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import clsTestModel3 as tm
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
# Initiating Logging Instances
clog = cl.clsL()
tmodel = tm.clsTestModel3()
open_api_key = cf.conf['OPEN_API_KEY']
######################################
#### Global Flag ########
######################################
def main():
try:
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' * 120)
print('Start Time: ' + str(var))
print('*' * 120)
r1 = tmodel.delOldModel(open_api_key)
if r1 == 0:
print('Successfully checked the status of tuned GPT-3 model.')
else:
print('Failed to check the status of the tuned GPT-3 model.')
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

Some of the key snippets from the above scripts are –

tmodel = tm.clsTestModel3()

open_api_key = cf.conf['OPEN_API_KEY']

And, then –

r1 = tmodel.delOldModel(open_api_key)

We’ve demonstrated that using a straightforward method, one can delete any old tuned model from OpenAI that is no longer required.

KEY FEATURES TO CONSIDER DURING TUNING:

  • Data quality: Ensure that the data used for fine-tuning is clean, relevant, and representative of the domain you want the model to understand. Check for biases, inconsistencies, and errors in the dataset.
  • Overfitting: Be cautious of overfitting, which occurs when the model performs exceptionally well on the training data but poorly on unseen data. You can address overfitting by using regularization techniques, early stopping, or cross-validation.
  • Model size and resource requirements: GPT models can be resource-intensive. Be mindful of the hardware limitations and computational resources available when selecting the model size and the time and cost associated with training.
  • Hyperparameter tuning: Select appropriate hyperparameters for your fine-tuning processes, such as learning rate, batch size, and the number of epochs. Experiment with different combinations to achieve the best results without overfitting.
  • Evaluation metrics: Choose suitable evaluation metrics to assess the performance of your fine-tuned model. Consider using multiple metrics to understand your model’s performance comprehensively.
  • Ethical considerations: Be aware of potential biases in your dataset and how the model’s predictions might impact users. Address ethical concerns during the fine-tuning process and consider using techniques such as data augmentation or adversarial training to mitigate these biases.
  • Monitoring and maintenance: Continuously monitor the model’s performance after deployment, and be prepared to re-tune or update it as needed. Regular maintenance ensures that the model remains relevant and accurate.
  • Documentation: Document your tuning process, including the data used, model architecture, hyperparameters, and evaluation metrics. This factor will facilitate easier collaboration, replication, and model maintenance.
  • Cost: OpenAI fine-tuning can be extremely expensive, even for a small volume of data. Hence, organization-wise, one needs to be extremely careful while using this feature.

COST FACTOR:

Before we discuss the actual spending, let us understand the tested data volume to train & tune the model.

So, we’re talking about a total size of 500 KB (at max). And, we did 10 epochs during the training as you can see from the config file mentioned above.

So, it is pretty expensive. Use it wisely.


So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement & especially in the prediction quality.

Personal Virtual Assistant (SJ) implemented using python-based OpenAI, Rev_AI & PyTtSX3.

Today, I will discuss our Virtual personal assistant (SJ) with a combination of AI-driven APIs, which is now operational in Python. We will use the three most potent APIs using OpenAI, Rev-AI & Pyttsx3. Why don’t we see the demo first?

Great! Let us understand we can leverage this by writing a tiny snippet using this new AI model.

Architecture:

Let us understand the flow of events –

The application first invokes the API to capture the audio spoken through the audio device & then translate that into text, which is later parsed & shared as input by the openai for the response of the posted queries. Once, OpenAI shares the response, the python-based engine will take the response & using pyttsx3 to convert them to voice.


Python Packages:

Following are the python packages that are necessary to develop this brilliant use case –

pip install openai==0.25.0
pip install PyAudio==0.2.13
pip install playsound==1.3.0
pip install pandas==1.5.2
pip install rev-ai==2.17.1
pip install six==1.16.0
pip install websocket-client==0.59.0

CODE:

Let us now understand the code. For this use case, we will only discuss three python scripts. However, we need more than these three. However, we have already discussed them in some of the early posts. Hence, we will skip them here.

  • clsConfigClient.py (Main configuration file)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 31-Dec-2022 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### personal AI-driven voice assistant. ####
#### ####
################################################
import os
import platform as pl
class clsConfigClient(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'REPORT_PATH': Curr_Path + sep + 'output' + sep,
'REPORT_DIR': 'output',
'SRC_PATH': Curr_Path + sep + 'data' + sep,
'CODE_PATH': Curr_Path + sep + 'Code' + sep,
'APP_DESC_1': 'Personal Voice Assistant (SJ)!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'TITLE': "Personal Voice Assistant (SJ)!",
'PATH' : Curr_Path,
'OPENAI_API_KEY': "sk-aapwfMWDuFE5XXXUr2BH",
'REVAI_API_KEY': "02ks6kFhEKjdhdure8474JJAJJ945958_h8P_DEKDNkK6DwNNNHU17aRtCw",
'MODEL_NAME': "code-davinci-002",
"speedSpeech": 170,
"speedPitch": 0.8,
"soundRate": 44100,
"contentType": "audio/x-raw",
"layout": "interleaved",
"format": "S16LE",
"channels": 1
}

A few of the essential entries from the above snippet, which one should be looked for, are –

'OPENAI_API_KEY': "sk-aapwfMWDuFE5XXXUr2BH",
'REVAI_API_KEY': "02ks6kFhEKjdhdure8474JJAJJ945958_h8P_DEKDNkK6DwNNNHU17aRtCw",
'MODEL_NAME': "code-davinci-002",
"speedSpeech": 170,
"speedPitch": 0.8,
"soundRate": 44100,
"contentType": "audio/x-raw",
"layout": "interleaved",
"format": "S16LE",
"channels": 1

Note that, all the API-key are not real. You need to generate your own key.

  • clsText2Voice.py (The python script that will convert text to voice)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 27-Oct-2019 ####
#### Modified On 28-Jan-2023 ####
#### ####
#### Objective: Main class converting ####
#### text to voice using third-party API. ####
###############################################
import pyttsx3
from clsConfigClient import clsConfigClient as cf
class clsText2Voice:
def __init__(self):
self.speedSpeech = cf.conf['speedSpeech']
self.speedPitch = cf.conf['speedPitch']
def getAudio(self, srcString):
try:
speedSpeech = self.speedSpeech
speedPitch = self.speedPitch
engine = pyttsx3.init()
# Set the speed of the speech (in words per minute)
engine.setProperty('rate', speedSpeech)
# Set the pitch of the speech (1.0 is default)
engine.setProperty('pitch', speedPitch)
# Converting to MP3
engine.say(srcString)
engine.runAndWait()
return 0
except Exception as e:
x = str(e)
print('Error: ', x)
return 1

Some of the important snippet will be as follows –

def getAudio(self, srcString):
    try:
        speedSpeech = self.speedSpeech
        speedPitch = self.speedPitch
        
        engine = pyttsx3.init()

        # Set the speed of the speech (in words per minute)
        engine.setProperty('rate', speedSpeech)

        # Set the pitch of the speech (1.0 is default)
        engine.setProperty('pitch', speedPitch)

        # Converting to MP3
        engine.say(srcString)
        engine.runAndWait()

        return 0

The code is a function that generates speech audio from a given string using the Pyttsx3 library in Python. The function sets the speech rate and pitch using the “speedSpeech” and “speedPitch” properties of the calling object, initializes the Pyttsx3 engine, sets the speech rate and pitch on the engine, speaks the given string, and waits for the speech to finish. The function returns 0 after the speech is finished.


  • clsChatEngine.py (This python script will invoke the ChatGPT OpenAI class to initiate the response of the queries in python.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Dec-2022 ####
#### Modified On 28-Jan-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### ChatGPT OpenAI class to initiate the ####
#### response of the queries in python. ####
#####################################################
import os
import openai
import json
from clsConfigClient import clsConfigClient as cf
import sys
import errno
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
###############################################
### Global Section ###
###############################################
CODE_PATH=str(cf.conf['CODE_PATH'])
MODEL_NAME=str(cf.conf['MODEL_NAME'])
###############################################
### End of Global Section ###
###############################################
class clsChatEngine:
def __init__(self):
self.OPENAI_API_KEY=str(cf.conf['OPENAI_API_KEY'])
def findFromSJ(self, text):
try:
OPENAI_API_KEY = self.OPENAI_API_KEY
# ChatGPT API_KEY
openai.api_key = OPENAI_API_KEY
print('22'*60)
try:
# Getting response from ChatGPT
response = openai.Completion.create(
engine=MODEL_NAME,
prompt=text,
max_tokens=64,
top_p=1.0,
n=3,
temperature=0,
frequency_penalty=0.0,
presence_penalty=0.0,
stop=["\"\"\""]
)
except IOError as e:
if e.errno == errno.EPIPE:
pass
print('44'*60)
res = response.choices[0].text
return res
except IOError as e:
if e.errno == errno.EPIPE:
pass
except Exception as e:
x = str(e)
print(x)
print('66'*60)
return x

Key snippets from the above-script are as follows –

def findFromSJ(self, text):
      try:
          OPENAI_API_KEY = self.OPENAI_API_KEY

          # ChatGPT API_KEY
          openai.api_key = OPENAI_API_KEY

          print('22'*60)

          try:
              # Getting response from ChatGPT
              response = openai.Completion.create(
              engine=MODEL_NAME,
              prompt=text,
              max_tokens=64,
              top_p=1.0,
              n=3,
              temperature=0,
              frequency_penalty=0.0,
              presence_penalty=0.0,
              stop=["\"\"\""]
              )
          except IOError as e:
              if e.errno == errno.EPIPE:
                  pass

          print('44'*60)
          res = response.choices[0].text

          return res

      except IOError as e:
          if e.errno == errno.EPIPE:
              pass

      except Exception as e:
          x = str(e)
          print(x)

          print('66'*60)

          return x

The code is a function that uses OpenAI’s ChatGPT model to generate text based on a given prompt text. The function takes the text to be completed as input and uses an API key stored in the OPENAI_API_KEY property of the calling object to request OpenAI’s API. If the request is successful, the function returns the top completion generated by the model, as stored in the text field of the first item in the choices list of the API response.

The function includes error handling for IOError and Exception. If an IOError occurs, the function checks if the error number is errno.EPIPE and, if it is, returns without doing anything. If an Exception occurs, the function converts the error message to a string and prints it, then returns the string.


  • clsVoice2Text.py (This python script will invoke the Rev-AI class to initiate the transformation of audio into the text.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Dec-2022 ####
#### Modified On 28-Jan-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### Rev-AI class to initiate the transformation ####
#### of audio into the text. ####
#####################################################
import pyaudio
from rev_ai.models import MediaConfig
from rev_ai.streamingclient import RevAiStreamingClient
from six.moves import queue
import ssl
import json
import pandas as p
import clsMicrophoneStream as ms
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
# Initiating Log class
l = cl.clsL()
# Bypassing SSL Authentication
try:
_create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
# Legacy python that doesn't verify HTTPS certificates by default
pass
else:
# Handle target environment that doesn't support HTTPS verification
ssl._create_default_https_context = _create_unverified_https_context
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
######################################
### Insert your access token here ####
######################################
debug_ind = 'Y'
################################################################
### Sampling rate of your microphone and desired chunk size ####
################################################################
class clsVoice2Text:
def __init__(self):
self.OPENAI_API_KEY=str(cf.conf['OPENAI_API_KEY'])
self.rate = cf.conf['soundRate']
def processVoice(self, var):
try:
OPENAI_API_KEY = self.OPENAI_API_KEY
accessToken = cf.conf['REVAI_API_KEY']
rate = self.rate
chunk = int(rate/10)
################################################################
### Creates a media config with the settings set for a raw ####
### microphone input ####
################################################################
sampleMC = MediaConfig('audio/x-raw', 'interleaved', 44100, 'S16LE', 1)
streamclient = RevAiStreamingClient(accessToken, sampleMC)
#####################################################################
### Opens microphone input. The input will stop after a keyboard ####
### interrupt. ####
#####################################################################
with ms.clsMicrophoneStream(rate, chunk) as stream:
#####################################################################
### Uses try method to enable users to manually close the stream ####
#####################################################################
try:
response_gen = ''
response = ''
finalText = ''
#########################################################################
### Starts the server connection and thread sending microphone audio ####
#########################################################################
response_gen = streamclient.start(stream.generator())
###################################################
### Iterates through responses and prints them ####
###################################################
for response in response_gen:
try:
print('JSON:')
print(response)
r = json.loads(response)
df = p.json_normalize(r["elements"])
l.logr('1.df_' + var + '.csv', debug_ind, df, 'log')
column_name = "confidence"
if column_name in df.columns:
print('DF:: ')
print(df)
finalText = "".join(df["value"])
print("TEXT:")
print(finalText)
df = p.DataFrame()
raise Exception
except Exception as e:
x = str(e)
break
streamclient.end()
return finalText
except Exception as e:
x = str(e)
#######################################
### Ends the WebSocket connection. ####
#######################################
streamclient.end()
return ''
except Exception as e:
x = str(e)
print('Error: ', x)
streamclient.end()
return x

Here is the important snippet from the above code –

def processVoice(self, var):
      try:
          OPENAI_API_KEY = self.OPENAI_API_KEY
          accessToken = cf.conf['REVAI_API_KEY']
          rate = self.rate
          chunk = int(rate/10)

          ################################################################
          ### Creates a media config with the settings set for a raw  ####
          ### microphone input                                        ####
          ################################################################

          sampleMC = MediaConfig('audio/x-raw', 'interleaved', 44100, 'S16LE', 1)

          streamclient = RevAiStreamingClient(accessToken, sampleMC)

          #####################################################################
          ### Opens microphone input. The input will stop after a keyboard ####
          ### interrupt.                                                   ####
          #####################################################################

          with ms.clsMicrophoneStream(rate, chunk) as stream:

              #####################################################################
              ### Uses try method to enable users to manually close the stream ####
              #####################################################################

              try:
                  response_gen = ''
                  response = ''
                  finalText = ''
                  
                  ############################################
                  ### Starts the server connection        ####
                  ### and thread sending microphone audio #### 
                  ############################################

                  response_gen = streamclient.start(stream.generator())

                  ###################################################
                  ### Iterates through responses and prints them ####
                  ###################################################

                  for response in response_gen:
                      try:
                          print('JSON:')
                          print(response)

                          r = json.loads(response)

                          df = p.json_normalize(r["elements"])
                          l.logr('1.df_' + var + '.csv', debug_ind, df, 'log')
                          column_name = "confidence"

                          if column_name in df.columns:
                              print('DF:: ')
                              print(df)

                              finalText = "".join(df["value"])
                              print("TEXT:")
                              print(finalText)

                              df = p.DataFrame()

                              raise Exception

                      except Exception as e:
                          x = str(e)
                          break

                  streamclient.end()

                  return finalText

              except Exception as e:
                  x = str(e)
                  #######################################
                  ### Ends the WebSocket connection. ####
                  #######################################

                  streamclient.end()

                  return ''

      except Exception as e:
          x = str(e)
          print('Error: ', x)

          streamclient.end()

          return x

The code is a python function called processVoice() that processes a user’s voice input using the Rev.AI API. The function takes in one argument, “var,” which is not used in the code.

  1. Let us understand the code –
    • First, the function sets several variables, including the Rev.AI API access token, the sample rate, and the chunk size for the audio input.
    • Then, it creates a media configuration object for raw microphone input.
    • A RevAiStreamingClient object is created using the access token and the media configuration.
    • The code opens the microphone input using a statement and the microphone stream class.
    • Within the statement, the code starts the server connection and a thread that sends microphone audio to the server.
    • The code then iterates through the responses from the server, normalizing the JSON response and storing the values in a pandas data-frame.
    • If the “confidence” column exists in the data-frame, the code joins all the values to form the final text and raises an exception.
      • If there is an exception, the WebSocket connection is ended, and the final text is returned.
      • If there is any error, the WebSocket connection is also ended, and an empty string or the error message is returned.

  • clsMicrophoneStream.py (This python script invoke the rev_ai template to capture the chunk voice data & stream it to the service for text translation & return the response to app.)


#####################################################
#### Modified By: SATYAKI DE ####
#### Modified On 28-Jan-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### rev_ai template to capture the chunk voice ####
#### data & stream it to the service for text ####
#### translation & return the response to app. ####
#####################################################
import pyaudio
from rev_ai.models import MediaConfig
from six.moves import queue
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
class clsMicrophoneStream(object):
#############################################
### Opens a recording stream as a ####
### generator yielding the audio chunks. ####
#############################################
def __init__(self, rate, chunk):
self._rate = rate
self._chunk = chunk
##################################################
### Create a thread-safe buffer of audio data ####
##################################################
self._buff = queue.Queue()
self.closed = True
def __enter__(self):
self._audio_interface = pyaudio.PyAudio()
self._audio_stream = self._audio_interface.open(
format=pyaudio.paInt16,
#########################################################
### The API currently only supports 1-channel (mono) ####
### audio. ####
#########################################################
channels=1, rate=self._rate,
input=True, frames_per_buffer=self._chunk,
####################################################################
### Run the audio stream asynchronously to fill the buffer ####
### object. Run the audio stream asynchronously to fill the ####
### buffer object. This is necessary so that the input device's ####
### buffer doesn't overflow while the calling thread makes ####
### network requests, etc. ####
####################################################################
stream_callback=self._fill_buffer,
)
self.closed = False
return self
def __exit__(self, type, value, traceback):
self._audio_stream.stop_stream()
self._audio_stream.close()
self.closed = True
###############################################################
### Signal the generator to terminate so that the client's ####
### streaming_recognize method will not block the process ####
### termination. ####
###############################################################
self._buff.put(None)
self._audio_interface.terminate()
def _fill_buffer(self, in_data, frame_count, time_info, status_flags):
##############################################################
### Continuously collect data from the audio stream, into ####
### the buffer. ####
##############################################################
self._buff.put(in_data)
return None, pyaudio.paContinue
def generator(self):
while not self.closed:
######################################################################
### Use a blocking get() to ensure there's at least one chunk of ####
### data, and stop iteration if the chunk is None, indicating the ####
### end of the audio stream. ####
######################################################################
chunk = self._buff.get()
if chunk is None:
return
data = [chunk]
##########################################################
### Now consume whatever other data's still buffered. ####
##########################################################
while True:
try:
chunk = self._buff.get(block=False)
if chunk is None:
return
data.append(chunk)
except queue.Empty:
break
yield b''.join(data)

The key snippet from the above script are as follows –

def __enter__(self):
    self._audio_interface = pyaudio.PyAudio()
    self._audio_stream = self._audio_interface.open(
        format=pyaudio.paInt16,

        #########################################################
        ### The API currently only supports 1-channel (mono) ####
        ### audio.                                           ####
        #########################################################

        channels=1, rate=self._rate,
        input=True, frames_per_buffer=self._chunk,

        ####################################################################
        ### Run the audio stream asynchronously to fill the buffer      ####
        ### object. Run the audio stream asynchronously to fill the     ####
        ### buffer object. This is necessary so that the input device's ####
        ### buffer doesn't overflow while the calling thread makes      ####
        ### network requests, etc.                                      ####
        ####################################################################

        stream_callback=self._fill_buffer,
    )

    self.closed = False

    return self

This code is a part of a context manager class (clsMicrophoneStream) and implements the __enter__ method of the class. The method sets up a PyAudio object and opens an audio stream using the PyAudio object. The audio stream is configured to have the following properties:

  • Format: 16-bit integer (paInt16)
  • Channels: 1 (mono)
  • Rate: The rate specified in the instance of the ms.clsMicrophoneStream class.
  • Input: True, meaning the audio stream is an input stream, not an output stream.
  • Frames per buffer: The chunk specified in the instance of the ms.clsMicrophoneStream class.
  • Stream callback: The method self._fill_buffer will be called when the buffer needs more data.

The self.closed attribute is set to False to indicate that the stream is open. The method returns the instance of the class (self).

def __exit__(self, type, value, traceback):
    self._audio_stream.stop_stream()
    self._audio_stream.close()
    self.closed = True

    ###############################################################
    ### Signal the generator to terminate so that the client's ####
    ### streaming_recognize method will not block the process  ####
    ### termination.                                           ####
    ###############################################################

    self._buff.put(None)
    self._audio_interface.terminate()

The exit method implements the “exit” behavior of a Python context manager. It is automatically called when the context manager is exited using the statement.

The method stops and closes the audio stream, sets the closed attribute to True, and places None in the buffer. The terminate method of the PyAudio interface is then called to release any resources used by the audio stream.

def _fill_buffer(self, in_data, frame_count, time_info, status_flags):

    ##############################################################
    ### Continuously collect data from the audio stream, into ####
    ### the buffer.                                           ####
    ##############################################################

    self._buff.put(in_data)
    return None, pyaudio.paContinue

The _fill_buffer method is a callback function that runs asynchronously to continuously collect data from the audio stream and add it to the buffer.

The _fill_buffer method takes four arguments:

  • in_data: the raw audio data collected from the audio stream.
  • frame_count: the number of frames of audio data that was collected.
  • time_info: information about the timing of the audio data.
  • status_flags: flags that indicate the status of the audio stream.

The method adds the collected in_data to the buffer using the put method of the buffer object. It returns a tuple of None and pyaudio.paContinue to indicate that the audio stream should continue.

def generator(self):
    while not self.closed:
        ######################################################################
        ### Use a blocking get() to ensure there's at least one chunk of  ####
        ### data, and stop iteration if the chunk is None, indicating the ####
        ### end of the audio stream.                                      ####
        ######################################################################

        chunk = self._buff.get()
        if chunk is None:
            return
        data = [chunk]

        ##########################################################
        ### Now consume whatever other data's still buffered. ####
        ##########################################################

        while True:
            try:
                chunk = self._buff.get(block=False)
                if chunk is None:
                    return
                data.append(chunk)
            except queue.Empty:
                break

        yield b''.join(data)

The logic of the code “def generator(self):” is as follows:

The function generator is an infinite loop that runs until self.closed is True. Within the loop, it uses a blocking get() method of the buffer object (self._buff) to retrieve a chunk of audio data. If the retrieved chunk is None, it means the end of the audio stream has been reached, and the function returns.

If the retrieved chunk is not None, it appends it to the data list. The function then enters another inner loop that continues to retrieve chunks from the buffer using the non-blocking get() method until there are no more chunks left. Finally, the function yields the concatenated chunks of data as a single-byte string.


  • SJVoiceAssistant.py (Main calling python script)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Dec-2022 ####
#### Modified On 31-Jan-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### multiple classes to initiate the ####
#### AI-enabled personal assistant, which would ####
#### display & answer the queries through voice. ####
#####################################################
import pyaudio
from six.moves import queue
import ssl
import json
import pandas as p
import clsMicrophoneStream as ms
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import clsChatEngine as ce
import clsText2Voice as tv
import clsVoice2Text as vt
#from signal import signal, SIGPIPE, SIG_DFL
#signal(SIGPIPE,SIG_DFL)
###################################################
##### Adding the Instantiating Global classes #####
###################################################
x2 = ce.clsChatEngine()
x3 = tv.clsText2Voice()
x4 = vt.clsVoice2Text()
# Initiating Log class
l = cl.clsL()
###################################################
##### End of Global Classes #######
###################################################
# Bypassing SSL Authentication
try:
_create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
# Legacy python that doesn't verify HTTPS certificates by default
pass
else:
# Handle target environment that doesn't support HTTPS verification
ssl._create_default_https_context = _create_unverified_https_context
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
######################################
### Insert your access token here ####
######################################
debug_ind = 'Y'
######################################
#### Global Flag ########
######################################
def main():
try:
spFlag = True
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*'*120)
print('Start Time: ' + str(var))
print('*'*120)
exitComment = 'THANKS.'
while True:
try:
finalText = ''
if spFlag == True:
finalText = x4.processVoice(var)
else:
pass
val = finalText.upper().strip()
print('Main Return: ', val)
print('Exit Call: ', exitComment)
print('Length of Main Return: ', len(val))
print('Length of Exit Call: ', len(exitComment))
if val == exitComment:
break
elif finalText == '':
spFlag = True
else:
print('spFlag::',spFlag)
print('Inside: ', finalText)
resVal = x2.findFromSJ(finalText)
print('ChatGPT Response:: ')
print(resVal)
resAud = x3.getAudio(resVal)
spFlag = False
except Exception as e:
pass
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*'*120)
print('End Time: ' + str(var1))
print('SJ Voice Assistant exited successfully!')
print('*'*120)
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

And, the key snippet from the above script –

def main():
    try:
        spFlag = True

        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        exitComment = 'THANKS.'

        while True:
            try:
                finalText = ''

                if spFlag == True:
                    finalText = x4.processVoice(var)
                else:
                    pass

                val = finalText.upper().strip()

                print('Main Return: ', val)
                print('Exit Call: ', exitComment)
                print('Length of Main Return: ', len(val))
                print('Length of Exit Call: ', len(exitComment))

                if val == exitComment:
                    break
                elif finalText == '':
                    spFlag = True
                else:
                    print('spFlag::',spFlag)
                    print('Inside: ', finalText)
                    resVal = x2.findFromSJ(finalText)

                    print('ChatGPT Response:: ')
                    print(resVal)

                    resAud = x3.getAudio(resVal)
                    spFlag = False
            except Exception as e:
                pass

        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('End Time: ' + str(var1))
        print('SJ Voice Assistant exited successfully!')
        print('*'*120)

    except Exception as e:
        x = str(e)
        print('Error: ', x)

The code is a Python script that implements a voice-based chatbot (likely named “SJ Voice Assistant”). The code performs the following operations:

  1. Initialize the string “exitComment” to “THANKS.” and set the “spFlag” to True.
  2. Start an infinite loop until a specific condition breaks the loop.
  3. In the loop, try to process the input voice with a function called “processVoice()” from an object “x4”. Store the result in “finalText.”
  4. Convert “finalText” to upper case, remove leading/trailing whitespaces, and store it in “val.” Print “Main Return” and “Exit Call” with their length.
  5. If “val” equals “exitComment,” break the loop. Suppose “finalText” is an empty string; set “spFlag” to True. Otherwise, perform further processing: a. Call the function “findFromSJ()” from an object “x2” with the input “finalText.” Store the result in “resVal.” b. Call the function “getAudio()” from an object “x3” with the input “resVal.” Store the result in “resAud.” Set “spFlag” to False.
  6. If an exception occurs, catch it and pass (do nothing).
  7. Finally the application will exit by displaying the following text – “SJ Voice Assistant exited successfully!”
  8. If an exception occurs outside the loop, catch it and print the error message.

So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement & especially in the prediction quality.

Real-time augmented reality (AR) using Python-based Computer Vision

Hi Team,

Today, I’m going to discuss another Computer Vision installment. I’ll discuss how to implement Augmented Reality using Open-CV Computer Vision with full audio. We will be using part of a Bengali OTT Series called “Feludar Goendagiri” entirely for educational purposes & also as a tribute to the great legendary director, late Satyajit Roy. To know more about him, please click the following link.

Why don’t we see the demo first before jumping into the technical details?


Demo

Architecture:

Let us understand the architecture –

Process Flow

The above diagram shows that the application, which uses the Open-CV, analyzes individual frames from the source & blends that with the video trailer. Finally, it creates another video by correctly mixing the source audio.

Python Packages:

Following are the python packages that are necessary to develop this brilliant use case –

pip install opencv-python
pip install pygame

CODE:

Let us now understand the code. For this use case, we will only discuss three python scripts. However, we need more than these three. However, we have already discussed them in some of the early posts. Hence, we will skip them here.

  • clsAugmentedReality.py (This is the main class of python script that will embed the source video with the WebCAM streams in real-time.)


##################################################
#### Written By: SATYAKI DE ####
#### Written On: 20-Jun-2022 ####
#### Modified On 25-Jun-2022 ####
#### ####
#### Objective: This is the main class of ####
#### python script that will embed the source ####
#### video with the WebCAM streams in ####
#### real-time. ####
##################################################
# Importing necessary packages
import numpy as np
import cv2
from clsConfig import clsConfig as cf
# Initialize our cached reference points
CACHED_REF_PTS = None
class clsAugmentedReality:
def __init__(self):
self.TOP_LEFT_X = int(cf.conf['TOP_LEFT_X'])
self.TOP_LEFT_Y = int(cf.conf['TOP_LEFT_Y'])
self.TOP_RIGHT_X = int(cf.conf['TOP_RIGHT_X'])
self.TOP_RIGHT_Y = int(cf.conf['TOP_RIGHT_Y'])
self.BOTTOM_RIGHT_X = int(cf.conf['BOTTOM_RIGHT_X'])
self.BOTTOM_RIGHT_Y = int(cf.conf['BOTTOM_RIGHT_Y'])
self.BOTTOM_LEFT_X = int(cf.conf['BOTTOM_LEFT_X'])
self.BOTTOM_LEFT_Y = int(cf.conf['BOTTOM_LEFT_Y'])
def getWarpImages(self, frame, source, cornerIDs, arucoDict, arucoParams, zoomFlag, useCache=False):
try:
# Assigning values
TOP_LEFT_X = self.TOP_LEFT_X
TOP_LEFT_Y = self.TOP_LEFT_Y
TOP_RIGHT_X = self.TOP_RIGHT_X
TOP_RIGHT_Y = self.TOP_RIGHT_Y
BOTTOM_RIGHT_X = self.BOTTOM_RIGHT_X
BOTTOM_RIGHT_Y = self.BOTTOM_RIGHT_Y
BOTTOM_LEFT_X = self.BOTTOM_LEFT_X
BOTTOM_LEFT_Y = self.BOTTOM_LEFT_Y
# Grab a reference to our cached reference points
global CACHED_REF_PTS
if source is None:
raise
# Grab the width and height of the frame and source image,
# respectively
# Extracting Frame from Camera
# Exracting Source from Video
(imgH, imgW) = frame.shape[:2]
(srcH, srcW) = source.shape[:2]
# Detect Aruco markers in the input frame
(corners, ids, rejected) = cv2.aruco.detectMarkers(frame, arucoDict, parameters=arucoParams)
print('Ids: ', str(ids))
print('Rejected: ', str(rejected))
# if we *did not* find our four ArUco markers, initialize an
# empty IDs list, otherwise flatten the ID list
print('Detecting Corners: ', str(len(corners)))
ids = np.array([]) if len(corners) != 4 else ids.flatten()
# Initialize our list of reference points
refPts = []
refPtTL1 = []
# Loop over the IDs of the ArUco markers in Top-Left, Top-Right,
# Bottom-Right, and Bottom-Left order
for i in cornerIDs:
# Grab the index of the corner with the current ID
j = np.squeeze(np.where(ids == i))
# If we receive an empty list instead of an integer index,
# then we could not find the marker with the current ID
if j.size == 0:
continue
# Otherwise, append the corner (x, y)-coordinates to our list
# of reference points
corner = np.squeeze(corners[j])
refPts.append(corner)
# Check to see if we failed to find the four ArUco markers
if len(refPts) != 4:
# If we are allowed to use cached reference points, fall
# back on them
if useCache and CACHED_REF_PTS is not None:
refPts = CACHED_REF_PTS
# Otherwise, we cannot use the cache and/or there are no
# previous cached reference points, so return early
else:
return None
# If we are allowed to use cached reference points, then update
# the cache with the current set
if useCache:
CACHED_REF_PTS = refPts
# Unpack our Aruco reference points and use the reference points
# to define the Destination transform matrix, making sure the
# points are specified in Top-Left, Top-Right, Bottom-Right, and
# Bottom-Left order
(refPtTL, refPtTR, refPtBR, refPtBL) = refPts
dstMat = [refPtTL[0], refPtTR[1], refPtBR[2], refPtBL[3]]
dstMat = np.array(dstMat)
# For zoom option recalculating all the 4 points
refPtTL1_L_X = refPtTL[0][0]TOP_LEFT_X
refPtTL1_L_Y = refPtTL[0][1]TOP_LEFT_Y
refPtTL1.append((refPtTL1_L_X,refPtTL1_L_Y))
refPtTL1_R_X = refPtTL[1][0]+TOP_RIGHT_X
refPtTL1_R_Y = refPtTL[1][1]+TOP_RIGHT_Y
refPtTL1.append((refPtTL1_R_X,refPtTL1_R_Y))
refPtTD1_L_X = refPtTL[2][0]+BOTTOM_RIGHT_X
refPtTD1_L_Y = refPtTL[2][1]+BOTTOM_RIGHT_Y
refPtTL1.append((refPtTD1_L_X,refPtTD1_L_Y))
refPtTD1_R_X = refPtTL[3][0]BOTTOM_LEFT_X
refPtTD1_R_Y = refPtTL[3][1]+BOTTOM_LEFT_Y
refPtTL1.append((refPtTD1_R_X,refPtTD1_R_Y))
dstMatMod = [refPtTL1[0], refPtTL1[1], refPtTL1[2], refPtTL1[3]]
dstMatMod = np.array(dstMatMod)
# Define the transform matrix for the *source* image in Top-Left,
# Top-Right, Bottom-Right, and Bottom-Left order
srcMat = np.array([[0, 0], [srcW, 0], [srcW, srcH], [0, srcH]])
# Compute the homography matrix and then warp the source image to
# the destination based on the homography depending upon the
# zoom flag
if zoomFlag == 1:
(H, _) = cv2.findHomography(srcMat, dstMat)
else:
(H, _) = cv2.findHomography(srcMat, dstMatMod)
warped = cv2.warpPerspective(source, H, (imgW, imgH))
# Construct a mask for the source image now that the perspective
# warp has taken place (we'll need this mask to copy the source