RAG implementation of LLMs by using Python, Haystack & React (Part – 2)

Today, we’ll share the second installment of the RAG implementation. If you are new here, please visit the previous post for full context.

In this post, we’ll take a closer look at the Haystack framework. As before, let’s start with the demo before getting into the details.

Demo

Let us look at the flow diagram, as it captures the sequence of events that unfold as part of the process & highlights the part we’ll focus on today.

As you can see, today we’ll discuss the part marked by the red dotted line, which contextualizes the source data into the Vector DB.

Let us understand the flow of events here –

  1. The main Python application will consume the nested JSON by invoking the museum API in multiple threads.
  2. The application will clean the nested data & extract the relevant attributes after flattening the JSON.
  3. It will create the unstructured text-based context, which is later fed to the Vector DB framework.
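
The flatten-and-build-context steps above can be sketched as follows. This is a minimal illustration using only the standard library, not the author’s implementation; the function name flatten_json, the key separator & the sample record are all assumptions made for the example.

```python
from typing import Any, Dict


def flatten_json(node: Any, parent_key: str = "", sep: str = "_") -> Dict[str, Any]:
    """Flatten nested dicts/lists into a single-level dict with joined keys."""
    flat: Dict[str, Any] = {}
    if isinstance(node, dict):
        for key, value in node.items():
            child_key = f"{parent_key}{sep}{key}" if parent_key else str(key)
            flat.update(flatten_json(value, child_key, sep))
    elif isinstance(node, list):
        for idx, value in enumerate(node):
            child_key = f"{parent_key}{sep}{idx}" if parent_key else str(idx)
            flat.update(flatten_json(value, child_key, sep))
    else:
        # Leaf value: store it under the accumulated key
        flat[parent_key] = node
    return flat


# Hypothetical record, loosely shaped like a nested museum object payload
sample = {
    "objectID": 1,
    "title": "Vase",
    "constituents": [{"name": "Unknown", "role": "Maker"}],
    "tags": [],
}

flat_record = flatten_json(sample)

# Build the unstructured text context from the flattened attributes
context = ". ".join(f"{key}: {value}" for key, value in flat_record.items())
print(context)
```

Empty lists & dicts simply disappear from the flattened record, which keeps the generated text free of empty attributes.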

pip install farm-haystack==1.19.0
pip install Flask==2.2.5
pip install Flask-Cors==4.0.0
pip install Flask-JWT-Extended==4.5.2
pip install Flask-Session==0.5.0
pip install openai==0.27.8
pip install pandas==2.0.3
pip install tensorflow==2.11.1

We’re using the Metropolitan Museum API to feed the data to our Vector DB. For more information, please visit the following link. The API is free to use & we’re using it only for educational purposes.


We’ll discuss the tokenization part, highlighted with a red dotted line in the above picture.

We’ll now walk through the scripts shown in the diagram, in the order of the flow mentioned above.

  • clsExtractJSON.py (This is the main class that will extract the content from the museum API using parallel calls.)
def genData(self):
    try:
        base_url = self.base_url
        header_token = self.header_token
        basePath = self.basePath
        outputPath = self.outputPath
        mergedFile = self.mergedFile
        subdir = self.subdir
        Ind = self.Ind
        var_1 = datetime.now().strftime("%H.%M.%S")


        devVal = list()
        objVal = list()

        # Main Details
        headers = {'Cookie':header_token}
        payload={}

        url = base_url + '/departments'

        date_ranges = self.generateFirstDayOfLastTenYears()

        # Getting all the departments
        try:
            print('Department URL:')
            print(str(url))

            response = requests.request("GET", url, headers=headers, data=payload)
            parsed_data = json.loads(response.text)

            print('Department JSON:')
            print(str(parsed_data))

            # Extract the "departmentId" values into a Python list
            devVal = [dept_det['departmentId'] for dept_det in parsed_data['departments'] if 'departmentId' in dept_det]

        except Exception as e:
            x = str(e)
            print('Error: ', x)
            devVal = list()

        # List to hold thread objects
        threads = []

        # Calling the Data using threads
        for dep in devVal:
            t = threading.Thread(target=self.getDataThread, args=(dep, base_url, headers, payload, date_ranges, objVal, subdir, Ind,))
            threads.append(t)
            t.start()

        # Wait for all threads to complete
        for t in threads:
            t.join()

        res = self.mergeCsvFilesInDirectory(basePath, outputPath, mergedFile)

        if res == 0:
            print('Successful!')
        else:
            print('Failure!')

        return 0

    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

The above code translates into the following steps –

  1. The method first calls generateFirstDayOfLastTenYears() to build the list of dates & then fetches all the unique department IDs from the /departments endpoint.
  2. Then, it starts one getDataThread() thread per department so that the API calls run in parallel, which reduces the overall wait time & creates individual smaller files.
  3. Finally, the application invokes the mergeCsvFilesInDirectory() method to merge all the chunk files into one large historical data file.
def generateFirstDayOfLastTenYears(self):
    yearRange = self.yearRange
    date_format = "%Y-%m-%d"
    current_year = datetime.now().year

    date_ranges = []
    for year in range(current_year - yearRange, current_year + 1):
        first_day_of_year_full = datetime(year, 1, 1)
        first_day_of_year = first_day_of_year_full.strftime(date_format)
        date_ranges.append(first_day_of_year)

    return date_ranges

The above method generates the first day of each year for the last yearRange years (ten, going by the method name), up to & including the current year.
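
To see what the date list looks like, here is a small standalone sketch of the same idea. The function name first_days_of_years & the fixed year 2023 are assumptions made for the example so that the output is reproducible; the original reads the year from the system clock & the range from the class configuration.

```python
from datetime import date
from typing import List


def first_days_of_years(current_year: int, year_range: int) -> List[str]:
    """Return the 1st of January for each year from current_year - year_range to current_year."""
    return [
        date(year, 1, 1).strftime("%Y-%m-%d")
        for year in range(current_year - year_range, current_year + 1)
    ]


dates = first_days_of_years(2023, 10)
print(len(dates), dates[0], dates[-1])
```

Note that a range of ten years plus the current year gives eleven dates, not ten.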

def getDataThread(self, dep, base_url, headers, payload, date_ranges, objVal, subdir, Ind):
    try:
        cnt = 0
        cnt_x = 1
        var_1 = datetime.now().strftime("%H.%M.%S")

        for x_start_date in date_ranges:
            try:
                urlM = base_url + '/objects?metadataDate=' + str(x_start_date) + '&departmentIds=' + str(dep)

                print('Nested URL:')
                print(str(urlM))

                response_obj = requests.request("GET", urlM, headers=headers, data=payload)
                objectDets = json.loads(response_obj.text)

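                # Use a per-request list: appending to the shared objVal would make
                # every thread and every date iteration re-process earlier IDs.
                # 'objectIDs' may be missing or null when nothing matches.
                objectIds = objectDets.get('objectIDs') or []

                for objId in objectIds: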
                    urlS = base_url + '/objects/' + str(objId)

                    print('Final URL:')
                    print(str(urlS))

                    response_det = requests.request("GET", urlS, headers=headers, data=payload)
                    objDetJSON = response_det.text

                    retDB = self.createData(objDetJSON)
                    retDB['departmentId'] = str(dep)

                    if cnt == 0:
                        df_M = retDB
                    else:
                        d_frames = [df_M, retDB]
                        df_M = pd.concat(d_frames)

                    if cnt == 1000:
                        cnt = 0
                        clog.logr('df_M_' + var_1 + '_' + str(cnt_x) + '_' + str(dep) +'.csv', Ind, df_M, subdir)
                        cnt_x += 1
                        df_M = pd.DataFrame()

                    cnt += 1

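            except Exception as e:
                x = str(e)
                print('Error X:', x)

        # Write out the last partial chunk (fewer than 1000 rows), if any
        if cnt > 0 and not df_M.empty:
            clog.logr('df_M_' + var_1 + '_' + str(cnt_x) + '_' + str(dep) +'.csv', Ind, df_M, subdir)

        return 0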

    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

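The above method runs once per department. For each date, it fetches the matching object IDs, calls the object API for each ID to fetch the relevant artifact information & hands the results to clog.logr() in chunks of roughly 1,000 rows.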
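
When several threads collect results, it is usually safer for each worker to keep its own local list & return it, instead of appending to one list shared by all the threads. The following is a minimal sketch of that pattern using ThreadPoolExecutor. It is an alternative to the threading.Thread approach shown above, not the author’s code; FAKE_API, fetch_department & fetch_all are hypothetical names, and the in-memory dictionary stands in for the real API calls.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

# Hypothetical in-memory stand-in for the API: department ID -> object IDs
FAKE_API: Dict[int, List[int]] = {1: [101, 102], 3: [301], 4: []}


def fetch_department(dep: int) -> List[dict]:
    """Collect the rows for one department using only local state."""
    object_ids = FAKE_API.get(dep) or []  # treat a missing or None list as empty
    rows = []
    for obj_id in object_ids:
        rows.append({"objectID": obj_id, "departmentId": str(dep)})
    return rows


def fetch_all(departments: List[int], max_workers: int = 4) -> List[dict]:
    """Run one task per department and combine the per-department results."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() returns the results in the same order as the inputs
        per_department = list(pool.map(fetch_department, departments))
    return [row for rows in per_department for row in rows]


all_rows = fetch_all([1, 3, 4])
print(len(all_rows))
```

A bounded pool also caps the number of simultaneous requests, which is gentler on a public API than one thread per department.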

def mergeCsvFilesInDirectory(self, directory_path, output_path, output_file):
    try:
        csv_files = [file for file in os.listdir(directory_path) if file.endswith('.csv')]
        data_frames = []

        for file in csv_files:
            encodings_to_try = ['utf-8', 'utf-8-sig', 'latin-1', 'cp1252']
            for encoding in encodings_to_try:
                try:
                    # os.path.join works whether or not directory_path ends with a separator
                    FullFileName = os.path.join(directory_path, file)
                    print('File Name: ', FullFileName)
                    df = pd.read_csv(FullFileName, encoding=encoding)
                    data_frames.append(df)
                    break  # Stop trying other encodings if the reading is successful
                except UnicodeDecodeError:
                    continue

        if not data_frames:
            raise Exception("Unable to read CSV files. Check encoding or file format.")

        merged_df = pd.concat(data_frames, ignore_index=True)

        merged_full_name = os.path.join(output_path, output_file)
        merged_df.to_csv(merged_full_name, index=False)

        for file in csv_files:
            os.remove(os.path.join(directory_path, file))

        return 0

    except Exception as e:
        x = str(e)
        print('Error: ', x)
        return 1

The above method merges all the small files into a single, larger historical file covering about ten years of data (to be precise, the data for the first day of each of those years) & then removes the chunk files.
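
The encoding fallback is the interesting part of this method, so here is a minimal standard-library sketch of it. The helper names & the two temporary files are assumptions made for the example. Unlike the listing above, the sketch tries utf-8-sig first (it reads UTF-8 with or without a byte-order mark) & keeps latin-1 last, because latin-1 accepts any byte sequence and would otherwise mask the other encodings.

```python
import csv
import os
import tempfile
from typing import List, Sequence


def read_csv_with_fallback(path: str, encodings: Sequence[str] = ("utf-8-sig", "cp1252", "latin-1")) -> List[dict]:
    """Try each encoding in turn and return the rows from the first one that decodes."""
    for encoding in encodings:
        try:
            with open(path, newline="", encoding=encoding) as handle:
                return list(csv.DictReader(handle))
        except UnicodeDecodeError:
            continue  # try the next encoding
    raise ValueError(f"Unable to decode {path}")


def merge_csv_files(directory: str) -> List[dict]:
    """Read every CSV file in the directory and return the combined rows."""
    merged: List[dict] = []
    for name in sorted(os.listdir(directory)):
        if name.endswith(".csv"):
            merged.extend(read_csv_with_fallback(os.path.join(directory, name)))
    return merged


with tempfile.TemporaryDirectory() as tmp:
    # Two hypothetical chunk files saved with different encodings
    with open(os.path.join(tmp, "a.csv"), "w", newline="", encoding="utf-8") as f:
        f.write("title\nCaf\u00e9\n")
    with open(os.path.join(tmp, "b.csv"), "w", newline="", encoding="latin-1") as f:
        f.write("title\nMus\u00e9e\n")
    merged_rows = merge_csv_files(tmp)

print([row["title"] for row in merged_rows])
```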

For the complete code, please visit the GitHub repository.

  • 1_ReadMuseumJSON.py (This is the main calling script that instantiates the above class, which extracts the content from the museum API using parallel calls.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### clsExtractJSON class to pull the source data    ####
#### from the museum API using parallel calls.       ####
####                                                 ####
#########################################################
import datetime
from clsConfigClient import clsConfigClient as cf

import clsExtractJSON as cej

########################################################
################    Global Area   ######################
########################################################

cJSON = cej.clsExtractJSON()

basePath = cf.conf['DATA_PATH']
outputPath = cf.conf['OUTPUT_PATH']
mergedFile = cf.conf['MERGED_FILE']

########################################################
################  End Of Global Area   #################
########################################################

# Disabling Warnings
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

def main():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        r1 = cJSON.genData()

        if r1 == 0:
            print()
            print('Successfully Scraped!')
        else:
            print()
            print('Failed to Scrape!')

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == '__main__':
    main()

The above script instantiates the clsExtractJSON class & then calls its genData() method.

  • clsCreateList.py (This is the main class that will extract the relevant attributes from the historical files & then create the input text for the documents that are contextualized into the Vector DB framework.)
def createRec(self):
    try:
        basePath = self.basePath
        fileName = self.fileName
        Ind = self.Ind
        subdir = self.subdir
        base_url = self.base_url
        outputPath = self.outputPath
        mergedFile = self.mergedFile
        cleanedFile = self.cleanedFile

        FullFileName = outputPath + mergedFile

        df = pd.read_csv(FullFileName)
        df2 = df[listCol]
        dfFin = df2.drop_duplicates().reset_index(drop=True)

        dfFin['artist_URL'] = dfFin['artistWikidata_URL'].combine_first(dfFin['artistULAN_URL'])
        dfFin['object_URL'] = dfFin['objectURL'].combine_first(dfFin['objectWikidata_URL'])
        dfFin['Wiki_URL'] = dfFin['Wikidata_URL'].combine_first(dfFin['AAT_URL']).combine_first(dfFin['URL']).combine_first(dfFin['object_URL'])

        # Dropping the old URL columns that have been merged above
        dfFin.drop(['artistWikidata_URL', 'artistULAN_URL', 'objectURL', 'objectWikidata_URL',
                    'AAT_URL', 'Wikidata_URL', 'URL'], axis=1, inplace=True)

        # Save the filtered DataFrame to a new CSV file
        #clog.logr(cleanedFile, Ind, dfFin, subdir)
        res = self.addHash(dfFin)

        if res == 0:
            print('Added Hash!')
        else:
            print('Failed to add hash!')

        # Generate the text for each row in the dataframe
        for _, row in dfFin.iterrows():
            x = self.genPrompt(row)
            self.addDocument(x, cleanedFile)

        return documents

    except Exception as e:
        x = str(e)
        print('Record Error: ', x)

        return documents

The above code reads the large historical file created in the earlier steps, removes any duplicate records & then consolidates the URL columns into three: artist, object & wiki.

The application also replaces the hyperlinks with a hash value before the text is fed into the vector DB, as vector DBs do not work well with URLs. Hence, we store the URLs in a separate file along with the associated hash value & later look them up using the hash returned with the OpenAI response.

Then, this application generates the prompts dynamically & finally creates the documents for the vector DB by invoking the addDocument() method.
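
The hash-for-URL swap described above can be sketched as follows. The addHash() method itself is not shown in this post, so this is only an assumption of how such a scheme could work; url_hash & split_urls are hypothetical helpers, and only the Total_Hash, primaryImage & Wiki_URL names come from the scripts in this series.

```python
import hashlib
from typing import Dict, Tuple


def url_hash(*urls: str) -> int:
    """Derive a stable integer key from the URLs (hypothetical scheme)."""
    digest = hashlib.sha256("|".join(urls).encode("utf-8")).hexdigest()
    return int(digest[:12], 16)


def split_urls(record: Dict[str, str], url_fields: Tuple[str, ...]) -> Tuple[Dict[str, str], Dict[int, Dict[str, str]]]:
    """Return the record without its URLs plus a lookup entry keyed by the hash."""
    urls = {field: record.get(field, "") for field in url_fields}
    key = url_hash(*urls.values())
    text_fields = {k: v for k, v in record.items() if k not in url_fields}
    text_fields["Total_Hash"] = str(key)  # only the hash travels with the text
    return text_fields, {key: urls}


# Hypothetical record with placeholder URLs
record = {
    "title": "Vase",
    "primaryImage": "https://example.org/vase.jpg",
    "Wiki_URL": "https://example.org/wiki/Vase",
}

doc_fields, lookup = split_urls(record, ("primaryImage", "Wiki_URL"))
print(doc_fields)

# Later, the hash that comes back with the answer resolves to the URLs again
print(lookup[int(doc_fields["Total_Hash"])]["Wiki_URL"])
```

A content-based hash such as SHA-256 gives the same key on every run, which Python’s built-in hash() does not guarantee for strings.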

For more details, please visit the GitHub link.

  • 1_1_testCreateRec.py (This is the test script that will call the above class.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### clsCreateList class to build the documents      ####
#### for the vector DB.                              ####
####                                                 ####
#########################################################

from clsConfigClient import clsConfigClient as cf
import clsL as log
import clsCreateList as ccl

from datetime import datetime, timedelta

# Disabling Warnings
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

###############################################
###           Global Section                ###
###############################################

#Initiating Logging Instances
clog = log.clsL()
cl = ccl.clsCreateList()

var = datetime.now().strftime(".%H.%M.%S")

documents = []

###############################################
###    End of Global Section                ###
###############################################
def main():
    try:
        var = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        print('*'*240)
        print('Creating Index store:: ')
        print('*'*240)

        documents = cl.createRec()

        print('Inserted Sample Records: ')
        print(str(documents))
        print('\n')

        r1 = len(documents)

        if r1 > 0:
            print()
            print('Successfully Indexed sample records!')
        else:
            print()
            print('Failed to index sample records!')

        print('*'*120)
        var1 = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == '__main__':
    main()

The above script instantiates the clsCreateList class & invokes its createRec() method to build the documents that will be fed into the vector DB.

This test script is only used to test the clsCreateList class. In the actual flow, the class is used inside another class.

– Satyaki
  • clsFeedVectorDB.py (This is the main class that will feed the documents into the vector DB.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Sep-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### haystack framework to contextualize the docs    ####
#### inside the vector DB.                           ####
####                                                 ####
#########################################################

from haystack.document_stores.faiss import FAISSDocumentStore
from haystack.nodes import DensePassageRetriever
import openai
import pandas as pd
import os
import clsCreateList as ccl

from clsConfigClient import clsConfigClient as cf
import clsL as log

from datetime import datetime, timedelta

# Disabling Warnings
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

###############################################
###           Global Section                ###
###############################################

Ind = cf.conf['DEBUG_IND']
openAIKey = cf.conf['OPEN_AI_KEY']

os.environ["TOKENIZERS_PARALLELISM"] = "false"

#Initiating Logging Instances
clog = log.clsL()
cl = ccl.clsCreateList()

var = datetime.now().strftime(".%H.%M.%S")

# Encode your data to create embeddings
documents = []

var_1 = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*'*120)
print('Start Time: ' + str(var_1))
print('*'*120)

print('*'*240)
print('Creating Index store:: ')
print('*'*240)

documents = cl.createRec()

print('Inserted Sample Records: ')
print(documents[:5])
print('\n')
print('Type:')
print(type(documents))

r1 = len(documents)

if r1 > 0:
    print()
    print('Successfully Indexed records!')
else:
    print()
    print('Failed to index records!')

print('*'*120)
var_2 = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var_2))

# Passing OpenAI API Key
openai.api_key = openAIKey

###############################################
###    End of Global Section                ###
###############################################

class clsFeedVectorDB:
    def __init__(self):
        self.basePath = cf.conf['DATA_PATH']
        self.modelFileName = cf.conf['CACHE_FILE']
        self.vectorDBPath = cf.conf['VECTORDB_PATH']
        self.vectorDBFileName = cf.conf['VECTORDB_FILE_NM']
        self.queryModel = cf.conf['QUERY_MODEL']
        self.passageModel = cf.conf['PASSAGE_MODEL']

    def retrieveDocuments(self, question, retriever, top_k=3):
        return retriever.retrieve(question, top_k=top_k)

    def generateAnswerWithGPT3(self, retrievedDocs, question):
        documents_text = " ".join([doc.content for doc in retrievedDocs])
        prompt = f"Given the following documents: {documents_text}, answer the question: {question}"

        response = openai.Completion.create(
            model="text-davinci-003",
            prompt=prompt,
            max_tokens=150
        )
        return response.choices[0].text.strip()

    def ragAnswerWithHaystackAndGPT3(self, question, retriever):
        retrievedDocs = self.retrieveDocuments(question, retriever)
        return self.generateAnswerWithGPT3(retrievedDocs, question)

    def genData(self, strVal):
        try:
            basePath = self.basePath
            modelFileName = self.modelFileName
            vectorDBPath = self.vectorDBPath
            vectorDBFileName = self.vectorDBFileName
            queryModel = self.queryModel
            passageModel = self.passageModel

            print('*'*120)
            print('Index Your Data for Retrieval:')
            print('*'*120)

            FullFileName = basePath + modelFileName
            FullVectorDBname = vectorDBPath + vectorDBFileName

            sqlite_path = "sqlite:///" + FullVectorDBname + '.db'
            print('Vector DB Path: ', str(sqlite_path))

            indexFile = "vectorDB/" + str(vectorDBFileName) + '.faiss'
            indexConfig = "vectorDB/" + str(vectorDBFileName) + ".json"

            print('File: ', str(indexFile))
            print('Config: ', str(indexConfig))

            # Initialize DocumentStore
            document_store = FAISSDocumentStore(sql_url=sqlite_path)

            libName = "vectorDB/" + str(vectorDBFileName) + '.faiss'

            document_store.write_documents(documents)

            # Initialize Retriever
            retriever = DensePassageRetriever(document_store=document_store,
                                              query_embedding_model=queryModel,
                                              passage_embedding_model=passageModel,
                                              use_gpu=False)

            document_store.update_embeddings(retriever=retriever)

            # Reuse the index & config paths built earlier in this method
            document_store.save(index_path=indexFile, config_path=indexConfig)

            print('*'*120)
            print('Testing with RAG & OpenAI...')
            print('*'*120)

            answer = self.ragAnswerWithHaystackAndGPT3(strVal, retriever)

            print('*'*120)
            print('Testing Answer:: ')
            print(answer)
            print('*'*120)

            return 0

        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

In the above script, the following essential steps took place –

  1. First, the application calls the clsCreateList class to collect all the documents in a list.
  2. Then, it writes the documents to the FAISS-based vector DB, generates the embeddings with the retriever & saves the index along with its configuration, which will be reused later (if you remember, we’ve used this as a model in our previous post).
  3. Finally, it tests a sample question by passing the retrieved context to OpenAI & prints the response.
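
The last step, handing the retrieved context to OpenAI, comes down to building one prompt string. Here is a small sketch of that step on its own, with no Haystack or OpenAI calls. The prompt template is the one used in generateAnswerWithGPT3() above; the build_prompt name & the max_chars budget are assumptions added for the example, as the original joins all the retrieved documents without any limit.

```python
from typing import List


def build_prompt(question: str, passages: List[str], max_chars: int = 2000) -> str:
    """Join the retrieved passages, in order, until the character budget is used up."""
    context_parts: List[str] = []
    used = 0
    for passage in passages:
        if used + len(passage) > max_chars:
            break  # skip the rest so the prompt stays within the budget
        context_parts.append(passage)
        used += len(passage)
    documents_text = " ".join(context_parts)
    return f"Given the following documents: {documents_text}, answer the question: {question}"


# Hypothetical retrieved passages: a short one and an oversized one
prompt = build_prompt("Who made the vase?", ["The vase is from Greece.", "x" * 5000])
print(prompt)
```

Capping the context is one simple way to stay within the model’s token limit when the retriever returns long documents.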

Here is a short clip of how the RAG models contextualize with the source data.

RAG-Model Contextualization

So, finally, we’ve done it.

I know that this post is relatively longer than my earlier posts, but I think you’ll get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

RAG implementation of LLMs by using Python, Haystack & React (Part – 1)

Today, I will share a new post in a multi-part series about creating end-to-end LLM applications that feed source data through a RAG implementation. I’ll also use the OpenAI Python SDK & Haystack embeddings in this case.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.

Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

As you can see, to enable this large & complex solution, we must first establish the capabilities to build applications powered by LLMs, Transformer models, vector search, and more. You can use state-of-the-art NLP models to perform question-answering, answer generation, semantic document search, or build tools capable of complex decision-making and query resolution. Hence, steps 1 & 2 showcase embedding the data & creating that informed repository. We’ll be discussing that in our second part.

Once you have the informed repository, the system can interact with the end-users. As part of the query (shown in step 3), the prompt & the question are shared with the process engine, which then reduces the volume by retrieving only the relevant context from our informed repository & returns that tuned context as part of the response (shown in steps 4, 5 & 6).

Then, this tuned context is shared with OpenAI to produce a better response, a summary & concluding remarks that are user-friendly & easier for end-users to understand (shown in steps 8 & 9).

The following are the important packages that are essential to this project –

pip install farm-haystack==1.19.0
pip install Flask==2.2.5
pip install Flask-Cors==4.0.0
pip install Flask-JWT-Extended==4.5.2
pip install Flask-Session==0.5.0
pip install openai==0.27.8
pip install pandas==2.0.3
pip install tensorflow==2.11.1

We have a front-end built with React, back-end APIs built with Python Flask & OpenAI to create this experience.

Today, we’ll be going in reverse mode. We’ll first discuss the main script & then explain all the other class scripts.

  • flaskServer.py (This is the main calling Python script to invoke the RAG-Server.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### Flask-based RAG server that handles the login   ####
#### & chat requests.                                ####
####                                                 ####
#########################################################

from flask import Flask, jsonify, request, session
from flask_cors import CORS
from werkzeug.security import check_password_hash, generate_password_hash
from flask_jwt_extended import JWTManager, jwt_required, create_access_token
import pandas as pd
from clsConfigClient import clsConfigClient as cf
import clsL as log
import clsContentScrapper as csc
import clsRAGOpenAI as crao
import csv
from datetime import timedelta
import os
import re
import json

########################################################
################    Global Area   ######################
########################################################
#Initiating Logging Instances
clog = log.clsL()

admin_key = cf.conf['ADMIN_KEY']
secret_key = cf.conf['SECRET_KEY']
session_path = cf.conf['SESSION_PATH']
sessionFile = cf.conf['SESSION_CACHE_FILE']

app = Flask(__name__)
CORS(app)  # This will enable CORS for all routes
app.config['JWT_SECRET_KEY'] = admin_key  # Change this!
app.secret_key = secret_key

jwt = JWTManager(app)

users = cf.conf['USER_NM']
passwd = cf.conf['USER_PWD']

cCScrapper = csc.clsContentScrapper()
cr = crao.clsRAGOpenAI()

# Disabling Warnings
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Define the aggregation functions
def join_unique(series):
    unique_vals = series.drop_duplicates().astype(str)
    return ', '.join(filter(lambda x: x != 'nan', unique_vals))

# Building the preaggregate cache
def groupImageWiki():
    try:
        base_path = cf.conf['OUTPUT_PATH']
        inputFile = cf.conf['CLEANED_FILE']
        outputFile = cf.conf['CLEANED_FILE_SHORT']
        subdir = cf.conf['SUBDIR_OUT']
        Ind = cf.conf['DEBUG_IND']

        inputCleanedFileLookUp = base_path + inputFile

        #Opening the file in dataframe
        df = pd.read_csv(inputCleanedFileLookUp)
        hash_values = df['Total_Hash'].unique()

        # Work on a copy so the assignments below do not modify a view of df
        dFin = df[['primaryImage','Wiki_URL','Total_Hash']].copy()

        # Ensure columns are strings and not NaN
        # Convert columns to string and replace 'nan' with an empty string
        dFin['primaryImage'] = dFin['primaryImage'].astype(str).replace('nan', '')
        dFin['Wiki_URL'] = dFin['Wiki_URL'].astype(str).replace('nan', '')

        # drop_duplicates() returns a new frame, so keep the result
        dFin = dFin.drop_duplicates()

        # Group by 'Total_Hash' and aggregate
        dfAgg = dFin.groupby('Total_Hash').agg({'primaryImage': join_unique,'Wiki_URL': join_unique}).reset_index()

        return dfAgg

    except Exception as e:
        x = str(e)
        print('Error: ', x)

        df = pd.DataFrame()

        return df

resDf = groupImageWiki()

########################################################
################  End  Global Area  ####################
########################################################

def extractRemoveUrls(hash_value):
    image_urls = ''
    wiki_urls = ''
    # Parse the inner message JSON string
    try:

        resDf['Total_Hash'] = resDf['Total_Hash'].astype(int)
        filtered_df = resDf[resDf['Total_Hash'] == int(hash_value)]

        if not filtered_df.empty:
            image_urls = filtered_df['primaryImage'].values[0]
            wiki_urls = filtered_df['Wiki_URL'].values[0]

        return image_urls, wiki_urls

    except Exception as e:
        x = str(e)
        print('extractRemoveUrls Error: ', x)
        return image_urls, wiki_urls

def isIncomplete(line):
    """Check if a line appears to be incomplete."""

    # Check if the line ends with certain patterns indicating it might be incomplete.
    incomplete_patterns = [': [Link](', ': Approximately ', ': ']
    return any(line.endswith(pattern) for pattern in incomplete_patterns)

def filterData(data):
    """Return only the complete lines from the data."""

    lines = data.split('\n')
    complete_lines = [line for line in lines if not isIncomplete(line)]

    return '\n'.join(complete_lines)

def updateCounter(sessionFile):
    try:
        counter = 0

        # Check if the CSV file exists
        if os.path.exists(sessionFile):
            with open(sessionFile, 'r') as f:
                reader = csv.reader(f)
                for row in reader:
                    # Assuming the counter is the first value in the CSV
                    counter = int(row[0])

        # Increment counter
        counter += 1

        # Write counter back to CSV
        with open(sessionFile, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([counter])

        return counter
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

def getPreviousResult():
    try:
        fullFileName = session_path + sessionFile
        newCounterValue = updateCounter(fullFileName)

        return newCounterValue
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

@app.route('/login', methods=['POST'])
def login():
    username = request.json.get('username', None)
    password = request.json.get('password', None)

    print('User Name: ', str(username))
    print('Password: ', str(password))

    #if username not in users or not check_password_hash(users.get(username), password):
    if ((username not in users) or (password not in passwd)):
        return jsonify({'login': False}), 401

    access_token = create_access_token(identity=username)
    return jsonify(access_token=access_token)

@app.route('/chat', methods=['POST'])
def get_chat():
    try:
        #session["key"] = "1D98KI"
        #session_id = session.sid
        #print('Session Id: ', str(session_id))

        cnt = getPreviousResult()
        print('Running Session Count: ', str(cnt))

        username = request.json.get('username', None)
        message = request.json.get('message', None)

        print('User: ', str(username))
        print('Content: ', str(message))

        if cnt == 1:
            retList = cCScrapper.extractCatalog()
        else:
            hashValue, cleanedData = cr.getData(str(message))
            print('Main Hash Value:', str(hashValue))

            imageUrls, wikiUrls = extractRemoveUrls(hashValue)
            print('Image URLs: ', str(imageUrls))
            print('Wiki URLs: ', str(wikiUrls))
            print('Clean Text:')
            print(str(cleanedData))
            # Let json.dumps handle the quoting & escaping of the text and URLs
            retList = json.dumps({"records": [{"Id": str(cleanedData), "Image": str(imageUrls), "Wiki": str(wikiUrls)}]})

        response = {
            'message': retList
        }

        print('JSON: ', str(response))
        return jsonify(response)

    except Exception as e:
        x = str(e)

        response = {
            'message': 'Error: ' + x
        }
        return jsonify(response)

@app.route('/api/data', methods=['GET'])
@jwt_required()
def get_data():
    response = {
        'message': 'Hello from Flask!'
    }
    return jsonify(response)

if __name__ == '__main__':
    app.run(debug=True)

Let us understand some of the important sections of the above script –

Function – login():

The login function retrieves a ‘username’ and ‘password’ from a JSON request and prints them. If the username is missing from the users list or the password is missing from the passwd list, it returns a failure JSON response with HTTP status 401. Otherwise, it creates an access token and returns it in a JSON response.
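The commented-out line in login() hints at hashed passwords. Here is a minimal sketch of a per-user check using only the standard library. The USERS store, the hash_password and verify_login helpers, and the iteration count are assumptions for illustration and are not part of the original script. Unlike the `password not in passwd` test, this ties each password to its own user.

```python
import hashlib
import hmac
import os

# Hypothetical iteration count for PBKDF2; tune it for your hardware.
ITERATIONS = 100_000

def hash_password(password, salt=None):
    """Return (salt, digest) for the password using PBKDF2-HMAC-SHA256."""
    if salt is None:
        salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return salt, digest

# Hypothetical in-memory user store: username -> (salt, digest).
USERS = {"admin": hash_password("s3cret")}

def verify_login(username, password):
    """True only when the user exists and the password matches that user."""
    record = USERS.get(username)
    if record is None or password is None:
        return False
    salt, expected = record
    _, actual = hash_password(password, salt)
    # Constant-time comparison avoids leaking how many bytes matched.
    return hmac.compare_digest(actual, expected)
```

Inside the route, the 401 branch would then become `if not verify_login(username, password): ...`.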

Function – get_chat():

The get_chat function retrieves the running session count and the user input from a JSON request. When the count is 1, it returns the catalog data; otherwise, it passes the user’s message to the RAG framework, which retrieves the relevant context and gets the refined response from OpenAI, and then looks up the image URLs and wiki URLs for the returned hash value. If an error arises, the function captures and returns the error as a JSON message.
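In get_chat(), the answer text is spliced into a JSON string by concatenation, so a double quote in the model output would break JSON.parse on the React side. A hedged alternative is to let the json module do the escaping. The build_records_payload helper below is hypothetical; the keys (records, Id, Image, Wiki) are the ones App.js reads.

```python
import json

def build_records_payload(cleaned_text, image_urls, wiki_urls):
    """Serialize one record in the shape the front end expects."""
    record = {
        "Id": str(cleaned_text),
        "Image": str(image_urls),
        "Wiki": str(wiki_urls),
    }
    # json.dumps escapes quotes, backslashes and newlines for us.
    return json.dumps({"records": [record]})

payload = build_records_payload(
    'He said "hello"',
    "https://example.org/a.jpg",
    "https://example.org/wiki",
)
parsed = json.loads(payload)
print(parsed["records"][0]["Id"])
```

With this approach, the manual replace of newline characters in generateAnswerWithGPT3() would no longer be needed.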

Function – updateCounter():

The updateCounter function checks if a given CSV file exists and retrieves its counter value. It then increments the counter and writes it back to the CSV. If any errors occur, an error message is printed, and the function returns a value of 1.
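The behavior described above can be sketched as follows. This is not the original updateCounter() body; the single-cell CSV layout and the update_counter name are assumptions made for illustration.

```python
import csv
import os
import tempfile

def update_counter(csv_path):
    """Read the counter from csv_path, add one, write it back, and return it."""
    try:
        counter = 0
        if os.path.exists(csv_path):
            with open(csv_path, newline="") as f:
                rows = list(csv.reader(f))
            if rows and rows[0]:
                counter = int(rows[0][0])
        counter += 1
        with open(csv_path, "w", newline="") as f:
            csv.writer(f).writerow([counter])
        return counter
    except Exception as e:
        # Mirror the described behavior: report the error and fall back to 1.
        print("Error: ", str(e))
        return 1

# Usage with a throwaway file
demo_path = os.path.join(tempfile.mkdtemp(), "counter.csv")
first = update_counter(demo_path)
second = update_counter(demo_path)
print(first, second)
```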

Function – extractRemoveUrls():

The extractRemoveUrls function attempts to filter a data frame, resDf, based on a provided hash value to extract image and wiki URLs. If the data frame contains matching entries, it retrieves the corresponding URLs. Any errors encountered are printed, but the function always returns the image and wiki URLs, even if they are empty.

  • clsContentScrapper.py (This is the main class that brings the default options for the users if they agree with the initial prompt by the bot.)
#####################################################
#### Written By: SATYAKI DE                      ####
#### Written On: 27-May-2023                     ####
#### Modified On 28-May-2023                     ####
####                                             ####
#### Objective: This is the main calling         ####
#### python class that will invoke the           ####
#### LangChain of package to extract             ####
#### the transcript from the YouTube videos &    ####
#### then answer the questions based on the      ####
#### topics selected by the users.               ####
####                                             ####
#####################################################

from langchain.document_loaders import YoutubeLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.chat_models import ChatOpenAI
from langchain.chains import LLMChain

from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
)

from googleapiclient.discovery import build

import clsTemplate as ct
from clsConfigClient import clsConfigClient as cf

import os

from flask import jsonify
import requests

###############################################
###           Global Section                ###
###############################################
open_ai_Key = cf.conf['OPEN_AI_KEY']
os.environ["OPENAI_API_KEY"] = open_ai_Key
embeddings = OpenAIEmbeddings(openai_api_key=open_ai_Key)

YouTube_Key = cf.conf['YOUTUBE_KEY']
youtube = build('youtube', 'v3', developerKey=YouTube_Key)

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

###############################################
###    End of Global Section                ###
###############################################

class clsContentScrapper:
    def __init__(self):
        self.model_name = cf.conf['MODEL_NAME']
        self.temp_val = cf.conf['TEMP_VAL']
        self.max_cnt = int(cf.conf['MAX_CNT'])
        self.url = cf.conf['BASE_URL']
        self.header_token = cf.conf['HEADER_TOKEN']

    def extractCatalog(self):
        try:
            base_url = self.url
            header_token = self.header_token

            url = base_url + '/departments'

            print('Full URL: ', str(url))

            payload={}
            headers = {'Cookie': header_token}

            response = requests.request("GET", url, headers=headers, data=payload)

            x = response.text

            return x
        except Exception as e:
            discussedTopic = []
            x = str(e)
            print('Error: ', x)

            return x

Let us understand the core part that we require from this class.

Function – extractCatalog():

The extractCatalog function uses specific headers to make a GET request to a constructed URL. The URL is derived by appending ‘/departments’ to a base_url, and a header token is used in the request headers. If successful, it returns the text of the response; if there’s an exception, it prints the error and returns the error message.
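extractCatalog() returns the raw response text, which App.js later parses. A small sketch of the same parsing in Python is shown below. The sample payload is illustrative; only the departments, departmentId and displayName fields are taken from the code in this post, and format_departments is a hypothetical helper.

```python
import json

# Sample shaped like the '/departments' response consumed by App.js
# (values are illustrative).
sample_response = (
    '{"departments": ['
    '{"departmentId": 1, "displayName": "American Decorative Arts"}, '
    '{"departmentId": 3, "displayName": "Ancient Near Eastern Art"}]}'
)

def format_departments(response_text):
    """Turn the catalog JSON into one 'id name' line per department."""
    data = json.loads(response_text)
    lines = [
        f"{d['departmentId']} {d['displayName']}"
        for d in data.get("departments", [])
    ]
    return "\n".join(lines)

catalog = format_departments(sample_response)
print(catalog)
```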

  • clsRAGOpenAI.py (This is the main class that retrieves the RAG-enabled context and feeds it to OpenAI for a refined response at a lower cost.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python class that will retrieve the             ####
#### context from the FAISS vector DB using          ####
#### Haystack & pass it to OpenAI for the answer.    ####
####                                                 ####
#########################################################

from haystack.document_stores.faiss import FAISSDocumentStore
from haystack.nodes import DensePassageRetriever
import openai

from clsConfigClient import clsConfigClient as cf
import clsL as log

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

import os
import re
###############################################
###           Global Section                ###
###############################################
Ind = cf.conf['DEBUG_IND']
queryModel = cf.conf['QUERY_MODEL']
passageModel = cf.conf['PASSAGE_MODEL']

#Initiating Logging Instances
clog = log.clsL()

os.environ["TOKENIZERS_PARALLELISM"] = "false"

vectorDBFileName = cf.conf['VECTORDB_FILE_NM']

indexFile = "vectorDB/" + str(vectorDBFileName) + '.faiss'
indexConfig = "vectorDB/" + str(vectorDBFileName) + ".json"

print('File: ', str(indexFile))
print('Config: ', str(indexConfig))

# Also, provide `config_path` parameter if you set it when calling the `save()` method:
new_document_store = FAISSDocumentStore.load(index_path=indexFile, config_path=indexConfig)

# Initialize Retriever
retriever = DensePassageRetriever(document_store=new_document_store,
                                  query_embedding_model=queryModel,
                                  passage_embedding_model=passageModel,
                                  use_gpu=False)


###############################################
###    End of Global Section                ###
###############################################

class clsRAGOpenAI:
    def __init__(self):
        self.basePath = cf.conf['DATA_PATH']
        self.fileName = cf.conf['FILE_NAME']
        self.Ind = cf.conf['DEBUG_IND']
        self.subdir = str(cf.conf['OUT_DIR'])
        self.base_url = cf.conf['BASE_URL']
        self.outputPath = cf.conf['OUTPUT_PATH']
        self.vectorDBPath = cf.conf['VECTORDB_PATH']
        self.openAIKey = cf.conf['OPEN_AI_KEY']
        self.temp = cf.conf['TEMP_VAL']
        self.modelName = cf.conf['MODEL_NAME']
        self.maxToken = cf.conf['MAX_TOKEN']

    def extractHash(self, text):
        try:
            # Regular expression pattern to match 'Ref: {' followed by a number and then '}'
            pattern = r"Ref: \{'(\d+)'\}"
            match = re.search(pattern, text)

            if match:
                return match.group(1)
            else:
                return None
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return None

    def removeSentencesWithNaN(self, text):
        try:
            # Split text into sentences using regular expression
            sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text)
            # Filter out sentences containing 'nan'
            filteredSentences = [sentence for sentence in sentences if 'nan' not in sentence]
            # Rejoin the sentences
            return ' '.join(filteredSentences)
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return ''

    def retrieveDocumentsReader(self, question, top_k=9):
        return retriever.retrieve(question, top_k=top_k)

    def generateAnswerWithGPT3(self, retrieved_docs, question):
        try:
            openai.api_key = self.openAIKey
            temp = self.temp
            modelName = self.modelName
            maxToken = self.maxToken

            documentsText = " ".join([doc.content for doc in retrieved_docs])

            filteredDocs = self.removeSentencesWithNaN(documentsText)
            hashValue = self.extractHash(filteredDocs)

            print('RAG Docs:: ')
            print(filteredDocs)
            #prompt = f"Given the following documents: {documentsText}, answer the question accurately based on the above data with the supplied http urls: {question}"

            # Set up a chat-style prompt with your data
            messages = [
                {"role": "system", "content": "You are a helpful assistant, answer the question accurately based on the above data with the supplied http urls. Only relevant content needs to publish. Please do not provide the facts or the texts that results crossing the max_token limits."},
                {"role": "user", "content": filteredDocs}
            ]

            # Chat style invoking the latest model
            response = openai.ChatCompletion.create(
                model=modelName,
                messages=messages,
                temperature = temp,
                max_tokens=maxToken
            )
            return hashValue, response.choices[0].message['content'].strip().replace('\n','\\n')
        except Exception as e:
            x = str(e)
            print('failed to get from OpenAI: ', x)
            # Return a pair so that the caller can always unpack (hashValue, answer)
            return None, 'Not Available!'

    def ragAnswerWithHaystackAndGPT3(self, question):
        retrievedDocs = self.retrieveDocumentsReader(question)
        return self.generateAnswerWithGPT3(retrievedDocs, question)

    def getData(self, strVal):
        try:
            print('*'*120)
            print('Index Your Data for Retrieval:')
            print('*'*120)

            print('Response from New Docs: ')
            print()

            hashValue, answer = self.ragAnswerWithHaystackAndGPT3(strVal)

            print('GPT3 Answer::')
            print(answer)
            print('Hash Value:')
            print(str(hashValue))

            print('*'*240)
            print('End Of Use RAG to Generate Answers:')
            print('*'*240)

            return hashValue, answer
        except Exception as e:
            x = str(e)
            print('Error: ', x)
            answer = x
            hashValue = 1

            return hashValue, answer

Let us understand some of the important blocks –

Function – ragAnswerWithHaystackAndGPT3():

The ragAnswerWithHaystackAndGPT3 function retrieves relevant documents for a given question using the retrieveDocumentsReader method. It then generates an answer for the query using GPT-3 with the retrieved documents via the generateAnswerWithGPT3 method. The final response is returned.

Function – generateAnswerWithGPT3():

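The generateAnswerWithGPT3 function, given a list of retrieved documents and a question, communicates with the OpenAI chat model to generate an answer. It first joins the documents, filters out the sentences containing ‘nan’, and extracts a hash value from the text. Using a chat-style format, it prompts the model with the processed documents and returns the hash value together with the model’s response. Note that, as written, the question itself is not added to the messages; only the filtered documents are sent. If an error occurs, an error message is printed, and “Not Available!” is returned.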
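In the listing above, the question argument never reaches the messages list. One possible way to pass both the context and the question is sketched below. The build_messages helper and the prompt wording are assumptions, not the original prompt.

```python
def build_messages(filtered_docs, question):
    """Assemble chat messages with the context and the question kept separate."""
    system_prompt = (
        "You are a helpful assistant. Answer the question using only the "
        "supplied context, and keep any http URLs that are relevant."
    )
    user_prompt = f"Context:\n{filtered_docs}\n\nQuestion: {question}"
    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

messages = build_messages(
    "The vase dates to 1850. Ref: {'12345'}",
    "When was the vase made?",
)
print(messages[1]["content"])
```

The returned list can be passed as the messages argument of the same openai.ChatCompletion.create() call used in the class.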

Function – retrieveDocumentsReader():

The retrieveDocumentsReader function takes in a question and an optional parameter, top_k (defaulted to 9). It calls the retriever.retrieve method with the given parameters. The retrieval returns at most nine documents from the vector DB, which are then fed to OpenAI.
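To make the role of top_k concrete, here is a toy ranking example in plain Python. It is only an illustration: the real retriever uses learned encoders and a FAISS index, whereas the vectors, the cosine score and the retrieve function below are assumptions chosen for readability.

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def retrieve(query_vec, doc_vecs, top_k=9):
    """Return the indices of the top_k most similar documents, best first."""
    ranked = sorted(
        range(len(doc_vecs)),
        key=lambda i: cosine(query_vec, doc_vecs[i]),
        reverse=True,
    )
    return ranked[:top_k]

# Three toy document embeddings and one query embedding
docs = [[1.0, 0.0], [0.0, 1.0], [0.7, 0.7]]
best = retrieve([1.0, 0.1], docs, top_k=2)
print(best)
```

A smaller top_k sends less context to OpenAI, which lowers the token cost but may drop a relevant passage.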

  • App.js (This is the main React script that creates the interface & parses the data, apart from handling the authentication.)
// App.js
import React, { useState } from 'react';
import axios from 'axios';
import './App.css';

const App = () => {
  const [isLoggedIn, setIsLoggedIn] = useState(false);
  const [username, setUsername] = useState('');
  const [password, setPassword] = useState('');
  const [message, setMessage] = useState('');
  const [chatLog, setChatLog] = useState([{ sender: 'MuBot', message: 'Welcome to MuBot! Please explore the world of History from our brilliant collections! Do you want to proceed to see the catalog?'}]);

  const handleLogin = async (e) => {
    e.preventDefault();
    try {
      const response = await axios.post('http://localhost:5000/login', { username, password });
      if (response.status === 200) {
        setIsLoggedIn(true);
      }
    } catch (error) {
      console.error('Login error:', error);
    }
  };

  const sendMessage = async (username) => {
    if (message.trim() === '') return;

    // Create a new chat entry
    const newChatEntry = {
      sender: 'user',
      message: message.trim(),
    };

    // Clear the input field
    setMessage('');

    try {
      // Make API request to Python-based API
      const response = await axios.post('http://localhost:5000/chat', { message: newChatEntry.message }); // Replace with your API endpoint URL
      const responseData = response.data;

      // Print the response to the console for debugging
      console.log('API Response:', responseData);

      // Parse the nested JSON from the 'message' attribute
      const jsonData = JSON.parse(responseData.message);

      // Check if the data contains 'departments'
      if (jsonData.departments) {

        // Extract the 'departments' attribute from the parsed data
        const departments = jsonData.departments;

        // Extract the department names and create a single string with line breaks
        const botResponseText = departments.reduce((acc, department) => {return acc + department.departmentId + ' ' + department.displayName + '\n';}, '');

        // Update the chat log with the bot's response
        setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message: message }, { sender: 'bot', message: botResponseText },]);
      }
      else if (jsonData.records)
      {
        // Data structure 2: Artwork information
        const records = jsonData.records;

        // Prepare chat entries
        const chatEntries = [];

        // Iterate through records and extract text, image, and wiki information
        records.forEach((record) => {
          const textInfo = Object.entries(record).map(([key, value]) => {
            if (key !== 'Image' && key !== 'Wiki') {
              return `${key}: ${value}`;
            }
            return null;
          }).filter((info) => info !== null).join('\n');

          const imageLink = record.Image;
          //const wikiLinks = JSON.parse(record.Wiki.replace(/'/g, '"'));
          //const wikiLinks = record.Wiki;
          const wikiLinks = record.Wiki.split(',').map(link => link.trim());

          console.log('Wiki:', wikiLinks);

          // Check if there is a valid image link
          const hasValidImage = imageLink && imageLink !== '[]';

          const imageElement = hasValidImage ? (
            <img src={imageLink} alt="Artwork" style={{ maxWidth: '100%' }} />
          ) : null;

          // Create JSX elements for rendering the wiki links (if available)
          const wikiElements = wikiLinks.map((link, index) => (
            <div key={index}>
              <a href={link} target="_blank" rel="noopener noreferrer">
                Wiki Link {index + 1}
              </a>
            </div>
          ));

          if (textInfo) {
            chatEntries.push({ sender: 'bot', message: textInfo });
          }

          if (imageElement) {
            chatEntries.push({ sender: 'bot', message: imageElement });
          }

          if (wikiElements.length > 0) {
            chatEntries.push({ sender: 'bot', message: wikiElements });
          }
        });

        // Update the chat log with the bot's response
        setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message }, ...chatEntries, ]);
      }

    } catch (error) {
      console.error('Error sending message:', error);
    }
  };

  if (!isLoggedIn) {
    return (
      <div className="login-container">
        <h2>Welcome to the MuBot</h2>
        <form onSubmit={handleLogin} className="login-form">
          <input
            type="text"
            placeholder="Enter your name"
            value={username}
            onChange={(e) => setUsername(e.target.value)}
            required
          />
          <input
            type="password"
            placeholder="Enter your password"
            value={password}
            onChange={(e) => setPassword(e.target.value)}
            required
          />
          <button type="submit">Login</button>
        </form>
      </div>
    );
  }

  return (
    <div className="chat-container">
      <div className="chat-header">
        <h2>Hello, {username}</h2>
        <h3>Chat with MuBot</h3>
      </div>
      <div className="chat-log">
        {chatLog.map((chatEntry, index) => (
          <div
            key={index}
            className={`chat-entry ${chatEntry.sender === 'user' ? 'user' : 'bot'}`}
          >
            <span className="user-name">{chatEntry.sender === 'user' ? username : 'MuBot'}</span>
            <p className="chat-message">{chatEntry.message}</p>
          </div>
        ))}
      </div>
      <div className="chat-input">
        <input
          type="text"
          placeholder="Type your message..."
          value={message}
          onChange={(e) => setMessage(e.target.value)}
          onKeyPress={(e) => {
            if (e.key === 'Enter') {
              sendMessage();
            }
          }}
        />
        <button onClick={sendMessage}>Send</button>
      </div>
    </div>
  );
};

export default App;

Please find some of the important logic –

Function – handleLogin():

The handleLogin asynchronous function responds to an event by preventing its default action. It attempts to post a login request with a username and password to a local server endpoint. If the response is successful with a status of 200, it updates a state variable to indicate a successful login; otherwise, it logs any encountered errors.

Function – sendMessage():

The sendMessage asynchronous function is designed to handle the user’s chat interaction:

  1. If the message is empty (after trimming spaces), the function exits without further action.
  2. A chat entry object is created with the sender set as ‘user’ and the trimmed message.
  3. The input field’s message is cleared, and an API request is made to a local server endpoint with the chat message.
  4. If the API responds with a ‘departments’ attribute in its JSON, a bot response is crafted by iterating over department details.
  5. If the API responds with ‘records’ indicating artwork information, the bot crafts responses for each record, extracting text, images, and wiki links, and generating JSX elements for rendering them.
  6. After processing the API response, the chat log state is updated with the user’s original message and the bot’s responses.
  7. Errors, if encountered, are logged to the console.

This function enables interactive chat with bot responses that vary based on the nature of the data received from the API.


Let us explore the directory structure. Starting from the parent, some of the important child folders should look like this –


So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Hacking the performance of Python Solutions with a custom-built library

Today, I’m very excited to demonstrate an effortless & new way to hack the performance of Python. This post will be a super short & yet crisp presentation of improving the overall performance.

Why not view the demo before going through it?


Demo

Isn’t it exciting? Let’s understand the steps to improve your code.

pip install cython

Cython is a Python-to-C compiler. It can significantly improve performance for specific tasks, especially those with heavy computation and loops. Also, Cython’s syntax is very similar to Python, which makes it easy to learn.

Let’s consider an example where we calculate the sum of squares for a list of numbers. The code without optimization would look like this:

  • perfTest_1.py (First untuned Python class.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 31-Jul-2023                         ####
#### Modified On 31-Jul-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### first version of acute computation.             ####
####                                                 ####
#########################################################
from clsConfigClient import clsConfigClient as cf

import time
start = time.time()

n_val = cf.conf['INPUT_VAL']

def compute_sum_of_squares(n):
    return sum([i**2 for i in range(n)])

n = n_val

print(compute_sum_of_squares(n))

print(f"Test - 1: Execution time: {time.time() - start} seconds")

Here, n_val contains the value 1000000000.
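With an input this large, the list comprehension builds a billion-element list before summing it. As a hedged aside, a generator expression avoids that list, and the closed form (n − 1)·n·(2n − 1)/6 gives a cheap way to verify the result. Both helper names below are hypothetical.

```python
def sum_of_squares_generator(n):
    """Sum i**2 for i in range(n) without building an intermediate list."""
    return sum(i * i for i in range(n))

def sum_of_squares_closed_form(n):
    """Closed form for 0**2 + 1**2 + ... + (n-1)**2."""
    return (n - 1) * n * (2 * n - 1) // 6

# Cross-check both versions on a small input
small = 1000
print(sum_of_squares_generator(small) == sum_of_squares_closed_form(small))

# The closed form handles the full input instantly
print(sum_of_squares_closed_form(1_000_000_000))
```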

Now, let’s optimize it using Cython, which we installed above. You will have to create a .pyx file, say “compute.pyx”, with the following code:

cpdef double compute_sum_of_squares(int n):
    return sum([i**2 for i in range(n)])

Now, create a setup.py file to compile it:

###########################################################
#### Written By: SATYAKI DE                            ####
#### Written On: 31-Jul-2023                           ####
#### Modified On 31-Jul-2023                           ####
####                                                   ####
#### Objective: This is the main calling               ####
#### python script that will create the                ####
#### compiled library after executing the compute.pyx. ####
####                                                   ####
###########################################################

from setuptools import setup
from Cython.Build import cythonize

setup(
    ext_modules = cythonize("compute.pyx")
)

Compile it using the command:

python setup.py build_ext --inplace

This will look like the following –

Finally, you can import the function from the compiled “.pyx” file inside the improved code.

  • perfTest_2.py (Second Python script, tuned to use the compiled library.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 31-Jul-2023                         ####
#### Modified On 31-Jul-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### optimized & precompiled custom library, which   ####
#### will significantly improve the performance.     ####
####                                                 ####
#########################################################
from clsConfigClient import clsConfigClient as cf
from compute import compute_sum_of_squares

import time
start = time.time()

n_val = cf.conf['INPUT_VAL']

n = n_val

print(compute_sum_of_squares(n))

print(f"Test - 2: Execution time with Cython: {time.time() - start} seconds")

By compiling to C, Cython can speed up loops and function calls, leading to a significant speedup for CPU-bound tasks.

Please note that while Cython can dramatically improve performance, it can make the code more complex and harder to debug. Therefore, starting with regular Python and switching to Cython for the performance-critical parts of the code is recommended.
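One way to follow that advice is to import the compiled module when it exists and fall back to pure Python otherwise. The sketch below assumes the compute module built from compute.pyx; the BACKEND flag and the fallback body are illustrative additions.

```python
import time

try:
    # Compiled extension built from compute.pyx, if it is available.
    from compute import compute_sum_of_squares
    BACKEND = "cython"
except ImportError:
    BACKEND = "python"

    def compute_sum_of_squares(n):
        """Pure-Python fallback with the same name and result."""
        return sum(i * i for i in range(n))

start = time.perf_counter()
result = compute_sum_of_squares(100_000)
elapsed = time.perf_counter() - start
print(f"{BACKEND} backend: {result} in {elapsed:.4f} seconds")
```

This keeps the calling script unchanged on machines where the extension has not been built.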


So, finally, we’ve done it. I know that this post is relatively smaller than my earlier post. But, I think, you can get a good hack to improve some of your long-running jobs by applying this trick.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Enabling OpenAI-based NLP engine with SIRI (MacBook/iPad/iPhone) through a proxy-driven restricted API using Python.

Today, I’m very excited to demonstrate an effortless & new way to integrate SIRI with a controlled OpenAI setup exposed through a proxy API. Why is this important? It gives you options to control your ChatGPT environment as per your principles; you can then put a load balancer in front of it (if you want) & expose it through a proxy.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.


Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Isn’t it fascinating? This approach will lead to a whole new ballgame, where you can add SIRI with an entirely new world of knowledge as per your requirements & expose them in a controlled way.

FLOW OF EVENTS:

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

As you can see, Apple Shortcuts triggers the request through its voice app, which translates the question to text & then invokes the ngrok proxy API. That, in turn, triggers the controlled custom API built using Flask & Python, which calls the OpenAI API.
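The request that the Shortcut sends can be reproduced from Python for testing. The sketch below only builds the request; the ngrok URL is a placeholder, while the /openai route and the prompt field come from testJarvis.py later in this post.

```python
import json
import urllib.request

# Hypothetical public URL issued by ngrok; replace it with your own tunnel address.
PROXY_URL = "https://example.ngrok.io/openai"

def build_openai_request(prompt, url=PROXY_URL):
    """Build (but do not send) the POST request that the Shortcut would issue."""
    body = json.dumps({"prompt": prompt}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = build_openai_request("Who painted the Mona Lisa?")
print(req.get_method(), req.full_url)
# To actually send it: urllib.request.urlopen(req).read()
```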


CODE:

Why don’t we go through the code made accessible due to this new library for this particular use case?

  • clsConfigClient.py (This is the main calling Python script for the input parameters.)


################################################
#### Written By: SATYAKI DE                 ####
#### Written On: 15-May-2020                ####
#### Modified On: 27-Jun-2023               ####
####                                        ####
#### Objective: This script is a config     ####
#### file, contains all the keys for        ####
#### personal OpenAI-based MAC-shortcuts    ####
#### enable bot.                            ####
####                                        ####
################################################

import os
import platform as pl

class clsConfigClient(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'DATA_PATH': Curr_Path + sep + 'data' + sep,
        'MODEL_PATH': Curr_Path + sep + 'model' + sep,
        'TEMP_PATH': Curr_Path + sep + 'temp' + sep,
        'MODEL_DIR': 'model',
        'APP_DESC_1': 'LangChain Demo!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'FILE_NAME': 'Output.csv',
        'MODEL_NAME': 'gpt-3.5-turbo',
        'OPEN_AI_KEY': "sk-Jdhfdyruru9383474HHFJFJFJO6jrlxPKbv6Bgvv",
        'TITLE': "LangChain Demo!",
        'TEMP_VAL': 0.2,
        'PATH' : Curr_Path,
        'MAX_TOKEN' : 60,
        'OUT_DIR': 'data'
    }

Some of the important entries from the above snippet are as follows –

        'MODEL_NAME': 'gpt-3.5-turbo',
        'OPEN_AI_KEY': "sk-Jdhfdyruru9383474HHFJFJFJO6jrlxPKbv6Bgvv",
        'TEMP_VAL': 0.2,

TEMP_VAL is the sampling temperature, which controls how random the response is. Lower values make the output more focused & deterministic. The OpenAI chat API accepts values between 0 and 2.
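Since the key is a secret, you may prefer to read it from the environment instead of keeping it in the config file. A minimal sketch follows; load_openai_settings and the TEMP_VAL environment variable are assumptions, while OPENAI_API_KEY is the variable name the OpenAI client conventionally reads. The temperature is clamped to the 0 to 2 range that the OpenAI chat API accepts.

```python
import os

def load_openai_settings(env=os.environ):
    """Read the key from the environment and clamp the temperature to 0..2."""
    api_key = env.get("OPENAI_API_KEY", "")
    temp_val = float(env.get("TEMP_VAL", "0.2"))
    temp_val = min(max(temp_val, 0.0), 2.0)
    return {"OPEN_AI_KEY": api_key, "TEMP_VAL": temp_val}

# Usage with an explicit mapping, so the example does not depend on your shell
settings = load_openai_settings({"OPENAI_API_KEY": "sk-placeholder", "TEMP_VAL": "3.5"})
print(settings["TEMP_VAL"])
```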

  • clsJarvis.py (This is the main Python class that invokes the OpenAI API.)


#####################################################
#### Written By: SATYAKI DE                      ####
#### Written On: 27-Jun-2023                     ####
#### Modified On 28-Jun-2023                     ####
####                                             ####
#### Objective: This is the main calling         ####
#### python class that will invoke the           ####
#### Flask framework to expose the OpenAI        ####
#### API with more control & encapsulate the     ####
#### server IPs with proxy layers.               ####
####                                             ####
#####################################################

import openai
from flask import request, jsonify

from clsConfigClient import clsConfigClient as cf
import os
import clsTemplate as ct

###############################################
###           Global Section                ###
###############################################
open_ai_Key = cf.conf['OPEN_AI_KEY']
openai.api_key = open_ai_Key

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

###############################################
###    End of Global Section                ###
###############################################

class clsJarvis:
    def __init__(self):
        self.model_name = cf.conf['MODEL_NAME']
        self.max_token = cf.conf['MAX_TOKEN']
        self.temp_val = cf.conf['TEMP_VAL']

    def extractContentInText(self, query):
        try:
            model_name = self.model_name
            max_token = self.max_token
            temp_val = self.temp_val

            template = ct.templateVal_1

            response = openai.ChatCompletion.create(model=model_name, temperature=temp_val, messages=[{"role": "system", "content": template},{"role": "user", "content": query}])
            inputJson = {"text": response['choices'][0]['message']['content']}

            return jsonify(inputJson)
        except Exception as e:
            discussedTopic = []
            x = str(e)
            print('Error: ', x)
            template = ct.templateVal_2

            inputJson = {"text": template}

            return jsonify(inputJson)

The key snippets from the above script are as follows –

def extractContentInText(self, query):
    try:
        model_name = self.model_name
        max_token = self.max_token
        temp_val = self.temp_val

        template = ct.templateVal_1

        response = openai.ChatCompletion.create(model=model_name, temperature=temp_val, messages=[{"role": "system", "content": template},{"role": "user", "content": query}])
        inputJson = {"text": response['choices'][0]['message']['content']}

        return jsonify(inputJson)
    except Exception as e:
        discussedTopic = []
        x = str(e)
        print('Error: ', x)
        template = ct.templateVal_2

        inputJson = {"text": template}

        return jsonify(inputJson)

The provided Python code snippet defines a method extractContentInText, which interacts with OpenAI’s API to generate a response from OpenAI’s chat model to a user’s query. Here’s a summary of what it does:

  1. It fetches some predefined model configurations (model_name, max_token, temp_val). These are class attributes defined elsewhere.
  2. It sets a system message template (initial instruction for the AI model) using ct.templateVal_1. The ct object isn’t defined within this snippet but is likely another predefined object or module in the more extensive program.
  3. It then calls openai.ChatCompletion.create() to send messages to the AI model and generate a response. The statements include an initial system message and a user’s query.
  4. The model’s response is extracted and formatted into a JSON object inputJson where the ‘text’ field holds the AI’s response.
  5. The inputJson dictionary is passed to jsonify() and returned as a JSON response.

If an error occurs at any stage of this process (caught in the except block), it prints the error, sets a fallback message template using ct.templateVal_2, formats this into a JSON object, and returns it as a JSON response.

Note: The max_token variable is fetched but not used within the function; it might be a remnant of previous code or meant to be used in further development. The code also assumes a predefined ct object and a method called jsonify(), possibly from Flask, for formatting Python dictionaries into JSON format.
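If you want the configured MAX_TOKEN to take effect, it can be passed as the max_tokens argument of the same call. The sketch below only assembles the keyword arguments; build_chat_request is a hypothetical helper, and the template text is a placeholder for ct.templateVal_1.

```python
def build_chat_request(model_name, temp_val, max_token, template, query):
    """Collect the keyword arguments for a chat-completion call, including max_tokens."""
    return {
        "model": model_name,
        "temperature": temp_val,
        "max_tokens": max_token,
        "messages": [
            {"role": "system", "content": template},
            {"role": "user", "content": query},
        ],
    }

kwargs = build_chat_request(
    "gpt-3.5-turbo", 0.2, 60, "You are a concise assistant.", "What is RAG?"
)
print(kwargs["max_tokens"])
# With the openai 0.27.x client used in this post:
# response = openai.ChatCompletion.create(**kwargs)
```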

  • testJarvis.py (This is the main calling Python script.)


#########################################################
#### Written By: SATYAKI DE ####
#### Written On: 27-Jun-2023 ####
#### Modified On 28-Jun-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### shortcut application created inside MAC ####
#### environment including MacBook, IPad or IPhone. ####
#### ####
#########################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import clsJarvis as jv

import datetime
from flask import Flask, request, jsonify

app = Flask(__name__)

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

######################################
### Get your global values ####
######################################
debug_ind = 'Y'

# Initiating Logging Instances
clog = cl.clsL()
cJarvis = jv.clsJarvis()

######################################
#### Global Flag ########
######################################

@app.route('/openai', methods=['POST'])
def openai_call():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        data = request.get_json()
        print('Data::')
        print(data)
        prompt = data.get('prompt', '')

        print('Prompt::')
        print(prompt)

        res = cJarvis.extractContentInText(str(prompt))

        return res

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    app.run(host='0.0.0.0')


Please find the key snippets –

@app.route('/openai', methods=['POST'])
def openai_call():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        data = request.get_json()
        print('Data::')
        print(data)
        prompt = data.get('prompt', '')

        print('Prompt::')
        print(prompt)

        res = cJarvis.extractContentInText(str(prompt))

        return res

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

The provided Python code defines a route in a Flask web server that listens for POST requests at the ‘/openai’ endpoint. Here’s what it does in detail:

  1. It records and prints the current time, marking the start of the request handling.
  2. It retrieves the incoming data from the POST request as JSON with request.get_json().
  3. It then extracts the ‘prompt’ from the JSON data. The value defaults to an empty string if no ‘prompt’ is provided in the request.
  4. The prompt is passed as an argument to the extractContentInText() method of the cJarvis object. This method uses OpenAI’s API to generate a response from a model given the prompt (as discussed above). The result of this method call is stored in the variable res.
  5. The res variable (the model’s response) is returned to the client that made the POST request.
  6. It is meant to print the current time again, marking the end of the request handling. However, this part of the code will never be executed, as it is placed after a return statement.
  7. If an error occurs during this process, it catches the exception, converts it to a string, and prints the error message.

The cJarvis object used in the cJarvis.extractContentInText(str(prompt)) call is a global instance of clsJarvis created near the top of the script. The extractContentInText method is the one discussed earlier in this post.
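
Since the end-time logging sits after the return statement, one possible way to make it run is a try/finally block. The following is a minimal sketch in plain Python (no Flask), and handle_request & events are hypothetical names used only for illustration.

```python
import datetime

events = []  # records the order in which the steps run


def handle_request(prompt: str) -> str:
    """Hypothetical handler: log the start, return a result, and still log the end."""
    events.append("start " + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S"))
    try:
        return "response for: " + prompt
    finally:
        # A finally block runs even though the try block has already returned
        events.append("end " + datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S"))


result = handle_request("hello")
```

The same structure should carry over to the Flask route: the return stays inside try, and the end-time prints move into finally.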

Apple Shortcuts:

Now, let us understand the steps in Apple Shortcuts.

You can now set up a Siri Shortcut to call the URL provided by ngrok:

  1. Open the Shortcuts app on your iPhone.
  2. Tap the ‘+’ to create a new Shortcut.
  3. Add an action, search for “URL,” and select the URL action. Enter your ngrok URL here, with the /openai endpoint.
  4. Add another action, search for “Get Contents of URL.” This step will send a POST request to the URL from the previous action. Set the method to POST and add a request body with type ‘JSON,’ containing a key ‘prompt’ whose value is the input you want to send to your OpenAI model.
  5. Optionally, you can add another action, “Show Result” or “Speak Text” to see/hear the result returned from your server.
  6. Save your Shortcut and give it a name.

You should now be able to activate Siri and say the name of your Shortcut to have it send a request to your server, which will then send a prompt to the OpenAI API and return the response.
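
For reference, the request that the Shortcut sends can be approximated in Python. This is only a sketch: the ngrok URL below is a hypothetical placeholder, and the request is built but deliberately not sent.

```python
import json
import urllib.request

# Hypothetical ngrok URL; replace it with the forwarding address that ngrok prints
NGROK_URL = "https://example.ngrok.io/openai"


def build_prompt_request(url: str, prompt: str) -> urllib.request.Request:
    """Build (but do not send) a POST request carrying {"prompt": ...} as JSON."""
    body = json.dumps({"prompt": prompt}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_prompt_request(NGROK_URL, "Tell me a joke")
# To actually send it: urllib.request.urlopen(req).read()
```

The JSON body carries the same ‘prompt’ key that the Flask route reads with data.get('prompt', '').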

Let us understand the “Get Contents of URL” step with a few simple Postman screenshots –

As you can see, the newly exposed proxy API receives an input named prompt, which is passed from “Dictate Text.”


So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement & especially in the prediction quality.

Enable OpenAI chatbot with the selected YouTube video content using LangChain, FAISS & YouTube data-API.

Today, I’m very excited to demonstrate an effortless & new way to extract the transcript from YouTube videos & then answer questions based on the topics selected by the users. In this post, the application first takes the user’s topic & question as inputs & then summarizes the relevant video content with the help of LangChain & an OpenAI-based model.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.
Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Isn’t it very exciting? This will lead to a whole new ballgame, where one can get critical decision-making information from these human sources along with their traditional advanced analytical data.

How will it help?

Let’s say that, as per your historical data & analytics, the dashboard recommends prod-A, prod-B & prod-C as the top three potential top-performing products. However, you are getting alerts from the TV news about prod-B due to recent incidents. In that case, you don’t want to continue with the prod-B investment. You may find a new product, prod-Z, that reduces the risk of your investment.


What is LangChain?

LangChain is a framework for developing applications powered by language models. We believe that the most powerful and differentiated applications will not only call out to a language model but will also be:

  1. Data-aware: connect a language model to other sources of data
  2. Agentic: allow a language model to interact with its environment

The LangChain framework works around these principles.

To know more about this, please click the following link.

As you can see, this is one of the critical components in our solution, which will bind the OpenAI bot & it will feed the necessary data to provide the correct response.


What is FAISS?

Faiss is a library for efficient similarity search and clustering of dense vectors. It contains algorithms that search in sets of vectors of any size, up to ones that do not fit in RAM. It also has supporting code for evaluation and parameter tuning.

Faiss is written in C++ with complete wrappers for Python. Some of the most useful algorithms are available both on CPU & on GPU. It is developed by Facebook AI Research.

To know more about this, please click the following link.


FLOW OF EVENTS:

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

Here are the steps that will follow in sequence –

  • The application will first get the topic on which it needs to look from YouTube & find the top 5 videos using the YouTube data-API.
  • Once the application gets the list of video URLs from the above step, LangChain will drive the rest: the application will extract the transcripts from the videos & then split them into smaller chunks to keep the costly OpenAI calls small. During this time, it will invoke FAISS to create the document DBs.
  • Finally, it will send only the relevant chunks to OpenAI, along with your supplied template, so that the final analysis uses just the small amount of data required for your query & returns the appropriate response at a lower cost.

CODE:

Why don’t we go through the code made accessible due to this new library for this particular use case?

  • clsConfigClient.py (This is the main calling Python script for the input parameters.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 28-May-2023 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### personal OpenAI-based video content ####
#### enable bot. ####
#### ####
################################################
import os
import platform as pl

class clsConfigClient(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'DATA_PATH': Curr_Path + sep + 'data' + sep,
        'MODEL_PATH': Curr_Path + sep + 'model' + sep,
        'TEMP_PATH': Curr_Path + sep + 'temp' + sep,
        'MODEL_DIR': 'model',
        'APP_DESC_1': 'LangChain Demo!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'FILE_NAME': 'Output.csv',
        'MODEL_NAME': 'gpt-3.5-turbo',
        'OPEN_AI_KEY': "sk-kfrjfijdrkidjkfjd9474nbfjfkfjfhfhf84i84hnfhjdbv6Bgvv",
        'YOUTUBE_KEY': "AIjfjfUYGe64hHJ-LOFO5u-mkso9pPOJGFU",
        'TITLE': "LangChain Demo!",
        'TEMP_VAL': 0.2,
        'PATH' : Curr_Path,
        'MAX_CNT' : 5,
        'OUT_DIR': 'data'
    }

Some of the key entries from the above scripts are as follows –

'MODEL_NAME': 'gpt-3.5-turbo',
'OPEN_AI_KEY': "sk-kfrjfijdrkidjkfjd9474nbfjfkfjfhfhf84i84hnfhjdbv6Bgvv",
'YOUTUBE_KEY': "AIjfjfUYGe64hHJ-LOFO5u-mkso9pPOJGFU",
'TEMP_VAL': 0.2,

From the above code snippet, one can understand that we need API keys for both YouTube & OpenAI. They have separate costs & usage, which I’ll share later in the post. Also, notice that the temperature is set to 0.2 (the OpenAI API accepts values from 0 to 2). A low value like this makes the bot’s responses more focused & consistent. And our application will use the GPT-3.5-turbo model for its analytic response.
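
As an optional alternative to keeping the keys inside the config file, they could be read from environment variables. This is a minimal sketch under stated assumptions: load_api_keys is a hypothetical helper and YOUTUBE_API_KEY is an assumed variable name (OPENAI_API_KEY is the name the main class sets later).

```python
import os


def load_api_keys(env=None):
    """Read both API keys from environment variables instead of hard-coding them.

    OPENAI_API_KEY and YOUTUBE_API_KEY are assumed variable names.
    """
    env = os.environ if env is None else env
    missing = [name for name in ("OPENAI_API_KEY", "YOUTUBE_API_KEY") if not env.get(name)]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    return {
        "OPEN_AI_KEY": env["OPENAI_API_KEY"],
        "YOUTUBE_KEY": env["YOUTUBE_API_KEY"],
    }


# Demo with a plain dict standing in for os.environ
keys = load_api_keys({"OPENAI_API_KEY": "sk-demo", "YOUTUBE_API_KEY": "yt-demo"})
```

The returned dictionary uses the same 'OPEN_AI_KEY' & 'YOUTUBE_KEY' entries as the conf dictionary, so it could be merged into it.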

  • clsTemplate.py (Contains all the templates for OpenAI.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 27-May-2023 ####
#### Modified On: 28-May-2023 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the template for ####
#### OpenAI prompts to get the correct ####
#### response. ####
#### ####
################################################
# Template to use for the system message prompt
templateVal_1 = """
You are a helpful assistant that can answer questions about youtube videos
based on the video's transcript: {docs}
Only use the factual information from the transcript to answer the question.
If you feel like you don't have enough information to answer the question, say "I don't know".
Your answers should be verbose and detailed.
"""


The above code is self-explanatory. Here, we keep the instructions that tell the OpenAI model to respond within these guidelines.
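
To see how the {docs} placeholder gets filled, here is a small sketch that uses plain str.format on a shortened, hypothetical copy of the template. In the actual class, LangChain’s prompt templates perform this substitution.

```python
# Shortened, hypothetical copy of the system template with the same {docs} placeholder
template = """
You are a helpful assistant that can answer questions about youtube videos
based on the video's transcript: {docs}
Only use the factual information from the transcript to answer the question.
"""

# Made-up transcript chunks standing in for the retrieved documents
transcript_chunks = ["First chunk of the transcript.", "Second chunk."]

# Join the retrieved chunks and substitute them for {docs}
system_message = template.format(docs=" ".join(transcript_chunks))
```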

  • clsVideoContentScrapper.py (Main class to extract the transcript from the YouTube videos & then answer the questions based on the topics selected by the users.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 27-May-2023 ####
#### Modified On 28-May-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python class that will invoke the ####
#### LangChain of package to extract ####
#### the transcript from the YouTube videos & ####
#### then answer the questions based on the ####
#### topics selected by the users. ####
#### ####
#####################################################
from langchain.document_loaders import YoutubeLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.chat_models import ChatOpenAI
from langchain.chains import LLMChain

from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
)

from googleapiclient.discovery import build

import clsTemplate as ct
from clsConfigClient import clsConfigClient as cf

import os

###############################################
### Global Section ###
###############################################
open_ai_Key = cf.conf['OPEN_AI_KEY']
os.environ["OPENAI_API_KEY"] = open_ai_Key
embeddings = OpenAIEmbeddings(openai_api_key=open_ai_Key)

YouTube_Key = cf.conf['YOUTUBE_KEY']
youtube = build('youtube', 'v3', developerKey=YouTube_Key)

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

###############################################
### End of Global Section ###
###############################################

class clsVideoContentScrapper:
    def __init__(self):
        self.model_name = cf.conf['MODEL_NAME']
        self.temp_val = cf.conf['TEMP_VAL']
        self.max_cnt = int(cf.conf['MAX_CNT'])

    def createDBFromYoutubeVideoUrl(self, video_url):
        try:
            loader = YoutubeLoader.from_youtube_url(video_url)
            transcript = loader.load()

            text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
            docs = text_splitter.split_documents(transcript)

            db = FAISS.from_documents(docs, embeddings)
            return db

        except Exception as e:
            x = str(e)
            print('Error: ', x)
            return ''

    def getResponseFromQuery(self, db, query, k=4):
        try:
            """
            gpt-3.5-turbo can handle up to 4097 tokens. Setting the chunksize to 1000 and k to 4 maximizes
            the number of tokens to analyze.
            """

            mod_name = self.model_name
            temp_val = self.temp_val

            docs = db.similarity_search(query, k=k)
            docs_page_content = " ".join([d.page_content for d in docs])

            chat = ChatOpenAI(model_name=mod_name, temperature=temp_val)

            # Template to use for the system message prompt
            template = ct.templateVal_1

            system_message_prompt = SystemMessagePromptTemplate.from_template(template)

            # Human question prompt
            human_template = "Answer the following question: {question}"
            human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)

            chat_prompt = ChatPromptTemplate.from_messages(
                [system_message_prompt, human_message_prompt]
            )

            chain = LLMChain(llm=chat, prompt=chat_prompt)

            response = chain.run(question=query, docs=docs_page_content)
            response = response.replace("\n", "")
            return response, docs

        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return '', ''

    def topFiveURLFromYouTube(self, service, **kwargs):
        try:
            video_urls = []
            channel_list = []
            results = service.search().list(**kwargs).execute()

            for item in results['items']:
                print("Title: ", item['snippet']['title'])
                print("Description: ", item['snippet']['description'])
                channel = item['snippet']['channelId']
                print("Channel Id: ", channel)

                # Fetch the channel name using the channel ID
                channel_response = service.channels().list(part='snippet',id=item['snippet']['channelId']).execute()
                channel_title = channel_response['items'][0]['snippet']['title']
                print("Channel Title: ", channel_title)
                channel_list.append(channel_title)

                print("Video Id: ", item['id']['videoId'])
                vidURL = "https://www.youtube.com/watch?v=" + item['id']['videoId']
                print("Video URL: " + vidURL)
                video_urls.append(vidURL)
                print("\n")

            return video_urls, channel_list

        except Exception as e:
            video_urls = []
            channel_list = []
            x = str(e)
            print('Error: ', x)

            return video_urls, channel_list

    def extractContentInText(self, topic, query):
        try:
            discussedTopic = []
            strKeyText = ''
            cnt = 0
            max_cnt = self.max_cnt

            urlList, channelList = self.topFiveURLFromYouTube(youtube, q=topic, part='id,snippet',maxResults=max_cnt,type='video')
            print('Returned List: ')
            print(urlList)
            print()

            for video_url in urlList:
                print('Processing Video: ')
                print(video_url)
                db = self.createDBFromYoutubeVideoUrl(video_url)

                response, docs = self.getResponseFromQuery(db, query)

                if len(response) > 0:
                    strKeyText = 'As per the topic discussed in ' + channelList[cnt] + ', '
                    discussedTopic.append(strKeyText + response)

                cnt += 1

            return discussedTopic
        except Exception as e:
            discussedTopic = []
            x = str(e)
            print('Error: ', x)

            return discussedTopic

Let us understand the key methods step by step in detail –

def topFiveURLFromYouTube(self, service, **kwargs):
    try:
        video_urls = []
        channel_list = []
        results = service.search().list(**kwargs).execute()

        for item in results['items']:
            print("Title: ", item['snippet']['title'])
            print("Description: ", item['snippet']['description'])
            channel = item['snippet']['channelId']
            print("Channel Id: ", channel)

            # Fetch the channel name using the channel ID
            channel_response = service.channels().list(part='snippet',id=item['snippet']['channelId']).execute()
            channel_title = channel_response['items'][0]['snippet']['title']
            print("Channel Title: ", channel_title)
            channel_list.append(channel_title)

            print("Video Id: ", item['id']['videoId'])
            vidURL = "https://www.youtube.com/watch?v=" + item['id']['videoId']
            print("Video URL: " + vidURL)
            video_urls.append(vidURL)
            print("\n")

        return video_urls, channel_list

    except Exception as e:
        video_urls = []
        channel_list = []
        x = str(e)
        print('Error: ', x)

        return video_urls, channel_list

The above code will fetch the most relevant YouTube URLs & bind them into a list along with the channel names & then share the lists with the main functions.
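
The URL-building part of this method can be tried on its own. The sketch below runs on a hand-written dictionary that only mimics the nesting the method reads (items, then id, then videoId); the IDs are made up & extract_video_urls is a hypothetical helper that makes no API call.

```python
def extract_video_urls(search_response: dict) -> list:
    """Turn a search-style response into a list of watch URLs."""
    urls = []
    for item in search_response.get("items", []):
        video_id = item.get("id", {}).get("videoId")
        if video_id:  # skip anything that is not a video result
            urls.append("https://www.youtube.com/watch?v=" + video_id)
    return urls


# Hand-written sample with the same nesting the method reads; the IDs are made up
sample = {
    "items": [
        {"id": {"videoId": "abc123"}, "snippet": {"title": "First", "channelId": "ch1"}},
        {"id": {"videoId": "xyz789"}, "snippet": {"title": "Second", "channelId": "ch2"}},
        {"id": {"channelId": "ch3"}, "snippet": {"title": "A channel, not a video"}},
    ]
}

video_urls = extract_video_urls(sample)
```

Using .get() with defaults avoids a KeyError when an item has no videoId, which the original method would instead route to its except block.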

def createDBFromYoutubeVideoUrl(self, video_url):
    try:
        loader = YoutubeLoader.from_youtube_url(video_url)
        transcript = loader.load()

        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
        docs = text_splitter.split_documents(transcript)

        db = FAISS.from_documents(docs, embeddings)
        return db

    except Exception as e:
        x = str(e)
        print('Error: ', x)
        return ''

The provided Python code defines a function createDBFromYoutubeVideoUrl which appears to create a database of text documents from the transcript of a YouTube video. Here’s the explanation in simple English:

  1. The function createDBFromYoutubeVideoUrl is defined with one argument: video_url.
  2. The function uses a try-except block to handle any potential exceptions or errors that may occur.
  3. Inside the try block, the following steps are performed:
  • First, it creates a YoutubeLoader object from the provided video_url. This object is likely responsible for interacting with the YouTube video specified by the URL.
  • The loader object then loads the transcript of the video. The transcript is the text version of everything spoken in the video.
  • It then creates a RecursiveCharacterTextSplitter object with a chunk_size of 1000 and a chunk_overlap of 100. This object splits the transcript into smaller chunks (documents) of text for easier processing or analysis. Each chunk will be at most about 1000 characters long, and there will be an overlap of up to 100 characters between consecutive chunks.
  • The split_documents method of the text_splitter object will split the transcript into smaller documents. These documents are stored in the docs variable.
  • The FAISS.from_documents method is then called with docs and embeddings as arguments to create a FAISS (Facebook AI Similarity Search) index. This index is a database used for efficient similarity search and clustering of high-dimensional vectors, which in this case, are the embeddings of the documents. The FAISS index is stored in the db variable.
  • Finally, the db variable is returned, representing the created database from the video transcript.

4. If an exception occurs during the execution of the try block, the code execution moves to the except block:

  • Here, it first converts the exception e to a string x.
  • Then it prints an error message.
  • Finally, it returns an empty string as an indication of the error.
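
To get a feel for what chunk_size & chunk_overlap mean, here is a simplified fixed-window splitter. It is only an illustration of the overlap idea, not the algorithm RecursiveCharacterTextSplitter uses (which prefers to split on separators), and split_with_overlap is a hypothetical name.

```python
def split_with_overlap(text: str, chunk_size: int, chunk_overlap: int) -> list:
    """Cut text into fixed-size windows that share chunk_overlap characters."""
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reaches the end of the text
    return chunks


chunks = split_with_overlap("abcdefghijklmnopqrstuvwxyz", chunk_size=10, chunk_overlap=3)
# chunks == ['abcdefghij', 'hijklmnopq', 'opqrstuvwx', 'vwxyz']
```

Each chunk starts with the last three characters of the previous one, so a sentence cut at a boundary still appears whole in at least one chunk.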

def getResponseFromQuery(self, db, query, k=4):
      try:
          """
          gpt-3.5-turbo can handle up to 4097 tokens. Setting the chunksize to 1000 and k to 4 maximizes
          the number of tokens to analyze.
          """

          mod_name = self.model_name
          temp_val = self.temp_val

          docs = db.similarity_search(query, k=k)
          docs_page_content = " ".join([d.page_content for d in docs])

          chat = ChatOpenAI(model_name=mod_name, temperature=temp_val)

          # Template to use for the system message prompt
          template = ct.templateVal_1

          system_message_prompt = SystemMessagePromptTemplate.from_template(template)

          # Human question prompt
          human_template = "Answer the following question: {question}"
          human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)

          chat_prompt = ChatPromptTemplate.from_messages(
              [system_message_prompt, human_message_prompt]
          )

          chain = LLMChain(llm=chat, prompt=chat_prompt)

          response = chain.run(question=query, docs=docs_page_content)
          response = response.replace("\n", "")
          return response, docs

      except Exception as e:
          x = str(e)
          print('Error: ', x)

          return '', ''

The Python function getResponseFromQuery is designed to search a given database (db) for a specific query and then generate a response using a language model (possibly GPT-3.5-turbo). The answer is based on the content found and the particular question. Here is a simple English summary:

  1. The function getResponseFromQuery takes three parameters: db, query, and k. The k parameter is optional and defaults to 4 if not provided. db is the database to search, query is the question or prompt to analyze, and k is the number of similar items to return.
  2. The function initiates a try-except block for handling any errors that might occur.
  3. Inside the try block:
  • The function retrieves the model name and temperature value from the instance of the class this function is a part of.
  • The function then searches the db database for documents similar to the query and saves these in docs.
  • It concatenates the content of the returned documents into a single string docs_page_content.
  • It creates a ChatOpenAI object with the model name and temperature value.
  • It creates a system message prompt from a predefined template.
  • It creates a human message prompt, which is the query.
  • It combines these two prompts to form a chat prompt.
  • An LLMChain object is then created using the ChatOpenAI object and the chat prompt.
  • This LLMChain object is used to generate a response to the query using the content of the documents found in the database. The answer is then formatted by replacing all newline characters with empty strings.
  • Finally, the function returns this response along with the original documents.
  4. If any error occurs during these operations, the function goes to the except block where:
  • The error message is printed.
  • The function returns two empty strings to indicate an error occurred, and no response or documents could be produced.
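
The retrieval step can be pictured as a brute-force top-k search. The sketch below uses cosine similarity over toy two-dimensional vectors purely for illustration; FAISS uses its own optimized indexes & distance metrics, and cosine & top_k here are hypothetical helpers.

```python
import math


def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query_vec, docs, k=4):
    """Return the k documents whose vectors are most similar to query_vec."""
    ranked = sorted(docs, key=lambda d: cosine(query_vec, d["vector"]), reverse=True)
    return ranked[:k]


# Toy 2-D "embeddings"; real embeddings have many more dimensions
docs = [
    {"page_content": "cats purr", "vector": [1.0, 0.0]},
    {"page_content": "dogs bark", "vector": [0.0, 1.0]},
    {"page_content": "kittens purr softly", "vector": [0.9, 0.1]},
]

best = top_k([1.0, 0.0], docs, k=2)

# Same joining step as docs_page_content in the method above
context = " ".join(d["page_content"] for d in best)
```

Only the joined context of the top matches goes into the prompt, which is what keeps the request small.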

def extractContentInText(self, topic, query):
    try:
        discussedTopic = []
        strKeyText = ''
        cnt = 0
        max_cnt = self.max_cnt

        urlList, channelList = self.topFiveURLFromYouTube(youtube, q=topic, part='id,snippet',maxResults=max_cnt,type='video')
        print('Returned List: ')
        print(urlList)
        print()

        for video_url in urlList:
            print('Processing Video: ')
            print(video_url)
            db = self.createDBFromYoutubeVideoUrl(video_url)

            response, docs = self.getResponseFromQuery(db, query)

            if len(response) > 0:
                strKeyText = 'As per the topic discussed in ' + channelList[cnt] + ', '
                discussedTopic.append(strKeyText + response)

            cnt += 1

        return discussedTopic
    except Exception as e:
        discussedTopic = []
        x = str(e)
        print('Error: ', x)

        return discussedTopic

This Python function, extractContentInText, aims to extract relevant content from the transcripts of top YouTube videos on a specific topic and generate responses to a given query. Here’s an explanation in simple English:

  1. The function extractContentInText is defined with topic and query as parameters.
  2. It begins with a try-except block to catch and handle any possible exceptions.
  3. In the try block:
  • It initializes several variables: an empty list discussedTopic to store the extracted information, an empty string strKeyText to keep specific parts of the content, a counter cnt initialized at 0, and max_cnt retrieved from the self-object to specify the maximum number of YouTube videos to consider.
  • It calls the topFiveURLFromYouTube function (defined previously) to get the URLs of the top videos on the given topic from YouTube. It also retrieves the list of channel names associated with these videos.
  • It prints the returned list of URLs.
  • Then, it starts a loop over each URL in the urlList.
    • For each URL, it prints the URL, then creates a database from the transcript of the YouTube video using the function createDBFromYoutubeVideoUrl.
    • It then uses the getResponseFromQuery function to get a response to the query based on the content of the database.
    • If the length of the response is greater than 0 (meaning there is a response), it forms a string strKeyText to indicate the channel that the topic was discussed on and then appends the answer to this string. This entire string is then added to the discussedTopic list.
    • It increments the counter cnt by one after each iteration.
  • Finally, it returns the discussedTopic list, which now contains relevant content extracted from the videos.
  4. If any error occurs during these operations, the function goes into the except block:
  • It first resets discussedTopic to an empty list.
  • Then it converts the exception e to a string and prints the error message.
  • Lastly, it returns the empty discussedTopic list, indicating that no content could be extracted due to the error.
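
The loop described above keeps a manual counter to look up the channel name. A possible alternative, sketched here with hypothetical names (collect_answers, fake_answers), pairs each URL with its channel using zip.

```python
def collect_answers(urls, channels, answer_for):
    """Pair each URL with its channel and keep only the non-empty answers."""
    discussed = []
    for url, channel in zip(urls, channels):
        response = answer_for(url)
        if response:
            discussed.append("As per the topic discussed in " + channel + ", " + response)
    return discussed


# Hypothetical stand-in for the transcript lookup: the second video yields nothing
fake_answers = {"url-1": "prices went up.", "url-2": "", "url-3": "prices went down."}

result = collect_answers(
    ["url-1", "url-2", "url-3"],
    ["Channel A", "Channel B", "Channel C"],
    fake_answers.get,
)
```

With zip, the URL & the channel name cannot drift out of step, even when a video returns an empty response.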

  • testLangChain.py (Main Python script to extract the transcript from the YouTube videos & then answer the questions based on the topics selected by the users.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 27-May-2023 ####
#### Modified On 28-May-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### clsVideoContentScrapper class to extract ####
#### the transcript from the YouTube videos. ####
#### ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import textwrap

import clsVideoContentScrapper as cvsc

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

######################################
### Get your global values ####
######################################
debug_ind = 'Y'

# Initiating Logging Instances
clog = cl.clsL()

data_path = cf.conf['DATA_PATH']
data_file_name = cf.conf['FILE_NAME']

cVCScrapper = cvsc.clsVideoContentScrapper()

######################################
#### Global Flag ########
######################################

def main():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        #query = "What are they saying about Microsoft?"
        print('Please share your topic!')
        inputTopic = input('User: ')
        print('Please ask your questions?')
        inputQry = input('User: ')
        print()

        retList = cVCScrapper.extractContentInText(inputTopic, inputQry)
        cnt = 0

        for discussedTopic in retList:
            finText = str(cnt + 1) + ') ' + discussedTopic
            print()
            print(textwrap.fill(finText, width=150))

            cnt += 1

        r1 = len(retList)

        if r1 > 0:
            print()
            print('Successfully Scrapped!')
        else:
            print()
            print('Failed to Scrappe!')

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

Please find the key snippet –

def main():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        #query = "What are they saying about Microsoft?"
        print('Please share your topic!')
        inputTopic = input('User: ')
        print('Please ask your questions?')
        inputQry = input('User: ')
        print()

        retList = cVCScrapper.extractContentInText(inputTopic, inputQry)
        cnt = 0

        for discussedTopic in retList:
            finText = str(cnt + 1) + ') ' + discussedTopic
            print()
            print(textwrap.fill(finText, width=150))

            cnt += 1

        r1 = len(retList)

        if r1 > 0:
            print()
            print('Successfully Scrapped!')
        else:
            print()
            print('Failed to Scrappe!')

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

The above main application will capture the topic from the user & then give the user a chance to ask a specific question on that topic. It then invokes the main class to extract the transcripts from YouTube, feeds them as a source using LangChain & finally delivers the responses. If no response comes back, nothing is listed & the application reports a failure message.
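
The numbering & wrapping of the answers can be sketched in isolation. format_answers below is a hypothetical helper that uses enumerate in place of the manual counter; the sample answers are made up.

```python
import textwrap


def format_answers(answers, width=150):
    """Number each answer and wrap it to the given width."""
    lines = []
    for number, answer in enumerate(answers, start=1):
        lines.append(textwrap.fill(f"{number}) {answer}", width=width))
    return lines


# Made-up answers; a narrow width is used here only to make the wrapping visible
formatted = format_answers(
    ["First answer.", "Second answer that is a little longer."],
    width=20,
)

for block in formatted:
    print()
    print(block)
```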

USAGE & COST FACTOR:

Please find the OpenAI usage –

Please find the YouTube API usage –


So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement & especially in the prediction quality. Sample video taken from Santrel Media & you would find the link over here.

Demonstration of GPT-3 model tuning using Python for an upcoming PyPi-package

Today, I’m very excited to demonstrate an effortless & new way to fine-tune the GPT-3 model using Python with the help of my newly built (unpublished) PyPi package. In this post, I plan to return links from this website as part of the response, depending upon the user queries, with the help of the OpenAI-based tuned model.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.

Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Isn’t it exciting? Finally, we can efficiently handle your custom website URLs using an OpenAI tuned model.


What is ChatGPT?

ChatGPT is an advanced artificial intelligence language model developed by OpenAI & built on its GPT series of models (GPT-3.5 & GPT-4 at the time of writing). As an AI model, it is designed to understand and generate human-like text based on the input it receives. ChatGPT can engage in various tasks, such as answering questions, providing recommendations, creating content, and simulating conversation. While it is highly advanced and versatile, it’s important to note that ChatGPT’s knowledge is limited to the data it was trained on, which, at the time of writing, had a cutoff date of September 2021.

When to tune a GPT model?

Tuning a GPT or any AI model might be necessary for various reasons. Here are some common scenarios when you should consider adjusting or fine-tuning a GPT model:

  1. Domain-specific knowledge: If you need your model to have a deeper understanding of a specific domain or industry, you can fine-tune it with domain-specific data to improve its performance.
  2. New or updated data: If new or updated information is not part of the original training data, you should fine-tune the model to ensure it has the most accurate and up-to-date knowledge.
  3. Customization: If you require the model to have a specific style, tone, or focus, you can fine-tune it with data that reflects those characteristics.
  4. Ethical or safety considerations: To make the model safer and more aligned with human values, you should fine-tune it to reduce biased or harmful outputs.
  5. Improve performance: If the base model’s performance is unsatisfactory for a particular task or application, you can fine-tune it on a dataset more relevant to the job, often leading to better results.

Remember that tuning or fine-tuning a GPT model requires access to appropriate data and computational resources and an understanding of the model’s architecture and training techniques. Additionally, monitoring and evaluating the model’s performance after fine-tuning is essential to ensure that the desired improvements have been achieved.


FLOW OF EVENTS:

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

The initial Python-based client interacts with the tuned OpenAI models. This process enables it to get a precise response with custom data in a convenient way that anyone can understand.


SOURCE DATA:

Let us understand how to feed the source data as it will deal with your website URL link.

The first data that we are going to talk about is the one that contains the hyperlink. Let us explore the sample here.

From the above diagram, one can easily understand that the application will interpret a unique hash number associated with a specific URL. This data will be used to look up the URL after the OpenAI response from the tuned model as a result of any user query.
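As a rough illustration of that lookup, the sketch below loads a hash-to-URL table into a dictionary. The column names `HashCode` & `URL` and the sample rows are assumptions; the actual layout of `HyperDetails.csv` isn’t shown in this post.

```python
import csv
import io


def loadUrlLookup(csvFile):
    """Build a {hash code: URL} dictionary from an open CSV file.

    Assumes (hypothetically) two columns named HashCode and URL.
    """
    reader = csv.DictReader(csvFile)
    return {row["HashCode"].strip(): row["URL"].strip() for row in reader}


# Made-up sample rows standing in for the real lookup file.
sampleCsv = io.StringIO(
    "HashCode,URL\n"
    "a1b2c3,https://example.com/post-one\n"
    "d4e5f6,https://example.com/post-two\n"
)

urlLookup = loadUrlLookup(sampleCsv)
print(urlLookup.get("a1b2c3"))
```

For a real file, the same function could be called with `open(path, newline='')` instead of the in-memory sample.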

Now, let us understand the actual source data.

If we closely check, we’ll see the source file contains two columns – prompt & completion. And the website reference is put inside the curly braces as shown – “{Hash Code that represents your URL}.”
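OpenAI’s legacy fine-tuning endpoint expects training data as JSON Lines, one object per line with `prompt` & `completion` keys. A minimal sketch of that conversion follows; the function name & the sample row are hypothetical, and the unpublished package may handle this step differently.

```python
import csv
import io
import json


def csvToJsonl(csvFile):
    """Convert rows with 'prompt' and 'completion' columns to JSONL text."""
    reader = csv.DictReader(csvFile)
    lines = []
    for row in reader:
        record = {"prompt": row["prompt"], "completion": row["completion"]}
        lines.append(json.dumps(record))
    return "\n".join(lines)


# Made-up sample row; the value in curly braces stands for a URL hash code.
sampleCsv = io.StringIO(
    "prompt,completion\n"
    "Where can I read about Python?,Please visit {a1b2c3} for details.\n"
)

print(csvToJsonl(sampleCsv))
```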

During the response, the newly created library replaces the hash value with the correct URL after the successful lookup & presents the complete answer.
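The replacement step described above can be sketched with a regular expression. The helper name `resolveLinks` and the sample data are hypothetical; the library’s actual implementation isn’t shown in this post.

```python
import re


def resolveLinks(answer, urlLookup):
    """Replace every {hash} placeholder in the answer with its URL.

    Unknown hash codes are left untouched so that nothing is lost silently.
    """
    def swap(match):
        return urlLookup.get(match.group(1), match.group(0))

    return re.sub(r"\{([^{}]+)\}", swap, answer)


# Made-up lookup entry and model response for illustration only.
urlLookup = {"a1b2c3": "https://example.com/post-one"}
print(resolveLinks("Please visit {a1b2c3} for details.", urlLookup))
```

Leaving unknown placeholders as they are makes a failed lookup visible in the final answer rather than hiding it.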

CODE:

Why don’t we go through the code made accessible due to this new library for this particular use case?

  • clsConfigClient.py (This is the main calling Python script for the input parameters.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 21-Feb-2023 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### OpenAI fine-tune projects. ####
#### ####
################################################
import os
import platform as pl

class clsConfigClient(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'DATA_PATH': Curr_Path + sep + 'data' + sep,
        'TEMP_PATH': Curr_Path + sep + 'temp' + sep,
        'MODEL_DIR': 'model',
        'APP_DESC_1': 'ChatGPT Training!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'FILE_NAME': '2023-4-14-WP.csv',
        'LKP_FILE_NAME': 'HyperDetails.csv',
        'TEMP_FILE_NAME': 'chatGPTData.jsonl',
        'TITLE': "GPT-3 Training!",
        'PATH' : Curr_Path,
        'OUT_DIR': 'data',
        'OPEN_API_KEY': 'sk-hdhrujfrkfjfjfjfhjfjfisososT&jsdgL6KIxx',
        'MODEL_CD':'davinci',
        'URL': 'https://api.openai.com/v1/fine-tunes/',
        'EPOCH': 10,
        'SUFFIX': 'py-saty',
        'EXIT_KEYWORD': 'bye'
    }

Some of the important entries that we’ll need later are as follows –

'FILE_NAME': '2023-4-14-WP.csv',
'LKP_FILE_NAME': 'HyperDetails.csv',
'OPEN_API_KEY': 'sk-hdhrujfrkfjfjfjfhjfjfisososT&jsdgL6KIxx',
'MODEL_CD':'davinci',
'URL': 'https://api.openai.com/v1/fine-tunes/',
'EXIT_KEYWORD': 'bye'

We’ll discuss these entries later.
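Since `OPEN_API_KEY` is a secret, one common alternative to hard-coding it is to read it from an environment variable. The sketch below assumes a variable named `OPENAI_API_KEY`; both the variable name & the helper are illustrative and not part of the original config.

```python
import os


def getApiKey(envVar="OPENAI_API_KEY", default=None):
    """Return the API key from the environment, or the default if unset.

    Raises a clear error when neither is available.
    """
    key = os.environ.get(envVar, default)
    if not key:
        raise RuntimeError(f"Please set the {envVar} environment variable.")
    return key


if __name__ == "__main__":
    # 'sk-dummy' is a placeholder default so the demo runs without a real key.
    conf = {"OPEN_API_KEY": getApiKey(default="sk-dummy")}
    print("Key loaded:", bool(conf["OPEN_API_KEY"]))
```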

  • trainChatGPTModel.py (This is the main calling Python script that will invoke the newly created fine-tune GPT-3 enabler.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 12-Feb-2023 ####
#### Modified On 16-Feb-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### newly created fine-tune GPT-3 enabler. ####
#### ####
#####################################################
import pandas as p
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import clsTrainModel3 as tm

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

######################################
### Get your global values ####
######################################
debug_ind = 'Y'

#tModel = tm.clsTrainModel()
tModel = tm.clsTrainModel3()

# Initiating Logging Instances
clog = cl.clsL()

data_path = cf.conf['DATA_PATH']
data_file_name = cf.conf['FILE_NAME']

######################################
#### Global Flag ########
######################################

######################################
### Wrapper functions to invoke ###
### the desired class from newly ###
### built class. ###
######################################

######################################
### End of wrapper functions. ###
######################################

def main():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        FullFileName = data_path + data_file_name

        r1 = tModel.trainModel(FullFileName)

        if r1 == 0:
            print('Successfully Trained!')
        else:
            print('Failed to Train!')

        #clog.logr(OutPutFileName, debug_ind, df, subdir)

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

The following are the key snippets from the above script –

data_path = cf.conf['DATA_PATH']
data_file_name = cf.conf['FILE_NAME']

And, then –

tModel = tm.clsTrainModel3()
FullFileName = data_path + data_file_name
r1 = tModel.trainModel(FullFileName)

As one can see, the package needs only the source data file to fine-tune the GPT-3 model.
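To make that concrete, here is a sketch of how the config entries could map onto a request body for OpenAI’s legacy fine-tunes endpoint (the `URL` entry in the config above). The field names `training_file`, `model`, `n_epochs` & `suffix` belong to that legacy API; the helper name & the file ID are hypothetical, and the internals of `clsTrainModel3` are not shown in this post, so it may work differently.

```python
import json


def buildFineTuneRequest(trainingFileId, conf):
    """Assemble the JSON body for a legacy fine-tune job from config values.

    trainingFileId is the ID returned after uploading the JSONL file.
    """
    return {
        "training_file": trainingFileId,
        "model": conf["MODEL_CD"],
        "n_epochs": conf["EPOCH"],
        "suffix": conf["SUFFIX"],
    }


# Values copied from the config shown earlier; the file ID is a made-up placeholder.
conf = {"MODEL_CD": "davinci", "EPOCH": 10, "SUFFIX": "py-saty"}
payload = buildFineTuneRequest("file-XXXXXXXX", conf)
print(json.dumps(payload, indent=2))
```

Keeping the payload construction separate from the network call makes it possible to inspect exactly what would be sent before any training cost is incurred.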

  • checkFineTuneChatGPTModelStat.py (This is the main Python script that will check the status of the tuning process that runs inside the OpenAI cloud environment.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 12-Feb-2023 ####