Building a real-time Gen AI Improvement Matrices (GAIIM) using Python, UpTrain, Open AI & React

How does the RAG work better for various enterprise-level Gen AI use cases? What needs to be there to make the LLM model work more efficiently & able to check the response & validate their response, including the bias, hallucination & many more?

This is my post (after a slight GAP), which will capture and discuss some of the burning issues that many AI architects are trying to explore. In this post, I’ve considered a newly formed AI start-up from India, which developed an open-source framework that can easily evaluate all the challenges that one is facing with their LLMs & easily integrate with your existing models for better understanding including its limitations. You will get plenty of insights about it.

But, before we dig deep, why not see the demo first –

Isn’t it exciting? Let’s deep dive into the flow of events.


Let’s explore the broad-level architecture/flow –

Let us understand the steps of the above architecture. First, our Python application needs to trigger and enable the API, which will interact with the Open AI and UpTrain AI to fetch all the LLM KPIs based on the input from the React app named “Evaluation.”

Once the response is received from UpTrain AI, the Python application then organizes the results in a better readable manner without changing the core details coming out from their APIs & then shares that back with the react interface.

Let’s examine the react app’s sample inputs to better understand the input that will be passed to the Python-based API solution, which is wrapper capability to call multiple APIs from the UpTrain & then accumulate them under one response by parsing the data & reorganizing the data with the help of Open AI & sharing that back.

Highlighted in RED are some of the critical inputs you need to provide to get most of the KPIs. And, here are the sample text inputs for your reference –

Q. Enter input question.
A. What are the four largest moons of Jupiter?
Q. Enter the context document.
A. Jupiter, the largest planet in our solar system, boasts a fascinating array of moons. Among these, the four largest are collectively known as the Galilean moons, named after the renowned astronomer Galileo Galilei, who first observed them in 1610. These four moons, Io, Europa, Ganymede, and Callisto, hold significant scientific interest due to their unique characteristics and diverse geological features.
Q. Enter LLM response.
A. The four largest moons of Jupiter, known as the Galilean moons, are Io, Europa, Ganymede, and Marshmello.
Q. Enter the persona response.
A. strict and methodical teacher
Q. Enter the guideline.
A. Response shouldn’t contain any specific numbers
Q. Enter the ground truth.
A. The Jupiter is the largest & gaseous planet in the solar system.
Q. Choose the evaluation method.
A. llm

Once you fill in the App should look like this –

Once you fill in, the app should look like the below screenshot –


Let us understand the sample packages that are required for this task.

pip install Flask==3.0.3
pip install Flask-Cors==4.0.0
pip install numpy==1.26.4
pip install openai==1.17.0
pip install pandas==2.2.2
pip install uptrain==0.6.13

Note that, we’re not going to discuss the entire script here. Only those parts are relevant. However, you can get the complete scripts in the GitHub repository.

def askFeluda(context, question):
    try:
        # Combine the context and the question into a single prompt.
        prompt_text = f"{context}\n\n Question: {question}\n Answer:"

        # Retrieve conversation history from the session or database
        conversation_history = []

        # Add the new message to the conversation history
        conversation_history.append(prompt_text)

        # Call OpenAI API with the updated conversation
        response = client.with_options(max_retries=0).chat.completions.create(
            messages=[
                {
                    "role": "user",
                    "content": prompt_text,
                }
            ],
            model=cf.conf['MODEL_NAME'],
            max_tokens=150,  # You can adjust this based on how long you expect the response to be
            temperature=0.3,  # Adjust for creativity. Lower values make responses more focused and deterministic
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0
        )

        # Extract the content from the first choice's message
        chat_response = response.choices[0].message.content

        # Print the generated response text
        return chat_response.strip()
    except Exception as e:
        return f"An error occurred: {str(e)}"

This function will ask the supplied questions with contexts or it will supply the UpTrain results to summarize the JSON into more easily readable plain texts. For our test, we’ve used “gpt-3.5-turbo”.

def evalContextRelevance(question, context, resFeluda, personaResponse):
    try:
        data = [{
            'question': question,
            'context': context,
            'response': resFeluda
        }]

        results = eval_llm.evaluate(
            data=data,
            checks=[Evals.CONTEXT_RELEVANCE, Evals.FACTUAL_ACCURACY, Evals.RESPONSE_COMPLETENESS, Evals.RESPONSE_RELEVANCE, CritiqueTone(llm_persona=personaResponse), Evals.CRITIQUE_LANGUAGE, Evals.VALID_RESPONSE, Evals.RESPONSE_CONCISENESS]
        )

        return results
    except Exception as e:
        x = str(e)

        return x

The above methods initiate the model from UpTrain to get all the stats, which will be helpful for your LLM response. In this post, we’ve captured the following KPIs –

- Context Relevance Explanation
- Factual Accuracy Explanation
- Guideline Adherence Explanation
- Response Completeness Explanation
- Response Fluency Explanation
- Response Relevance Explanation
- Response Tonality Explanation
# Function to extract and print all the keys and their values
def extractPrintedData(data):
    for entry in data:
        print("Parsed Data:")
        for key, value in entry.items():


            if key == 'score_context_relevance':
                s_1_key_val = value
            elif key == 'explanation_context_relevance':
                cleaned_value = preprocessParseData(value)
                print(f"{key}: {cleaned_value}\n")
                s_1_val = cleaned_value
            elif key == 'score_factual_accuracy':
                s_2_key_val = value
            elif key == 'explanation_factual_accuracy':
                cleaned_value = preprocessParseData(value)
                print(f"{key}: {cleaned_value}\n")
                s_2_val = cleaned_value
            elif key == 'score_response_completeness':
                s_3_key_val = value
            elif key == 'explanation_response_completeness':
                cleaned_value = preprocessParseData(value)
                print(f"{key}: {cleaned_value}\n")
                s_3_val = cleaned_value
            elif key == 'score_response_relevance':
                s_4_key_val = value
            elif key == 'explanation_response_relevance':
                cleaned_value = preprocessParseData(value)
                print(f"{key}: {cleaned_value}\n")
                s_4_val = cleaned_value
            elif key == 'score_critique_tone':
                s_5_key_val = value
            elif key == 'explanation_critique_tone':
                cleaned_value = preprocessParseData(value)
                print(f"{key}: {cleaned_value}\n")
                s_5_val = cleaned_value
            elif key == 'score_fluency':
                s_6_key_val = value
            elif key == 'explanation_fluency':
                cleaned_value = preprocessParseData(value)
                print(f"{key}: {cleaned_value}\n")
                s_6_val = cleaned_value
            elif key == 'score_valid_response':
                s_7_key_val = value
            elif key == 'score_response_conciseness':
                s_8_key_val = value
            elif key == 'explanation_response_conciseness':
                print('Raw Value: ', value)
                cleaned_value = preprocessParseData(value)
                print(f"{key}: {cleaned_value}\n")
                s_8_val = cleaned_value

    print('$'*200)

    results = {
        "Factual_Accuracy_Score": s_2_key_val,
        "Factual_Accuracy_Explanation": s_2_val,
        "Context_Relevance_Score": s_1_key_val,
        "Context_Relevance_Explanation": s_1_val,
        "Response_Completeness_Score": s_3_key_val,
        "Response_Completeness_Explanation": s_3_val,
        "Response_Relevance_Score": s_4_key_val,
        "Response_Relevance_Explanation": s_4_val,
        "Response_Fluency_Score": s_6_key_val,
        "Response_Fluency_Explanation": s_6_val,
        "Response_Tonality_Score": s_5_key_val,
        "Response_Tonality_Explanation": s_5_val,
        "Guideline_Adherence_Score": s_8_key_val,
        "Guideline_Adherence_Explanation": s_8_val,
        "Response_Match_Score": s_7_key_val
        # Add other evaluations similarly
    }

    return results

The above method parsed the initial data from UpTrain before sending it to OpenAI for a better summary without changing any text returned by it.

@app.route('/evaluate', methods=['POST'])
def evaluate():
    data = request.json

    if not data:
        return {jsonify({'error': 'No data provided'}), 400}

    # Extracting input data for processing (just an example of logging received data)
    question = data.get('question', '')
    context = data.get('context', '')
    llmResponse = ''
    personaResponse = data.get('personaResponse', '')
    guideline = data.get('guideline', '')
    groundTruth = data.get('groundTruth', '')
    evaluationMethod = data.get('evaluationMethod', '')

    print('question:')
    print(question)

    llmResponse = askFeluda(context, question)
    print('='*200)
    print('Response from Feluda::')
    print(llmResponse)
    print('='*200)

    # Getting Context LLM
    cLLM = evalContextRelevance(question, context, llmResponse, personaResponse)

    print('&'*200)
    print('cLLM:')
    print(cLLM)
    print(type(cLLM))
    print('&'*200)

    results = extractPrintedData(cLLM)

    print('JSON::')
    print(results)

    resJson = jsonify(results)

    return resJson

The above function is the main method, which first receives all the input parameters from the react app & then invokes one-by-one functions to get the LLM response, and LLM performance & finally summarizes them before sending it to react-app.

For any other scripts, please refer to the above-mentioned GitHub link.


Let us see some of the screenshots of the test run –


So, we’ve done it.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 🙂

Enabling & Exploring Stable Defussion – Part 1

This new solution will evaluate the power of Stable Defussion, which is created solutions as we progress & refine our prompt from scratch by using Stable Defussion & Python. This post opens new opportunities for IT companies & business start-ups looking to deliver solutions & have better performance compared to the paid version of Stable Defussion AI’s API performance. This project is for the advanced Python, Stable Defussion for data Science Newbies & AI evangelists.

In a series of posts, I’ll explain and focus on the Stable Defussion API and custom solution using the Python-based SDK of Stable Defussion.

But, before that, let us view the video that it generates from the prompt by using the third-party API:

Prompt to Video

And, let us understand the prompt that we supplied to create the above video –

Isn’t it exciting?

However, I want to stress this point: the video generated by the Stable Defusion (Stability AI) API was able to partially apply the animation effect. Even though the animation applies to the cloud, It doesn’t apply the animation to the wave. But, I must admit, the quality of the video is quite good.


Let us understand the code and how we run the solution, and then we can try to understand its performance along with the other solutions later in the subsequent series.

As you know, we’re exploring the code base of the third-party API, which will actually execute a series of API calls that create a video out of the prompt.

Let us understand some of the important snippet –

class clsStabilityAIAPI:
    def __init__(self, STABLE_DIFF_API_KEY, OUT_DIR_PATH, FILE_NM, VID_FILE_NM):
        self.STABLE_DIFF_API_KEY = STABLE_DIFF_API_KEY
        self.OUT_DIR_PATH = OUT_DIR_PATH
        self.FILE_NM = FILE_NM
        self.VID_FILE_NM = VID_FILE_NM

    def delFile(self, fileName):
        try:
            # Deleting the intermediate image
            os.remove(fileName)

            return 0 
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

    def generateText2Image(self, inputDescription):
        try:
            STABLE_DIFF_API_KEY = self.STABLE_DIFF_API_KEY
            fullFileName = self.OUT_DIR_PATH + self.FILE_NM
            
            if STABLE_DIFF_API_KEY is None:
                raise Exception("Missing Stability API key.")
            
            response = requests.post(f"{api_host}/v1/generation/{engine_id}/text-to-image",
                                    headers={
                                        "Content-Type": "application/json",
                                        "Accept": "application/json",
                                        "Authorization": f"Bearer {STABLE_DIFF_API_KEY}"
                                        },
                                        json={
                                            "text_prompts": [{"text": inputDescription}],
                                            "cfg_scale": 7,
                                            "height": 1024,
                                            "width": 576,
                                            "samples": 1,
                                            "steps": 30,
                                            },)
            
            if response.status_code != 200:
                raise Exception("Non-200 response: " + str(response.text))
            
            data = response.json()

            for i, image in enumerate(data["artifacts"]):
                with open(fullFileName, "wb") as f:
                    f.write(base64.b64decode(image["base64"]))      
            
            return fullFileName

        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 'N/A'

    def image2VideoPassOne(self, imgNameWithPath):
        try:
            STABLE_DIFF_API_KEY = self.STABLE_DIFF_API_KEY

            response = requests.post(f"https://api.stability.ai/v2beta/image-to-video",
                                    headers={"authorization": f"Bearer {STABLE_DIFF_API_KEY}"},
                                    files={"image": open(imgNameWithPath, "rb")},
                                    data={"seed": 0,"cfg_scale": 1.8,"motion_bucket_id": 127},
                                    )
            
            print('First Pass Response:')
            print(str(response.text))
            
            genID = response.json().get('id')

            return genID 
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 'N/A'

    def image2VideoPassTwo(self, genId):
        try:
            generation_id = genId
            STABLE_DIFF_API_KEY = self.STABLE_DIFF_API_KEY
            fullVideoFileName = self.OUT_DIR_PATH + self.VID_FILE_NM

            response = requests.request("GET", f"https://api.stability.ai/v2beta/image-to-video/result/{generation_id}",
                                        headers={
                                            'accept': "video/*",  # Use 'application/json' to receive base64 encoded JSON
                                            'authorization': f"Bearer {STABLE_DIFF_API_KEY}"
                                            },) 
            
            print('Retrieve Status Code: ', str(response.status_code))
            
            if response.status_code == 202:
                print("Generation in-progress, try again in 10 seconds.")

                return 5
            elif response.status_code == 200:
                print("Generation complete!")
                with open(fullVideoFileName, 'wb') as file:
                    file.write(response.content)

                print("Successfully Retrieved the video file!")

                return 0
            else:
                raise Exception(str(response.json()))
            
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

Now, let us understand the code –

This function is called when an object of the class is created. It initializes four properties:

  • STABLE_DIFF_API_KEY: the API key for Stability AI services.
  • OUT_DIR_PATH: the folder path to save files.
  • FILE_NM: the name of the generated image file.
  • VID_FILE_NM: the name of the generated video file.

This function deletes a file specified by fileName.

  • If successful, it returns 0.
  • If an error occurs, it logs the error and returns 1.

This function generates an image based on a text description:

  • Sends a request to the Stability AI text-to-image endpoint using the API key.
  • Saves the resulting image to a file.
  • Returns the file’s path on success or 'N/A' if an error occurs.

This function uploads an image to create a video in its first phase:

  • Sends the image to Stability AI’s image-to-video endpoint.
  • Logs the response and extracts the id (generation ID) for the next phase.
  • Returns the id if successful or 'N/A' on failure.

This function retrieves the video created in the second phase using the genId:

  • Checks the video generation status from the Stability AI endpoint.
  • If complete, saves the video file and returns 0.
  • If still processing, returns 5.
  • Logs and returns 1 for any errors.

As you can see, the code is pretty simple to understand & we’ve taken all the necessary actions in case of any unforeseen network issues or even if the video is not ready after our job submission in the following lines of the main calling script (generateText2VideoAPI.py) –

waitTime = 10
time.sleep(waitTime)

# Failed case retry
retries = 1
success = False

try:
    while not success:
        try:
            z = r1.image2VideoPassTwo(gID)
        except Exception as e:
            success = False

        if z == 0:
            success = True
        else:
            wait = retries * 2 * 15
            str_R1 = "retries Fail! Waiting " + str(wait) + " seconds and retrying!"

            print(str_R1)

            time.sleep(wait)
            retries += 1

        # Checking maximum retries
        if retries >= maxRetryNo:
            success = True
            raise  Exception
except:
    print()

And, let us see how the run looks like –

Let us understand the CPU utilization –

As you can see, CPU utilization is minimal since most tasks are at the API end.


So, we’ve done it. 🙂

Please find the next series on this topic below:

Enabling & Exploring Stable Defussion – Part 2

Enabling & Exploring Stable Defussion – Part 3

Please let me know your feedback after reviewing all the posts! 🙂

Building solutions using LLM AutoGen in Python – Part 1

Today, I’ll be publishing a series of posts on LLM agents and how they can help you improve your delivery capabilities for various tasks.

Also, we’re providing the demo here –

Isn’t it exciting?


The application will interact with the AutoGen agents, use underlying Open AI APIs to follow the instructions, generate the steps, and then follow that path to generate the desired code. Finally, it will execute the generated scripts if the first outcome of the demo satisfies users.


Let us understand some of the key snippets –

# Create the assistant agent
assistant = autogen.AssistantAgent(
    name="AI_Assistant",
    llm_config={
        "config_list": config_list,
    }
)

Purpose: This line creates an AI assistant agent named “AI_Assistant”.

Function: It uses a language model configuration provided in config_list to define how the assistant behaves.

Role: The assistant serves as the primary agent who will coordinate with other agents to solve problems.

user_proxy = autogen.UserProxyAgent(
    name="Admin",
    system_message=templateVal_1,
    human_input_mode="TERMINATE",
    max_consecutive_auto_reply=10,
    is_termination_msg=lambda x: x.get("content", "").rstrip().endswith("TERMINATE"),
    code_execution_config={
        "work_dir": WORK_DIR,
        "use_docker": False,
    },
)

Purpose: This code creates a user proxy agent named “Admin”.

Function:

  • System Message: Uses templateVal_1 as its initial message to set the context.
  • Human Input Mode: Set to "TERMINATE", meaning it will keep interacting until a termination condition is met.
  • Auto-Reply Limit: Can automatically reply up to 10 times without human intervention.
  • Termination Condition: A message is considered a termination message if it ends with the word “TERMINATE”.
  • Code Execution: Configured to execute code in the directory specified by WORK_DIR without using Docker.

Role: Acts as an intermediary between the user and the assistant, handling interactions and managing the conversation flow.

engineer = autogen.AssistantAgent(
    name="Engineer",
    llm_config={
        "config_list": config_list,
    },
    system_message=templateVal_2,
)

Purpose: Creates an assistant agent named “Engineer”.

Function: Uses templateVal_2 as its system message to define its expertise in engineering matters.

Role: Specializes in technical and engineering aspects of the problem.

game_designer = autogen.AssistantAgent(
    name="GameDesigner",
    llm_config={
        "config_list": config_list,
    },
    system_message=templateVal_3,
)

Purpose: Creates an assistant agent named “GameDesigner”.

Function: Uses templateVal_3 to set its focus on game design.

Role: Provides insights and solutions related to game design aspects.

planner = autogen.AssistantAgent(
    name="Planer",
    llm_config={
        "config_list": config_list,
    },
    system_message=templateVal_4,
)

Purpose: Creates an assistant agent named “Planer” (likely intended to be “Planner”).

Function: Uses templateVal_4 to define its role in planning.

Role: Responsible for organizing and planning tasks to solve the problem.

critic = autogen.AssistantAgent(
    name="Critic",
    llm_config={
        "config_list": config_list,
    },
    system_message=templateVal_5,
)

Purpose: Creates an assistant agent named “Critic”.

Function: Uses templateVal_5 to set its function as a critic.

Role: Provide feedback, critique solutions, and help improve the overall response.

logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)

Purpose: Configures the logging system.

Function: Sets the logging level to only capture error messages to avoid cluttering the output.

Role: Helps in debugging by capturing and displaying error messages.

def buildAndPlay(self, inputPrompt):
    try:
        user_proxy.initiate_chat(
            assistant,
            message=f"We need to solve the following problem: {inputPrompt}. "
                    "Please coordinate with the admin, engineer, game_designer, planner and critic to provide a comprehensive solution. "
        )

        return 0
    except Exception as e:
        x = str(e)
        print('Error: <<Real-time Translation>>: ', x)

        return 1

Purpose: Defines a method to initiate the problem-solving process.

Function:

  • Parameters: Takes inputPrompt, which is the problem to be solved.
  • Action:
    • Calls user_proxy.initiate_chat() to start a conversation between the user proxy agent and the assistant agent.
    • Sends a message requesting coordination among all agents to provide a comprehensive solution to the problem.
  • Error Handling: If an exception occurs, it prints an error message and returns 1.

Role: Initiates collaboration among all agents to solve the provided problem.

Agents Setup: Multiple agents with specialized roles are created.
Initiating Conversation: The buildAndPlay method starts a conversation, asking agents to collaborate.
Problem Solving: Agents communicate and coordinate to provide a comprehensive solution to the input problem.
Error Handling: The system captures and logs any errors that occur during execution.


We’ll continue to discuss this topic in the upcoming post.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 🙂

Building a real-time streamlit app by consuming events from Ably channels

I’ll bring an exciting streamlit app that will reflect the real-time dashboard by consuming all the events from the Ably channel.

One more time, I’ll be utilizing my IoT emulator that will feed the real-time events based on the user inputs to the Ably channel, which will be subscribed to by the Streamlit-based app.

However, I would like to share the run before we dig deep into this.


Demo

Isn’t this exciting? How we can use our custom-built IoT emulator & capture real-time events to Ably Queue, then transform those raw events into more meaningful KPIs? Let’s deep dive then.

Let’s explore the broad-level architecture/flow –

As you can see, the green box is a demo IoT application that generates events & pushes them into the Ably Queue. At the same time, the streamlit-based Dashboard app consumes the events & transforms them into more meaningful metrics.

Let us understand the sample packages that are required for this task.

pip install ably==2.0.3
pip install numpy==1.26.3
pip install pandas==2.2.0
pip install plotly==5.19.0
pip install requests==2.31.0
pip install streamlit==1.30.0
pip install streamlit-autorefresh==1.0.1
pip install streamlit-echarts==0.4.0

Since this is an extension to our previous post, we’re not going to discuss other scripts, which we’ve already discussed over there. Instead, we will talk about the enhanced scripts & the new scripts that are required for this use case.

1. app.py (This script will consume real-time streaming data coming out from a hosted API source using another popular third-party service named Ably. Ably mimics the pub sub-streaming concept, which might be extremely useful for any start-up. This will then translate into many meaningful KPIs in a streamlit-based dashboard app.)

Note that, we’re not going to discuss the entire script here. Only those parts are relevant. However, you can get the complete scripts in the GitHub repository.

def createHumidityGauge(humidity_value):
    fig = go.Figure(go.Indicator(
        mode = "gauge+number",
        value = humidity_value,
        domain = {'x': [0, 1], 'y': [0, 1]},
        title = {'text': "Humidity", 'font': {'size': 24}},
        gauge = {
            'axis': {'range': [None, 100], 'tickwidth': 1, 'tickcolor': "darkblue"},
            'bar': {'color': "darkblue"},
            'bgcolor': "white",
            'borderwidth': 2,
            'bordercolor': "gray",
            'steps': [
                {'range': [0, 50], 'color': 'cyan'},
                {'range': [50, 100], 'color': 'royalblue'}],
            'threshold': {
                'line': {'color': "red", 'width': 4},
                'thickness': 0.75,
                'value': humidity_value}
        }
    ))

    fig.update_layout(height=220, paper_bgcolor = "white", font = {'color': "darkblue", 'family': "Arial"}, margin=dict(t=0, l=5, r=5, b=0))

    return fig

The above function creates a customized humidity gauge that visually represents a given humidity value, making it easy to read and understand at a glance.

This code defines a function createHumidityGauge that creates a visual gauge (like a meter) to display a humidity value. Here’s a simple breakdown of what it does:

  1. Function Definition: It starts by defining a function named createHumidityGauge that takes one parameter, humidity_value, which is the humidity level you want to display on the gauge.
  2. Creating the Gauge: Inside the function, it creates a figure using Plotly (a plotting library) with a specific type of chart called an Indicator. This Indicator is set to display in “gauge+number” mode, meaning it shows both a gauge visual and the numeric value of the humidity.
  3. Setting Gauge Properties:
    • The value is set to the humidity_value parameter, so the gauge shows this humidity level.
    • The domain sets the position of the gauge on the plot, which is set to fill the available space ([0, 1] for both x and y axes).
    • The title is set to “Humidity” with a font size of 24, labeling the gauge.
    • The gauge section defines the appearance and behavior of the gauge, including:
      • An axis that goes from 0 to 100 (assuming humidity is measured as a percentage from 0% to 100%).
      • The color and style of the gauge’s bar and background.
      • Colored steps indicating different ranges of humidity (cyan for 0-50% and royal blue for 50-100%).
      • A threshold line that appears at the value of the humidity, marked in red to stand out.
  4. Finalizing the Gauge Appearance: The function then updates the layout of the figure to set its height, background color, font style, and margins to make sure the gauge looks nice and is visible.
  5. Returning the Figure: Finally, the function returns the fig object, which is the fully configured gauge, ready to be displayed.

Other similar functions will repeat the same steps.

def createTemperatureLineChart(data):
    # Assuming 'data' is a DataFrame with a 'Timestamp' index and a 'Temperature' column
    fig = px.line(data, x=data.index, y='Temperature', title='Temperature Vs Time')
    fig.update_layout(height=270)  # Specify the desired height here
    return fig

The above function takes a set of temperature data indexed by timestamp and creates a line chart that visually represents how the temperature changes over time.

This code defines a function “createTemperatureLineChart” that creates a line chart to display temperature data over time. Here’s a simple summary of what it does:

  1. Function Definition: It starts with defining a function named createTemperatureLineChart that takes one parameter, data, which is expected to be a DataFrame (a type of data structure used in pandas, a Python data analysis library). This data frame should have a ‘Timestamp’ as its index (meaning each row represents a different point in time) and a ‘Temperature’ column containing temperature values.
  2. Creating the Line Chart: The function uses Plotly Express (a plotting library) to create a line chart with the following characteristics:
    • The x-axis represents time, taken from the DataFrame’s index (‘Timestamp’).
    • The y-axis represents temperature, taken from the ‘Temperature’ column in the DataFrame.
    • The chart is titled ‘Temperature Vs Time’, clearly indicating what the chart represents.
  3. Customizing the Chart: It then updates the layout of the chart to set a specific height (270 pixels) for the chart, making it easier to view.
  4. Returning the Chart: Finally, the function returns the fig object, which is the fully prepared line chart, ready to be displayed.

Similar functions will repeat for other KPIs.

    st.sidebar.header("KPIs")
    selected_kpis = st.sidebar.multiselect(
        "Select KPIs", options=["Temperature", "Humidity", "Pressure"], default=["Temperature"]
    )

The above code will create a sidebar with drop-down lists, which will show the KPIs (“Temperature”, “Humidity”, “Pressure”).

# Split the layout into columns for KPIs and graphs
    gauge_col, kpi_col, graph_col = st.columns(3)

    # Auto-refresh setup
    st_autorefresh(interval=7000, key='data_refresh')

    # Fetching real-time data
    data = getData(var1, DInd)

    st.markdown(
        """
        <style>
        .stEcharts { margin-bottom: -50px; }  /* Class might differ, inspect the HTML to find the correct class name */
        </style>
        """,
        unsafe_allow_html=True
    )

    # Display gauges at the top of the page
    gauges = st.container()

    with gauges:
        col1, col2, col3 = st.columns(3)
        with col1:
            humidity_value = round(data['Humidity'].iloc[-1], 2)
            humidity_gauge_fig = createHumidityGauge(humidity_value)
            st.plotly_chart(humidity_gauge_fig, use_container_width=True)

        with col2:
            temp_value = round(data['Temperature'].iloc[-1], 2)
            temp_gauge_fig = createTempGauge(temp_value)
            st.plotly_chart(temp_gauge_fig, use_container_width=True)

        with col3:
            pressure_value = round(data['Pressure'].iloc[-1], 2)
            pressure_gauge_fig = createPressureGauge(pressure_value)
            st.plotly_chart(pressure_gauge_fig, use_container_width=True)


    # Next row for actual readings and charts side-by-side
    readings_charts = st.container()


    # Display KPIs and their trends
    with readings_charts:
        readings_col, graph_col = st.columns([1, 2])

        with readings_col:
            st.subheader("Latest Readings")
            if "Temperature" in selected_kpis:
                st.metric("Temperature", f"{temp_value:.2f}%")

            if "Humidity" in selected_kpis:
                st.metric("Humidity", f"{humidity_value:.2f}%")

            if "Pressure" in selected_kpis:
                st.metric("Pressure", f"{pressure_value:.2f}%")


        # Graph placeholders for each KPI
        with graph_col:
            if "Temperature" in selected_kpis:
                temperature_fig = createTemperatureLineChart(data.set_index("Timestamp"))

                # Display the Plotly chart in Streamlit with specified dimensions
                st.plotly_chart(temperature_fig, use_container_width=True)

            if "Humidity" in selected_kpis:
                humidity_fig = createHumidityLineChart(data.set_index("Timestamp"))

                # Display the Plotly chart in Streamlit with specified dimensions
                st.plotly_chart(humidity_fig, use_container_width=True)

            if "Pressure" in selected_kpis:
                pressure_fig = createPressureLineChart(data.set_index("Timestamp"))

                # Display the Plotly chart in Streamlit with specified dimensions
                st.plotly_chart(pressure_fig, use_container_width=True)
  1. The code begins by splitting the Streamlit web page layout into three columns to separately display Key Performance Indicators (KPIs), gauges, and graphs.
  2. It sets up an auto-refresh feature with a 7-second interval, ensuring the data displayed is regularly updated without manual refreshes.
  3. Real-time data is fetched using a function called getData, which takes unspecified parameters var1 and DInd.
  4. A CSS style is injected into the Streamlit page to adjust the margin of Echarts elements, which may be used to improve the visual layout of the page.
  5. A container for gauges is created at the top of the page, with three columns inside it dedicated to displaying humidity, temperature, and pressure gauges.
  6. Each gauge (humidity, temperature, and pressure) is created by rounding the last value from the fetched data to two decimal places and then visualized using respective functions that create Plotly gauge charts.
  7. Below the gauges, another container is set up for displaying the latest readings and their corresponding graphs in a side-by-side layout, using two columns.
  8. The left column under “Latest Readings” displays the latest values for selected KPIs (temperature, humidity, pressure) as metrics.
  9. In the right column, for each selected KPI, a line chart is created using data with timestamps as indices and displayed using Plotly charts, allowing for a visual trend analysis.
  10. This structured approach enables a dynamic and interactive dashboard within Streamlit, offering real-time insights into temperature, humidity, and pressure with both numeric metrics and graphical trends, optimized for regular data refreshes and user interactivity.

Let us understand some of the important screenshots of this application –


So, we’ve done it.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 🙂

Text2SQL Data Extractor (T2SDE) using Python & Open AI LLM

Today, I will share a new post that will contextualize the source files & then read the data into the pandas data frame, and then dynamically create the SQL & execute it. Then, fetch the data from the sources based on the query generated dynamically. This project is for the advanced Python developer and data Science Newbie.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.

Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.


The application will take the metadata captured from source data dynamically. It blends the metadata and enhances the prompt to pass to the Flask server. The Flask server has all the limits of contexts.

Once the application receives the correct generated SQL, it will then apply the SQL using the SQLAlchemy package to get the desired results.

The following are the important packages that are essential to this project –

pip install openai==1.6.1
pip install pandas==2.1.4
pip install Flask==3.0.0
pip install SQLAlchemy==2.0.23

We’ll have both the server and the main application. Today, we’ll be going in reverse mode. We first discuss the main script & then explain all the other class scripts.

  • 1_invokeSQLServer.py (This is the main calling Python script to invoke the OpenAI-Server.)

Please find some of the key snippet from this discussion –

@app.route('/message', methods=['POST'])
def message():
    input_text = request.json.get('input_text', None)
    session_id = request.json.get('session_id', None)

    print('*' * 240)
    print('User Input:')
    print(str(input_text))
    print('*' * 240)

    # Retrieve conversation history from the session or database
    conversation_history = session.get(session_id, [])

    # Add the new message to the conversation history
    conversation_history.append(input_text)

    # Call OpenAI API with the updated conversation
    response = client.with_options(max_retries=0).chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": input_text,
            }
        ],
        model=cf.conf['MODEL_NAME'],
    )

    # Extract the content from the first choice's message
    chat_response = response.choices[0].message.content
    print('*' * 240)
    print('Resposne::')
    print(chat_response)
    print('*' * 240)

    conversation_history.append(chat_response)

    # Store the updated conversation history in the session or database
    session[session_id] = conversation_history

    return chat_response

This code defines a web application route that handles POST requests sent to the /message endpoint:

  1. Route Declaration: The @app.route('/message', methods=['POST']) part specifies that the function message() is executed when the server receives a POST request at the /message URL.
  2. Function Definition: Inside the message() function:
    • It retrieves two pieces of data from the request’s JSON body: input_text (the user’s input message) and session_id (a unique identifier for the user’s session).
    • It prints the user’s input message, surrounded by lines of asterisks for emphasis.
  3. Conversation History Management:
    • The code retrieves the conversation history associated with the given session_id. This history is a list of messages.
    • It then adds the new user message (input_text) to this conversation history.
  4. OpenAI API Call:
    • The function makes a call to the OpenAI API, passing the user’s message. It specifies not to retry the request if it fails (max_retries=0).
    • The model used for the OpenAI API call is taken from some configurations (cf.conf['MODEL_NAME']).
  5. Processing API Response:
    • The response from the OpenAI API is processed to extract the content of the chat response.
    • This chat response is printed.
  6. Updating Conversation History:
    • The chat response is added to the conversation history.
    • The updated conversation history is then stored back in the session or database, associated with the session_id.
  7. Returning the Response: Finally, the function returns the chat response.

  • clsDynamicSQLProcess.py (This Python class generates the SQL & then executes the flask server to invoke the OpenAI-Server.)

Now, let us understand the few important piece of snippet –

def text2SQLBegin(self, DBFileNameList, fileDBPath, srcQueryPrompt, joinCond, debugInd='N'):

        question = srcQueryPrompt
        create_table_statement = ''
        jStr = ''

        print('DBFileNameList::', DBFileNameList)
        print('prevSessionDBFileNameList::', self.prevSessionDBFileNameList)

        if set(self.prevSessionDBFileNameList) == set(DBFileNameList):
            self.flag = 'Y'
        else:
            self.flag = 'N'

        if self.flag == 'N':

            for i in DBFileNameList:
                DBFileName = i

                FullDBname = fileDBPath + DBFileName
                print('File: ', str(FullDBname))

                tabName, _ = DBFileName.split('.')

                # Reading the source data
                df = pd.read_csv(FullDBname)

                # Convert all string columns to lowercase
                df = df.apply(lambda x: x.str.lower() if x.dtype == "object" else x)

                # Convert DataFrame to SQL table
                df.to_sql(tabName, con=engine, index=False)

                # Create a MetaData object and reflect the existing database
                metadata = MetaData()
                metadata.reflect(bind=engine)

                # Access the 'users' table from the reflected metadata
                table = metadata.tables[tabName]

                # Generate the CREATE TABLE statement
                create_table_statement = create_table_statement + str(CreateTable(table)) + '; \n'

                tabName = ''

            for joinS in joinCond:
                jStr = jStr + joinS + '\n'

            self.prevSessionDBFileNameList = DBFileNameList
            self.prev_create_table_statement = create_table_statement

            masterSessionDBFileNameList = self.prevSessionDBFileNameList
            mast_create_table_statement = self.prev_create_table_statement

        else:
            masterSessionDBFileNameList = self.prevSessionDBFileNameList
            mast_create_table_statement = self.prev_create_table_statement

        inputPrompt = (templateVal_1 + mast_create_table_statement + jStr + templateVal_2).format(question=question)

        if debugInd == 'Y':
            print('INPUT PROMPT::')
            print(inputPrompt)

        print('*' * 240)
        print('Find the Generated SQL:')
        print()

        DBFileNameList = []
        create_table_statement = ''

        return inputPrompt
  1. Function Overview: The text2SQLBegin function processes a list of database file names (DBFileNameList), a file path (fileDBPath), a query prompt (srcQueryPrompt), join conditions (joinCond), and a debug indicator (debugInd) to generate SQL commands.
  2. Initial Setup: It starts by initializing variables for the question, the SQL table creation statement, and a string for join conditions.
  3. Debug Prints: The function prints the current and previous session database file names for debugging purposes.
  4. Flag Setting: A flag is set to ‘Y’ if the current session’s database file names match the previous session’s; otherwise, it’s set to ‘N’.
  5. Processing New Session Data: If the flag is ‘N’, indicating new session data:
    • For each database file, it reads the data, converts string columns to lowercase, and creates a corresponding SQL table in a database using the pandas library.
    • Metadata is generated for each table and a CREATE TABLE SQL statement is created.
  6. Join Conditions and Statement Aggregation: Join conditions are concatenated, and previous session information is updated with the current session’s data.
  7. Handling Repeated Sessions: If the session data is repeated (flag is ‘Y’), it uses the previous session’s SQL table creation statements and database file names.
  8. Final Input Prompt Creation: It constructs the final input prompt by combining template values with the create table statement, join conditions, and the original question.
  9. Debug Printing: If debug mode is enabled, it prints the final input prompt.
  10. Conclusion: The function clears the DBFileNameList and create_table_statement variables, and returns the constructed input prompt.
  def text2SQLEnd(self, srcContext, debugInd='N'):
      url = self.url

      payload = json.dumps({"input_text": srcContext,"session_id": ""})
      headers = {'Content-Type': 'application/json', 'Cookie': cf.conf['HEADER_TOKEN']}

      response = requests.request("POST", url, headers=headers, data=payload)

      return response.text

The text2SQLEnd function sends an HTTP POST request to a specified URL and returns the response. It takes two parameters: srcContext which contains the input text, and an optional debugInd for debugging purposes. The function constructs the request payload by converting the input text and an empty session ID to JSON format. It sets the request headers, including a content type of ‘application/json’ and a token from the configuration file. The function then sends the POST request using the requests library and returns the text content of the response.

  def sql2Data(self, srcSQL):
      # Executing the query on top of your data
      resultSQL = pd.read_sql_query(srcSQL, con=engine)

      return resultSQL

The sql2Data function is designed to execute a SQL query on a database and return the result. It takes a single parameter, srcSQL, which contains the SQL query to be executed. The function uses the pandas library to run the provided SQL query (srcSQL) against a database connection (engine). It then returns the result of this query, which is typically a DataFrame object containing the data retrieved from the database.

def genData(self, srcQueryPrompt, fileDBPath, DBFileNameList, joinCond, debugInd='N'):
    try:
        authorName = self.authorName
        website = self.website
        var = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

        print('*' * 240)
        print('SQL Start Time: ' + str(var))
        print('*' * 240)

        print('*' * 240)
        print()

        if debugInd == 'Y':
            print('Author Name: ', authorName)
            print('For more information, please visit the following Website: ', website)
            print()

            print('*' * 240)
        print('Your Data for Retrieval:')
        print('*' * 240)

        if debugInd == 'Y':

            print()
            print('Converted File to Dataframe Sample:')
            print()

        else:
            print()

        context = self.text2SQLBegin(DBFileNameList, fileDBPath, srcQueryPrompt, joinCond, debugInd)
        srcSQL = self.text2SQLEnd(context, debugInd)

        print(srcSQL)
        print('*' * 240)
        print()
        resDF = self.sql2Data(srcSQL)

        print('*' * 240)
        print('SQL End Time: ' + str(var))
        print('*' * 240)

        return resDF

    except Exception as e:
        x = str(e)
        print('Error: ', x)

        df = pd.DataFrame()

        return df
  1. Initialization and Debug Information: The function begins by initializing variables like authorName, website, and a timestamp (var). It then prints the start time of the SQL process. If the debug indicator (debugInd) is ‘Y’, it prints additional information like the author’s name and website.
  2. Generating SQL Context and Query: The function calls text2SQLBegin with various parameters (file paths, database file names, query prompt, join conditions, and the debug indicator) to generate an SQL context. Then it calls text2SQLEnd with this context and the debug indicator to generate the actual SQL query.
  3. Executing the SQL Query: It prints the generated SQL query for visibility, especially in debug mode. The query is then executed by calling sql2Data, which returns the result as a data frame (resDF).
  4. Finalization and Error Handling: After executing the query, it prints the SQL end time. In case of any exceptions during the process, it catches the error, prints it, and returns an empty DataFrame.
  5. Return Value: The function returns the DataFrame (resDF) containing the results of the executed SQL query. If an error occurs, it returns an empty DataFrame instead.

Let us explore the directory structure starting from the parent to some of the important child folder should look like this –

Let us understand the important screenshots of this entire process –


So, finally, we’ve done it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

RAG implementation of LLMs by using Python, Haystack & React (Part – 2)

Today, we’ll share the second installment of the RAG implementation. If you are new here, please visit the previous post for full context.

In this post, we’ll be discussing the Haystack framework more. Again, before discussing the main context, I want to present the demo here.

Demo

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process, where today, we’ll pay our primary attention.

As you can see today, we’ll discuss the red dotted line, which contextualizes the source data into the Vector DBs.

Let us understand the flow of events here –

  1. The main Python application will consume the nested JSON by invoking the museum API in multiple threads.
  2. The application will clean the nested data & extract the relevant attributes after flattening the JSON.
  3. It will create the unstructured text-based context, which is later fed to the Vector DB framework.

pip install farm-haystack==1.19.0
pip install Flask==2.2.5
pip install Flask-Cors==4.0.0
pip install Flask-JWT-Extended==4.5.2
pip install Flask-Session==0.5.0
pip install openai==0.27.8
pip install pandas==2.0.3
pip install tensorflow==2.11.1

We’re using the Metropolitan Museum API to feed the data to our Vector DB. For more information, please visit the following link. And this is free to use & moreover, we’re using it for education scenarios.


We’ll discuss the tokenization part highlighted in a red dotted line from the above picture.

We’ll discuss the scripts in the diagram as part of the flow mentioned above.

  • clsExtractJSON.py (This is the main class that will extract the content from the museum API using parallel calls.)
def genData(self):
    try:
        base_url = self.base_url
        header_token = self.header_token
        basePath = self.basePath
        outputPath = self.outputPath
        mergedFile = self.mergedFile
        subdir = self.subdir
        Ind = self.Ind
        var_1 = datetime.now().strftime("%H.%M.%S")


        devVal = list()
        objVal = list()

        # Main Details
        headers = {'Cookie':header_token}
        payload={}

        url = base_url + '/departments'

        date_ranges = self.generateFirstDayOfLastTenYears()

        # Getting all the departments
        try:
            print('Department URL:')
            print(str(url))

            response = requests.request("GET", url, headers=headers, data=payload)
            parsed_data = json.loads(response.text)

            print('Department JSON:')
            print(str(parsed_data))

            # Extract the "departmentId" values into a Python list
            for dept_det in parsed_data['departments']:
                for info in dept_det:
                    if info == 'departmentId':
                        devVal.append(dept_det[info])

        except Exception as e:
            x = str(e)
            print('Error: ', x)
            devVal = list()

        # List to hold thread objects
        threads = []

        # Calling the Data using threads
        for dep in devVal:
            t = threading.Thread(target=self.getDataThread, args=(dep, base_url, headers, payload, date_ranges, objVal, subdir, Ind,))
            threads.append(t)
            t.start()

        # Wait for all threads to complete
        for t in threads:
            t.join()

        res = self.mergeCsvFilesInDirectory(basePath, outputPath, mergedFile)

        if res == 0:
            print('Successful!')
        else:
            print('Failure!')

        return 0

    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

The above code translates into the following steps –

  1. The above method first calls the generateFirstDayOfLastTenYears() plan to populate records for every department after getting all the unique departments by calling another API.
  2. Then, it will call the getDataThread() methods to fetch all the relevant APIs simultaneously to reduce the overall wait time & create individual smaller files.
  3. Finally, the application will invoke the mergeCsvFilesInDirectory() method to merge all the chunk files into one extensive historical data.
def generateFirstDayOfLastTenYears(self):
    yearRange = self.yearRange
    date_format = "%Y-%m-%d"
    current_year = datetime.now().year

    date_ranges = []
    for year in range(current_year - yearRange, current_year + 1):
        first_day_of_year_full = datetime(year, 1, 1)
        first_day_of_year = first_day_of_year_full.strftime(date_format)
        date_ranges.append(first_day_of_year)

    return date_ranges

The first method will generate the first day of each year for the last ten years, including the current year.

def getDataThread(self, dep, base_url, headers, payload, date_ranges, objVal, subdir, Ind):
    try:
        cnt = 0
        cnt_x = 1
        var_1 = datetime.now().strftime("%H.%M.%S")

        for x_start_date in date_ranges:
            try:
                urlM = base_url + '/objects?metadataDate=' + str(x_start_date) + '&departmentIds=' + str(dep)

                print('Nested URL:')
                print(str(urlM))

                response_obj = requests.request("GET", urlM, headers=headers, data=payload)
                objectDets = json.loads(response_obj.text)

                for obj_det in objectDets['objectIDs']:
                    objVal.append(obj_det)

                for objId in objVal:
                    urlS = base_url + '/objects/' + str(objId)

                    print('Final URL:')
                    print(str(urlS))

                    response_det = requests.request("GET", urlS, headers=headers, data=payload)
                    objDetJSON = response_det.text

                    retDB = self.createData(objDetJSON)
                    retDB['departmentId'] = str(dep)

                    if cnt == 0:
                        df_M = retDB
                    else:
                        d_frames = [df_M, retDB]
                        df_M = pd.concat(d_frames)

                    if cnt == 1000:
                        cnt = 0
                        clog.logr('df_M_' + var_1 + '_' + str(cnt_x) + '_' + str(dep) +'.csv', Ind, df_M, subdir)
                        cnt_x += 1
                        df_M = pd.DataFrame()

                    cnt += 1

            except Exception as e:
                x = str(e)
                print('Error X:', x)
        return 0

    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

The above method will invoke the individual API call to fetch the relevant artifact information.

def mergeCsvFilesInDirectory(self, directory_path, output_path, output_file):
    try:
        csv_files = [file for file in os.listdir(directory_path) if file.endswith('.csv')]
        data_frames = []

        for file in csv_files:
            encodings_to_try = ['utf-8', 'utf-8-sig', 'latin-1', 'cp1252']
            for encoding in encodings_to_try:
                try:
                    FullFileName = directory_path + file
                    print('File Name: ', FullFileName)
                    df = pd.read_csv(FullFileName, encoding=encoding)
                    data_frames.append(df)
                    break  # Stop trying other encodings if the reading is successful
                except UnicodeDecodeError:
                    continue

        if not data_frames:
            raise Exception("Unable to read CSV files. Check encoding or file format.")

        merged_df = pd.concat(data_frames, ignore_index=True)

        merged_full_name = os.path.join(output_path, output_file)
        merged_df.to_csv(merged_full_name, index=False)

        for file in csv_files:
            os.remove(os.path.join(directory_path, file))

        return 0

    except Exception as e:
        x = str(e)
        print('Error: ', x)
        return 1

The above method will merge all the small files into a single, more extensive historical data that contains over ten years of data (the first day of ten years of data, to be precise).

For the complete code, please visit the GitHub.

  • 1_ReadMuseumJSON.py (This is the main class that will invoke the class, which will extract the content from the museum API using parallel calls.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### shortcut application created inside MAC         ####
#### enviornment including MacBook, IPad or IPhone.  ####
####                                                 ####
#########################################################
import datetime
from clsConfigClient import clsConfigClient as cf

import clsExtractJSON as cej

########################################################
################    Global Area   ######################
########################################################

cJSON = cej.clsExtractJSON()

basePath = cf.conf['DATA_PATH']
outputPath = cf.conf['OUTPUT_PATH']
mergedFile = cf.conf['MERGED_FILE']

########################################################
################  End Of Global Area   #################
########################################################

# Disbling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

def main():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        r1 = cJSON.genData()

        if r1 == 0:
            print()
            print('Successfully Scrapped!')
        else:
            print()
            print('Failed to Scrappe!')

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == '__main__':
    main()

The above script calls the main class after instantiating the class.

  • clsCreateList.py (This is the main class that will extract the relevant attributes from the historical files & then create the right input text to create the documents for contextualize into the Vector DB framework.)
def createRec(self):
    try:
        basePath = self.basePath
        fileName = self.fileName
        Ind = self.Ind
        subdir = self.subdir
        base_url = self.base_url
        outputPath = self.outputPath
        mergedFile = self.mergedFile
        cleanedFile = self.cleanedFile

        FullFileName = outputPath + mergedFile

        df = pd.read_csv(FullFileName)
        df2 = df[listCol]
        dfFin = df2.drop_duplicates().reset_index(drop=True)

        dfFin['artist_URL'] = dfFin['artistWikidata_URL'].combine_first(dfFin['artistULAN_URL'])
        dfFin['object_URL'] = dfFin['objectURL'].combine_first(dfFin['objectWikidata_URL'])
        dfFin['Wiki_URL'] = dfFin['Wikidata_URL'].combine_first(dfFin['AAT_URL']).combine_first(dfFin['URL']).combine_first(dfFin['object_URL'])

        # Dropping the old Dtype Columns
        dfFin.drop(['artistWikidata_URL'], axis=1, inplace=True)
        dfFin.drop(['artistULAN_URL'], axis=1, inplace=True)
        dfFin.drop(['objectURL'], axis=1, inplace=True)
        dfFin.drop(['objectWikidata_URL'], axis=1, inplace=True)
        dfFin.drop(['AAT_URL'], axis=1, inplace=True)
        dfFin.drop(['Wikidata_URL'], axis=1, inplace=True)
        dfFin.drop(['URL'], axis=1, inplace=True)

        # Save the filtered DataFrame to a new CSV file
        #clog.logr(cleanedFile, Ind, dfFin, subdir)
        res = self.addHash(dfFin)

        if res == 0:
            print('Added Hash!')
        else:
            print('Failed to add hash!')

        # Generate the text for each row in the dataframe
        for _, row in dfFin.iterrows():
            x = self.genPrompt(row)
            self.addDocument(x, cleanedFile)

        return documents

    except Exception as e:
        x = str(e)
        print('Record Error: ', x)

        return documents

The above code will read the data from the extensive historical file created from the earlier steps & then it will clean the file by removing all the duplicate records (if any) & finally, it will create three unique URLs that constitute artist, object & wiki.

Also, this application will remove the hyperlink with a specific hash value, which will feed into the vector DB. Vector DB could be better with the URLs. Hence, we will store the URLs in a separate file by storing the associate hash value & later, we’ll fetch it in a lookup from the open AI response.

Then, this application will generate prompts dynamically & finally create the documents for later steps of vector DB consumption by invoking the addDocument() methods.

For more details, please visit the GitHub link.

  • 1_1_testCreateRec.py (This is the main class that will call the above class.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### shortcut application created inside MAC         ####
#### enviornment including MacBook, IPad or IPhone.  ####
####                                                 ####
#########################################################

from clsConfigClient import clsConfigClient as cf
import clsL as log
import clsCreateList as ccl

from datetime import datetime, timedelta

# Disbling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

###############################################
###           Global Section                ###
###############################################

#Initiating Logging Instances
clog = log.clsL()
cl = ccl.clsCreateList()

var = datetime.now().strftime(".%H.%M.%S")

documents = []

###############################################
###    End of Global Section                ###
###############################################
def main():
    try:
        var = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        print('*'*240)
        print('Creating Index store:: ')
        print('*'*240)

        documents = cl.createRec()

        print('Inserted Sample Records: ')
        print(str(documents))
        print('\n')

        r1 = len(documents)

        if r1 > 0:
            print()
            print('Successfully Indexed sample records!')
        else:
            print()
            print('Failed to sample Indexed recrods!')

        print('*'*120)
        var1 = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == '__main__':
    main()

The above script invokes the main class after instantiating it & invokes the createRec() methods to tokenize the data into the vector DB.

This above test script will be used to test the above clsCreateList class. However, the class will be used inside another class.

– Satyaki
  • clsFeedVectorDB.py (This is the main class that will feed the documents into the vector DB.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Sep-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### haystack frameowrk to contextulioze the docs    ####
#### inside the vector DB.                           ####
####                                                 ####
#########################################################

from haystack.document_stores.faiss import FAISSDocumentStore
from haystack.nodes import DensePassageRetriever
import openai
import pandas as pd
import os
import clsCreateList as ccl

from clsConfigClient import clsConfigClient as cf
import clsL as log

from datetime import datetime, timedelta

# Disbling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

###############################################
###           Global Section                ###
###############################################

Ind = cf.conf['DEBUG_IND']
openAIKey = cf.conf['OPEN_AI_KEY']

os.environ["TOKENIZERS_PARALLELISM"] = "false"

#Initiating Logging Instances
clog = log.clsL()
cl = ccl.clsCreateList()

var = datetime.now().strftime(".%H.%M.%S")

# Encode your data to create embeddings
documents = []

var_1 = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*'*120)
print('Start Time: ' + str(var_1))
print('*'*120)

print('*'*240)
print('Creating Index store:: ')
print('*'*240)

documents = cl.createRec()

print('Inserted Sample Records: ')
print(documents[:5])
print('\n')
print('Type:')
print(type(documents))

r1 = len(documents)

if r1 > 0:
    print()
    print('Successfully Indexed records!')
else:
    print()
    print('Failed to Indexed recrods!')

print('*'*120)
var_2 = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var_2))

# Passing OpenAI API Key
openai.api_key = openAIKey

###############################################
###    End of Global Section                ###
###############################################

class clsFeedVectorDB:
    def __init__(self):
        self.basePath = cf.conf['DATA_PATH']
        self.modelFileName = cf.conf['CACHE_FILE']
        self.vectorDBPath = cf.conf['VECTORDB_PATH']
        self.vectorDBFileName = cf.conf['VECTORDB_FILE_NM']
        self.queryModel = cf.conf['QUERY_MODEL']
        self.passageModel = cf.conf['PASSAGE_MODEL']

    def retrieveDocuments(self, question, retriever, top_k=3):
        return retriever.retrieve(question, top_k=top_k)

    def generateAnswerWithGPT3(self, retrievedDocs, question):
        documents_text = " ".join([doc.content for doc in retrievedDocs])
        prompt = f"Given the following documents: {documents_text}, answer the question: {question}"

        response = openai.Completion.create(
            model="text-davinci-003",
            prompt=prompt,
            max_tokens=150
        )
        return response.choices[0].text.strip()

    def ragAnswerWithHaystackAndGPT3(self, question, retriever):
        retrievedDocs = self.retrieveDocuments(question, retriever)
        return self.generateAnswerWithGPT3(retrievedDocs, question)

    def genData(self, strVal):
        try:
            basePath = self.basePath
            modelFileName = self.modelFileName
            vectorDBPath = self.vectorDBPath
            vectorDBFileName = self.vectorDBFileName
            queryModel = self.queryModel
            passageModel = self.passageModel

            print('*'*120)
            print('Index Your Data for Retrieval:')
            print('*'*120)

            FullFileName = basePath + modelFileName
            FullVectorDBname = vectorDBPath + vectorDBFileName

            sqlite_path = "sqlite:///" + FullVectorDBname + '.db'
            print('Vector DB Path: ', str(sqlite_path))

            indexFile = "vectorDB/" + str(vectorDBFileName) + '.faiss'
            indexConfig = "vectorDB/" + str(vectorDBFileName) + ".json"

            print('File: ', str(indexFile))
            print('Config: ', str(indexConfig))

            # Initialize DocumentStore
            document_store = FAISSDocumentStore(sql_url=sqlite_path)

            libName = "vectorDB/" + str(vectorDBFileName) + '.faiss'

            document_store.write_documents(documents)

            # Initialize Retriever
            retriever = DensePassageRetriever(document_store=document_store,
                                              query_embedding_model=queryModel,
                                              passage_embedding_model=passageModel,
                                              use_gpu=False)

            document_store.update_embeddings(retriever=retriever)

            document_store.save(index_path=libName, config_path="vectorDB/" + str(vectorDBFileName) + ".json")

            print('*'*120)
            print('Testing with RAG & OpenAI...')
            print('*'*120)

            answer = self.ragAnswerWithHaystackAndGPT3(strVal, retriever)

            print('*'*120)
            print('Testing Answer:: ')
            print(answer)
            print('*'*120)

            return 0

        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

In the above script, the following essential steps took place –

  1. First, the application calls the clsCreateList class to store all the documents inside a dictionary.
  2. Then it stores the data inside the vector DB & creates & stores the model, which will be later reused (If you remember, we’ve used this as a model in our previous post).
  3. Finally, test with some sample use cases by providing the proper context to OpenAI & confirm the response.

Here is a short clip of how the RAG models contextualize with the source data.

RAG-Model Contextualization

So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

RAG implementation of LLMs by using Python, Haystack & React (Part – 1)

Today, I will share a new post in a part series about creating end-end LLMs that feed source data with RAG implementation. I’ll also use OpenAI python-based SDK and Haystack embeddings in this case.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.

Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

As you can see, to enable this large & complex solution, we must first establish the capabilities to build applications powered by LLMs, Transformer models, vector search, and more. You can use state-of-the-art NLP models to perform question-answering, answer generation, semantic document search, or build tools capable of complex decision-making and query resolution. Hence, steps no. 1 & 2 showcased the data embedding & creating that informed repository. We’ll be discussing that in our second part.

Once you have the informed repository, the system can interact with the end-users. As part of the query (shown in step 3), the prompt & the question are shared with the process engine, which then turned to reduce the volume & get relevant context from our informed repository & get the tuned context as part of the response (Shown in steps 4, 5 & 6).

Then, this tuned context is shared with the OpenAI for better response & summary & concluding remarks that are very user-friendly & easier to understand for end-users (Shown in steps 8 & 9).

The following are the important packages that are essential to this project –

pip install farm-haystack==1.19.0
pip install Flask==2.2.5
pip install Flask-Cors==4.0.0
pip install Flask-JWT-Extended==4.5.2
pip install Flask-Session==0.5.0
pip install openai==0.27.8
pip install pandas==2.0.3
pip install tensorflow==2.11.1

We’ve both the front-end using react & back-end APIs with Python-flask and the Open AI to create this experience.

Today, we’ll be going in reverse mode. We first discuss the main script & then explain all the other class scripts.

  • flaskServer.py (This is the main calling Python script to invoke the RAG-Server.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### shortcut application created inside MAC         ####
#### enviornment including MacBook, IPad or IPhone.  ####
####                                                 ####
#########################################################

from flask import Flask, jsonify, request, session
from flask_cors import CORS
from werkzeug.security import check_password_hash, generate_password_hash
from flask_jwt_extended import JWTManager, jwt_required, create_access_token
import pandas as pd
from clsConfigClient import clsConfigClient as cf
import clsL as log
import clsContentScrapper as csc
import clsRAGOpenAI as crao
import csv
from datetime import timedelta
import os
import re
import json

########################################################
################    Global Area   ######################
########################################################
#Initiating Logging Instances
clog = log.clsL()

admin_key = cf.conf['ADMIN_KEY']
secret_key = cf.conf['SECRET_KEY']
session_path = cf.conf['SESSION_PATH']
sessionFile = cf.conf['SESSION_CACHE_FILE']

app = Flask(__name__)
CORS(app)  # This will enable CORS for all routes
app.config['JWT_SECRET_KEY'] = admin_key  # Change this!
app.secret_key = secret_key

jwt = JWTManager(app)

users = cf.conf['USER_NM']
passwd = cf.conf['USER_PWD']

cCScrapper = csc.clsContentScrapper()
cr = crao.clsRAGOpenAI()

# Disbling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Define the aggregation functions
def join_unique(series):
    unique_vals = series.drop_duplicates().astype(str)
    return ', '.join(filter(lambda x: x != 'nan', unique_vals))

# Building the preaggregate cache
def groupImageWiki():
    try:
        base_path = cf.conf['OUTPUT_PATH']
        inputFile = cf.conf['CLEANED_FILE']
        outputFile = cf.conf['CLEANED_FILE_SHORT']
        subdir = cf.conf['SUBDIR_OUT']
        Ind = cf.conf['DEBUG_IND']

        inputCleanedFileLookUp = base_path + inputFile

        #Opening the file in dataframe
        df = pd.read_csv(inputCleanedFileLookUp)
        hash_values = df['Total_Hash'].unique()

        dFin = df[['primaryImage','Wiki_URL','Total_Hash']]

        # Ensure columns are strings and not NaN
        # Convert columns to string and replace 'nan' with an empty string
        dFin['primaryImage'] = dFin['primaryImage'].astype(str).replace('nan', '')
        dFin['Wiki_URL'] = dFin['Wiki_URL'].astype(str).replace('nan', '')

        dFin.drop_duplicates()

        # Group by 'Total_Hash' and aggregate
        dfAgg = dFin.groupby('Total_Hash').agg({'primaryImage': join_unique,'Wiki_URL': join_unique}).reset_index()

        return dfAgg

    except Exception as e:
        x = str(e)
        print('Error: ', x)

        df = pd.DataFrame()

        return df

resDf = groupImageWiki()

########################################################
################  End  Global Area  ####################
########################################################

def extractRemoveUrls(hash_value):
    image_urls = ''
    wiki_urls = ''
    # Parse the inner message JSON string
    try:

        resDf['Total_Hash'] = resDf['Total_Hash'].astype(int)
        filtered_df = resDf[resDf['Total_Hash'] == int(hash_value)]

        if not filtered_df.empty:
            image_urls = filtered_df['primaryImage'].values[0]
            wiki_urls = filtered_df['Wiki_URL'].values[0]

        return image_urls, wiki_urls

    except Exception as e:
        x = str(e)
        print('extractRemoveUrls Error: ', x)
        return image_urls, wiki_urls

def isIncomplete(line):
    """Check if a line appears to be incomplete."""

    # Check if the line ends with certain patterns indicating it might be incomplete.
    incomplete_patterns = [': [Link](', ': Approximately ', ': ']
    return any(line.endswith(pattern) for pattern in incomplete_patterns)

def filterData(data):
    """Return only the complete lines from the data."""

    lines = data.split('\n')
    complete_lines = [line for line in lines if not isIncomplete(line)]

    return '\n'.join(complete_lines)

def updateCounter(sessionFile):
    try:
        counter = 0

        # Check if the CSV file exists
        if os.path.exists(sessionFile):
            with open(sessionFile, 'r') as f:
                reader = csv.reader(f)
                for row in reader:
                    # Assuming the counter is the first value in the CSV
                    counter = int(row[0])

        # Increment counter
        counter += 1

        # Write counter back to CSV
        with open(sessionFile, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([counter])

        return counter
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

def getPreviousResult():
    try:
        fullFileName = session_path + sessionFile
        newCounterValue = updateCounter(fullFileName)

        return newCounterValue
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

@app.route('/login', methods=['POST'])
def login():
    username = request.json.get('username', None)
    password = request.json.get('password', None)

    print('User Name: ', str(username))
    print('Password: ', str(password))

    #if username not in users or not check_password_hash(users.get(username), password):
    if ((username not in users) or (password not in passwd)):
        return jsonify({'login': False}), 401

    access_token = create_access_token(identity=username)
    return jsonify(access_token=access_token)

@app.route('/chat', methods=['POST'])
def get_chat():
    try:
        #session["key"] = "1D98KI"
        #session_id = session.sid
        #print('Session Id: ', str(session_id))

        cnt = getPreviousResult()
        print('Running Session Count: ', str(cnt))

        username = request.json.get('username', None)
        message = request.json.get('message', None)

        print('User: ', str(username))
        print('Content: ', str(message))

        if cnt == 1:
            retList = cCScrapper.extractCatalog()
        else:
            hashValue, cleanedData = cr.getData(str(message))
            print('Main Hash Value:', str(hashValue))

            imageUrls, wikiUrls = extractRemoveUrls(hashValue)
            print('Image URLs: ', str(imageUrls))
            print('Wiki URLs: ', str(wikiUrls))
            print('Clean Text:')
            print(str(cleanedData))
            retList = '{"records":[{"Id":"' + str(cleanedData) + '", "Image":"' + str(imageUrls) + '", "Wiki": "' + str(wikiUrls) + '"}]}'

        response = {
            'message': retList
        }

        print('JSON: ', str(response))
        return jsonify(response)

    except Exception as e:
        x = str(e)

        response = {
            'message': 'Error: ' + x
        }
        return jsonify(response)

@app.route('/api/data', methods=['GET'])
@jwt_required()
def get_data():
    response = {
        'message': 'Hello from Flask!'
    }
    return jsonify(response)

if __name__ == '__main__':
    app.run(debug=True)

Let us understand some of the important sections of the above script –

Function – login():

The login function retrieves a ‘username’ and ‘password’ from a JSON request and prints them. It checks if the provided credentials are missing from users or password lists, returning a failure JSON response if so. It creates and returns an access token in a JSON response if valid.

Function – get_chat():

The get_chat function retrieves the running session count and user input from a JSON request. Based on the session count, it extracts catalog data or processes the user’s message from the RAG framework that finally receives the refined response from the OpenAI, extracting hash values, image URLs, and wiki URLs. If an error arises, the function captures and returns the error as a JSON message.

Function – updateCounter():

The updateCounter function checks if a given CSV file exists and retrieves its counter value. It then increments the counter and writes it back to the CSV. If any errors occur, an error message is printed, and the function returns a value of 1.

Function – extractRemoveUrls():

The extractRemoveUrls function attempts to filter a data frame, resDf, based on a provided hash value to extract image and wiki URLs. If the data frame contains matching entries, it retrieves the corresponding URLs. Any errors encountered are printed, but the function always returns the image and wiki URLs, even if they are empty.

  • clsContentScrapper.py (This is the main class that brings the default options for the users if they agree with the initial prompt by the bot.)
#####################################################
#### Written By: SATYAKI DE                      ####
#### Written On: 27-May-2023                     ####
#### Modified On 28-May-2023                     ####
####                                             ####
#### Objective: This is the main calling         ####
#### python class that will invoke the           ####
#### LangChain of package to extract             ####
#### the transcript from the YouTube videos &    ####
#### then answer the questions based on the      ####
#### topics selected by the users.               ####
####                                             ####
#####################################################

from langchain.document_loaders import YoutubeLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.chat_models import ChatOpenAI
from langchain.chains import LLMChain

from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
)

from googleapiclient.discovery import build

import clsTemplate as ct
from clsConfigClient import clsConfigClient as cf

import os

from flask import jsonify
import requests

###############################################
###           Global Section                ###
###############################################
open_ai_Key = cf.conf['OPEN_AI_KEY']
os.environ["OPENAI_API_KEY"] = open_ai_Key
embeddings = OpenAIEmbeddings(openai_api_key=open_ai_Key)

YouTube_Key = cf.conf['YOUTUBE_KEY']
youtube = build('youtube', 'v3', developerKey=YouTube_Key)

# Disbling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

###############################################
###    End of Global Section                ###
###############################################

class clsContentScrapper:
    def __init__(self):
        self.model_name = cf.conf['MODEL_NAME']
        self.temp_val = cf.conf['TEMP_VAL']
        self.max_cnt = int(cf.conf['MAX_CNT'])
        self.url = cf.conf['BASE_URL']
        self.header_token = cf.conf['HEADER_TOKEN']

    def extractCatalog(self):
        try:
            base_url = self.url
            header_token = self.header_token

            url = base_url + '/departments'

            print('Full URL: ', str(url))

            payload={}
            headers = {'Cookie': header_token}

            response = requests.request("GET", url, headers=headers, data=payload)

            x = response.text

            return x
        except Exception as e:
            discussedTopic = []
            x = str(e)
            print('Error: ', x)

            return x

Let us understand the the core part that require from this class.

Function – extractCatalog():

The extractCatalog function uses specific headers to make a GET request to a constructed URL. The URL is derived by appending ‘/departments’ to a base_url, and a header token is used in the request headers. If successful, it returns the text of the response; if there’s an exception, it prints the error and returns the error message.

  • clsRAGOpenAI.py (This is the main class that brings the RAG-enabled context that is fed to OpenAI for fine-tuned response with less cost.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### shortcut application created inside MAC         ####
#### enviornment including MacBook, IPad or IPhone.  ####
####                                                 ####
#########################################################

from haystack.document_stores.faiss import FAISSDocumentStore
from haystack.nodes import DensePassageRetriever
import openai

from clsConfigClient import clsConfigClient as cf
import clsL as log

# Disbling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

import os
import re
###############################################
###           Global Section                ###
###############################################
Ind = cf.conf['DEBUG_IND']
queryModel = cf.conf['QUERY_MODEL']
passageModel = cf.conf['PASSAGE_MODEL']

#Initiating Logging Instances
clog = log.clsL()

os.environ["TOKENIZERS_PARALLELISM"] = "false"

vectorDBFileName = cf.conf['VECTORDB_FILE_NM']

indexFile = "vectorDB/" + str(vectorDBFileName) + '.faiss'
indexConfig = "vectorDB/" + str(vectorDBFileName) + ".json"

print('File: ', str(indexFile))
print('Config: ', str(indexConfig))

# Also, provide `config_path` parameter if you set it when calling the `save()` method:
new_document_store = FAISSDocumentStore.load(index_path=indexFile, config_path=indexConfig)

# Initialize Retriever
retriever = DensePassageRetriever(document_store=new_document_store,
                                  query_embedding_model=queryModel,
                                  passage_embedding_model=passageModel,
                                  use_gpu=False)


###############################################
###    End of Global Section                ###
###############################################

class clsRAGOpenAI:
    def __init__(self):
        self.basePath = cf.conf['DATA_PATH']
        self.fileName = cf.conf['FILE_NAME']
        self.Ind = cf.conf['DEBUG_IND']
        self.subdir = str(cf.conf['OUT_DIR'])
        self.base_url = cf.conf['BASE_URL']
        self.outputPath = cf.conf['OUTPUT_PATH']
        self.vectorDBPath = cf.conf['VECTORDB_PATH']
        self.openAIKey = cf.conf['OPEN_AI_KEY']
        self.temp = cf.conf['TEMP_VAL']
        self.modelName = cf.conf['MODEL_NAME']
        self.maxToken = cf.conf['MAX_TOKEN']

    def extractHash(self, text):
        try:
            # Regular expression pattern to match 'Ref: {' followed by a number and then '}'
            pattern = r"Ref: \{'(\d+)'\}"
            match = re.search(pattern, text)

            if match:
                return match.group(1)
            else:
                return None
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return None

    def removeSentencesWithNaN(self, text):
        try:
            # Split text into sentences using regular expression
            sentences = re.split('(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text)
            # Filter out sentences containing 'nan'
            filteredSentences = [sentence for sentence in sentences if 'nan' not in sentence]
            # Rejoin the sentences
            return ' '.join(filteredSentences)
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return ''

    def retrieveDocumentsReader(self, question, top_k=9):
        return retriever.retrieve(question, top_k=top_k)

    def generateAnswerWithGPT3(self, retrieved_docs, question):
        try:
            openai.api_key = self.openAIKey
            temp = self.temp
            modelName = self.modelName
            maxToken = self.maxToken

            documentsText = " ".join([doc.content for doc in retrieved_docs])

            filteredDocs = self.removeSentencesWithNaN(documentsText)
            hashValue = self.extractHash(filteredDocs)

            print('RAG Docs:: ')
            print(filteredDocs)
            #prompt = f"Given the following documents: {documentsText}, answer the question accurately based on the above data with the supplied http urls: {question}"

            # Set up a chat-style prompt with your data
            messages = [
                {"role": "system", "content": "You are a helpful assistant, answer the question accurately based on the above data with the supplied http urls. Only relevant content needs to publish. Please do not provide the facts or the texts that results crossing the max_token limits."},
                {"role": "user", "content": filteredDocs}
            ]

            # Chat style invoking the latest model
            response = openai.ChatCompletion.create(
                model=modelName,
                messages=messages,
                temperature = temp,
                max_tokens=maxToken
            )
            return hashValue, response.choices[0].message['content'].strip().replace('\n','\\n')
        except Exception as e:
            x = str(e)
            print('failed to get from OpenAI: ', x)
            return 'Not Available!'

    def ragAnswerWithHaystackAndGPT3(self, question):
        retrievedDocs = self.retrieveDocumentsReader(question)
        return self.generateAnswerWithGPT3(retrievedDocs, question)

    def getData(self, strVal):
        try:
            print('*'*120)
            print('Index Your Data for Retrieval:')
            print('*'*120)

            print('Response from New Docs: ')
            print()

            hashValue, answer = self.ragAnswerWithHaystackAndGPT3(strVal)

            print('GPT3 Answer::')
            print(answer)
            print('Hash Value:')
            print(str(hashValue))

            print('*'*240)
            print('End Of Use RAG to Generate Answers:')
            print('*'*240)

            return hashValue, answer
        except Exception as e:
            x = str(e)
            print('Error: ', x)
            answer = x
            hashValue = 1

            return hashValue, answer

Let us understand some of the important block –

Function – ragAnswerWithHaystackAndGPT3():

The ragAnswerWithHaystackAndGPT3 function retrieves relevant documents for a given question using the retrieveDocumentsReader method. It then generates an answer for the query using GPT-3 with the retrieved documents via the generateAnswerWithGPT3 method. The final response is returned.

Function – generateAnswerWithGPT3():

The generateAnswerWithGPT3 function, given a list of retrieved documents and a question, communicates with OpenAI’s GPT-3 to generate an answer. It first processes the documents, filtering and extracting a hash value. Using a chat-style format, it prompts GPT-3 with the processed documents and captures its response. If an error occurs, an error message is printed, and “Not Available!” is returned.

Function – retrieveDocumentsReader():

The retrieveDocumentsReader function takes in a question and an optional parameter, top_k (defaulted to 9). It is called the retriever.retrieve method with the given parameters. The result of the retrieval will generate at max nine responses from the RAG engine, which will be fed to OpenAI.

  • App.js (This is the main react script, that will create the interface & parse the data apart from the authentication)
// App.js
import React, { useState } from 'react';
import axios from 'axios';
import './App.css';

const App = () => {
  const [isLoggedIn, setIsLoggedIn] = useState(false);
  const [username, setUsername] = useState('');
  const [password, setPassword] = useState('');
  const [message, setMessage] = useState('');
  const [chatLog, setChatLog] = useState([{ sender: 'MuBot', message: 'Welcome to MuBot! Please explore the world of History from our brilliant collections! Do you want to proceed to see the catalog?'}]);

  const handleLogin = async (e) => {
    e.preventDefault();
    try {
      const response = await axios.post('http://localhost:5000/login', { username, password });
      if (response.status === 200) {
        setIsLoggedIn(true);
      }
    } catch (error) {
      console.error('Login error:', error);
    }
  };

  const sendMessage = async (username) => {
    if (message.trim() === '') return;

    // Create a new chat entry
    const newChatEntry = {
      sender: 'user',
      message: message.trim(),
    };

    // Clear the input field
    setMessage('');

    try {
      // Make API request to Python-based API
      const response = await axios.post('http://localhost:5000/chat', { message: newChatEntry.message }); // Replace with your API endpoint URL
      const responseData = response.data;

      // Print the response to the console for debugging
      console.log('API Response:', responseData);

      // Parse the nested JSON from the 'message' attribute
      const jsonData = JSON.parse(responseData.message);

      // Check if the data contains 'departments'
      if (jsonData.departments) {

        // Extract the 'departments' attribute from the parsed data
        const departments = jsonData.departments;

        // Extract the department names and create a single string with line breaks
        const botResponseText = departments.reduce((acc, department) => {return acc + department.departmentId + ' ' + department.displayName + '\n';}, '');

        // Update the chat log with the bot's response
        setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message: message }, { sender: 'bot', message: botResponseText },]);
      }
      else if (jsonData.records)
      {
        // Data structure 2: Artwork information
        const records = jsonData.records;

        // Prepare chat entries
        const chatEntries = [];

        // Iterate through records and extract text, image, and wiki information
        records.forEach((record) => {
          const textInfo = Object.entries(record).map(([key, value]) => {
            if (key !== 'Image' && key !== 'Wiki') {
              return `${key}: ${value}`;
            }
            return null;
          }).filter((info) => info !== null).join('\n');

          const imageLink = record.Image;
          //const wikiLinks = JSON.parse(record.Wiki.replace(/'/g, '"'));
          //const wikiLinks = record.Wiki;
          const wikiLinks = record.Wiki.split(',').map(link => link.trim());

          console.log('Wiki:', wikiLinks);

          // Check if there is a valid image link
          const hasValidImage = imageLink && imageLink !== '[]';

          const imageElement = hasValidImage ? (
            <img src={imageLink} alt="Artwork" style={{ maxWidth: '100%' }} />
          ) : null;

          // Create JSX elements for rendering the wiki links (if available)
          const wikiElements = wikiLinks.map((link, index) => (
            <div key={index}>
              <a href={link} target="_blank" rel="noopener noreferrer">
                Wiki Link {index + 1}
              </a>
            </div>
          ));

          if (textInfo) {
            chatEntries.push({ sender: 'bot', message: textInfo });
          }

          if (imageElement) {
            chatEntries.push({ sender: 'bot', message: imageElement });
          }

          if (wikiElements.length > 0) {
            chatEntries.push({ sender: 'bot', message: wikiElements });
          }
        });

        // Update the chat log with the bot's response
        setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message }, ...chatEntries, ]);
      }

    } catch (error) {
      console.error('Error sending message:', error);
    }
  };

  if (!isLoggedIn) {
    return (
      <div className="login-container">
        <h2>Welcome to the MuBot</h2>
        <form onSubmit={handleLogin} className="login-form">
          <input
            type="text"
            placeholder="Enter your name"
            value={username}
            onChange={(e) => setUsername(e.target.value)}
            required
          />
          <input
            type="password"
            placeholder="Enter your password"
            value={password}
            onChange={(e) => setPassword(e.target.value)}
            required
          />
          <button type="submit">Login</button>
        </form>
      </div>
    );
  }

  return (
    <div className="chat-container">
      <div className="chat-header">
        <h2>Hello, {username}</h2>
        <h3>Chat with MuBot</h3>
      </div>
      <div className="chat-log">
        {chatLog.map((chatEntry, index) => (
          <div
            key={index}
            className={`chat-entry ${chatEntry.sender === 'user' ? 'user' : 'bot'}`}
          >
            <span className="user-name">{chatEntry.sender === 'user' ? username : 'MuBot'}</span>
            <p className="chat-message">{chatEntry.message}</p>
          </div>
        ))}
      </div>
      <div className="chat-input">
        <input
          type="text"
          placeholder="Type your message..."
          value={message}
          onChange={(e) => setMessage(e.target.value)}
          onKeyPress={(e) => {
            if (e.key === 'Enter') {
              sendMessage();
            }
          }}
        />
        <button onClick={sendMessage}>Send</button>
      </div>
    </div>
  );
};

export default App;

Please find some of the important logic –

Function – handleLogin():

The handleLogin asynchronous function responds to an event by preventing its default action. It attempts to post a login request with a username and password to a local server endpoint. If the response is successful with a status of 200, it updates a state variable to indicate a successful login; otherwise, it logs any encountered errors.

Function – sendMessage():

The sendMessage asynchronous function is designed to handle the user’s chat interaction:

  1. If the message is empty (after trimming spaces), the function exits without further action.
  2. A chat entry object is created with the sender set as ‘user’ and the trimmed message.
  3. The input field’s message is cleared, and an API request is made to a local server endpoint with the chat message.
  4. If the API responds with a ‘departments’ attribute in its JSON, a bot response is crafted by iterating over department details.
  5. If the API responds with ‘records’ indicating artwork information, the bot crafts responses for each record, extracting text, images, and wiki links, and generating JSX elements for rendering them.
  6. After processing the API response, the chat log state is updated with the user’s original message and the bot’s responses.
  7. Errors, if encountered, are logged to the console.

This function enables interactive chat with bot responses that vary based on the nature of the data received from the API.


Let us explore the directory structure starting from the parent to some of the important child folder should look like this –


So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Hacking the performance of Python Solutions with a custom-built library

Today, I’m very excited to demonstrate an effortless & new way to hack the performance of Python. This post will be a super short & yet crisp presentation of improving the overall performance.

Why not view the demo before going through it?


Demo

Isn’t it exciting? Let’s understand the steps to improve your code.

pip install cython

Cython is a Python-to-C compiler. It can significantly improve performance for specific tasks, especially those with heavy computation and loops. Also, Cython’s syntax is very similar to Python, which makes it easy to learn.

Let’s consider an example where we calculate the sum of squares for a list of numbers. The code without optimization would look like this:

  • perfTest_1.py (First untuned Python class.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 31-Jul-2023                         ####
#### Modified On 31-Jul-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### first version of accute computation.            ####
####                                                 ####
#########################################################
from clsConfigClient import clsConfigClient as cf

import time
start = time.time()

n_val = cf.conf['INPUT_VAL']

def compute_sum_of_squares(n):
    return sum([i**2 for i in range(n)])

n = n_val

print(compute_sum_of_squares(n))

print(f"Test - 1: Execution time: {time.time() - start} seconds")

Here, n_val contains the value as – “1000000000”.

Now, let’s optimize it using Cython by installing the abovementioned packages. Then, you will have to create a .pyx file, say “compute.pyx”, with the following code:

cpdef double compute_sum_of_squares(int n):
    return sum([i**2 for i in range(n)])

Now, create a setup.py file to compile it:

###########################################################
#### Written By: SATYAKI DE                            ####
#### Written On: 31-Jul-2023                           ####
#### Modified On 31-Jul-2023                           ####
####                                                   ####
#### Objective: This is the main calling               ####
#### python script that will create the                ####
#### compiled library after executing the compute.pyx. ####
####                                                   ####
###########################################################

from setuptools import setup
from Cython.Build import cythonize

setup(
    ext_modules = cythonize("compute.pyx")
)

Compile it using the command:

python setup.py build_ext --inplace

This will look like the following –

Finally, you can import the function from the compiled “.pyx” file inside the improved code.

  • perfTest_2.py (First untuned Python class.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 31-Jul-2023                         ####
#### Modified On 31-Jul-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### optimized & precompiled custom library, which   ####
#### will significantly improve the performance.     ####
####                                                 ####
#########################################################
from clsConfigClient import clsConfigClient as cf
from compute import compute_sum_of_squares

import time
start = time.time()

n_val = cf.conf['INPUT_VAL']

n = n_val

print(compute_sum_of_squares(n))

print(f"Test - 2: Execution time with multiprocessing: {time.time() - start} seconds")

By compiling to C, Cython can speed up loop and function calls, leading to significant speedup for CPU-bound tasks.

Please note that while Cython can dramatically improve performance, it can make the code more complex and harder to debug. Therefore, starting with regular Python and switching to Cython for the performance-critical parts of the code is recommended.


So, finally, we’ve done it. I know that this post is relatively smaller than my earlier post. But, I think, you can get a good hack to improve some of your long-running jobs by applying this trick.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Enabling OpenAI-based NLP engine with SIRI (MacBook/iPad/iPhone) through a proxy-driven restricted API using Python.

Today, I’m very excited to demonstrate an effortless & new way to integrate SIRI with a controlled Open-AI exposed through a proxy API. So, why this is important; this will give you options to control your ChatGPT environment as per your principles & then you can use a load-balancer (if you want) & exposed that through proxy.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.


Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Isn’t it fascinating? This approach will lead to a whole new ballgame, where you can add SIRI with an entirely new world of knowledge as per your requirements & expose them in a controlled way.

FLOW OF EVENTS:

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

As you can see, Apple Shortcuts triggered the requests through its voice app, which then translates the question to text & then it will invoke the ngrok proxy API, which will eventually trigger the controlled custom API built using Flask & Python to start the Open AI API.


CODE:

Why don’t we go through the code made accessible due to this new library for this particular use case?

  • clsConfigClient.py (This is the main calling Python script for the input parameters.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 27-Jun-2023 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### personal OpenAI-based MAC-shortcuts ####
#### enable bot. ####
#### ####
################################################
import os
import platform as pl
class clsConfigClient(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'DATA_PATH': Curr_Path + sep + 'data' + sep,
'MODEL_PATH': Curr_Path + sep + 'model' + sep,
'TEMP_PATH': Curr_Path + sep + 'temp' + sep,
'MODEL_DIR': 'model',
'APP_DESC_1': 'LangChain Demo!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'FILE_NAME': 'Output.csv',
'MODEL_NAME': 'gpt-3.5-turbo',
'OPEN_AI_KEY': "sk-Jdhfdyruru9383474HHFJFJFJO6jrlxPKbv6Bgvv",
'TITLE': "LangChain Demo!",
'TEMP_VAL': 0.2,
'PATH' : Curr_Path,
'MAX_TOKEN' : 60,
'OUT_DIR': 'data'
}

Some of the important entries from the above snippet are as follows –

        'MODEL_NAME': 'gpt-3.5-turbo',
        'OPEN_AI_KEY': "sk-Jdhfdyruru9383474HHFJFJFJO6jrlxPKbv6Bgvv",
        'TEMP_VAL': 0.2,

TEMP_VAL will help you to control the response in a more authentic manner. It varies between 0 to 1.

  • clsJarvis.py (This is the main calling Python script for the input parameters.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 27-Jun-2023 ####
#### Modified On 28-Jun-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python class that will invoke the ####
#### Flask framework to expose the OpenAI ####
#### API with more control & encapsulate the ####
#### server IPs with proxy layers. ####
#### ####
#####################################################
import openai
from flask import request, jsonify
from clsConfigClient import clsConfigClient as cf
import os
import clsTemplate as ct
###############################################
### Global Section ###
###############################################
open_ai_Key = cf.conf['OPEN_AI_KEY']
openai.api_key = open_ai_Key
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
###############################################
### End of Global Section ###
###############################################
class clsJarvis:
def __init__(self):
self.model_name = cf.conf['MODEL_NAME']
self.max_token = cf.conf['MAX_TOKEN']
self.temp_val = cf.conf['TEMP_VAL']
def extractContentInText(self, query):
try:
model_name = self.model_name
max_token = self.max_token
temp_val = self.temp_val
template = ct.templateVal_1
response = openai.ChatCompletion.create(model=model_name, temperature=temp_val, messages=[{"role": "system", "content": template},{"role": "user", "content": query}])
inputJson = {"text": response['choices'][0]['message']['content']}
return jsonify(inputJson)
except Exception as e:
discussedTopic = []
x = str(e)
print('Error: ', x)
template = ct.templateVal_2
inputJson = {"text": template}
return jsonify(inputJson)

view raw

clsJarvis.py

hosted with ❤ by GitHub

The key snippets from the above script are as follows –

def extractContentInText(self, query):
    try:
        model_name = self.model_name
        max_token = self.max_token
        temp_val = self.temp_val

        template = ct.templateVal_1

        response = openai.ChatCompletion.create(model=model_name, temperature=temp_val, messages=[{"role": "system", "content": template},{"role": "user", "content": query}])
        inputJson = {"text": response['choices'][0]['message']['content']}

        return jsonify(inputJson)
    except Exception as e:
        discussedTopic = []
        x = str(e)
        print('Error: ', x)
        template = ct.templateVal_2

        inputJson = {"text": template}

        return jsonify(inputJson)

The provided Python code snippet defines a method extractContentInText, which interacts with OpenAI’s API to generate a response from OpenAI’s chat model to a user’s query. Here’s a summary of what it does:

  1. It fetches some predefined model configurations (model_name, max_token, temp_val). These are class attributes defined elsewhere.
  2. It sets a system message template (initial instruction for the AI model) using ct.templateVal_1. The ct object isn’t defined within this snippet but is likely another predefined object or module in the more extensive program.
  3. It then calls openai.ChatCompletion.create() to send messages to the AI model and generate a response. The statements include an initial system message and a user’s query.
  4. The model’s response is extracted and formatted into a JSON object inputJson where the ‘text’ field holds the AI’s response.
  5. The input JSON object returns a JSON response.

If an error occurs at any stage of this process (caught in the except block), it prints the error, sets a fallback message template using ct.templateVal_2, formats this into a JSON object, and returns it as a JSON response.

Note: The max_token variable is fetched but not used within the function; it might be a remnant of previous code or meant to be used in further development. The code also assumes a predefined ct object and a method called jsonify(), possibly from Flask, for formatting Python dictionaries into JSON format.

  • testJarvis.py (This is the main calling Python script.)


#########################################################
#### Written By: SATYAKI DE ####
#### Written On: 27-Jun-2023 ####
#### Modified On 28-Jun-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### shortcut application created inside MAC ####
#### enviornment including MacBook, IPad or IPhone. ####
#### ####
#########################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import clsJarvis as jv
import datetime
from flask import Flask, request, jsonify
app = Flask(__name__)
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
# Initiating Logging Instances
clog = cl.clsL()
cJarvis = jv.clsJarvis()
######################################
#### Global Flag ########
######################################
@app.route('/openai', methods=['POST'])
def openai_call():
try:
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*'*120)
print('Start Time: ' + str(var))
print('*'*120)
data = request.get_json()
print('Data::')
print(data)
prompt = data.get('prompt', '')
print('Prompt::')
print(prompt)
res = cJarvis.extractContentInText(str(prompt))
return res
print('*'*120)
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
app.run(host='0.0.0.0')

view raw

testJarvis.py

hosted with ❤ by GitHub

Please find the key snippets –

@app.route('/openai', methods=['POST'])
def openai_call():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        data = request.get_json()
        print('Data::')
        print(data)
        prompt = data.get('prompt', '')

        print('Prompt::')
        print(prompt)

        res = cJarvis.extractContentInText(str(prompt))

        return res

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

The provided Python code defines a route in a Flask web server that listens for POST requests at the ‘/openai’ endpoint. Here’s what it does in detail:

  1. It records and prints the current time, marking the start of the request handling.
  2. It retrieves the incoming data from the POST request as JSON with the request.get_json().
  3. It then extracts the ‘prompt’ from the JSON data. The request defaults to an empty string if no ‘prompt’ is provided in the request.
  4. The prompt is passed as an argument to the method extractContentInText() object cJarvis. This method is expected to use OpenAI’s API to generate a response from a model given the prompt (as discussed in your previous question). The result of this method call is stored in the variable res.
  5. The res variable (the model’s response) returns the answer to the client requesting the POST.
  6. It prints the current time again, marking the end of the request handling (However, this part of the code will never be executed as it places after a return statement).
  7. If an error occurs during this process, it catches the exception, converts it to a string, and prints the error message.

The cJarvis object used in the cJarvis.extractContentInText(str(prompt)) call is not defined within this code snippet. It is a global object likely defined elsewhere in the more extensive program. The extractContentInText method is the one you shared in your previous question.

Apple Shortcuts:

Now, let us understand the steps in Apple Shortcuts.

You can now set up a Siri Shortcut to call the URL provided by ngrok:

  1. Open the Shortcuts app on your iPhone.
  2. Tap the ‘+’ to create a new Shortcut.
  3. Add an action, search for “URL,” and select the URL action. Enter your ngrok URL here, with the /openai endpoint.
  4. Add another action, search for “Get Contents of URL.” This step will send a POST request to the URL from the previous activity. Set the method to POST and add a request body with type ‘JSON,’ containing a key ‘prompt’ and a value being the input you want to send to your OpenAI model.
  5. Optionally, you can add another action, “Show Result” or “Speak Text” to see/hear the result returned from your server.
  6. Save your Shortcut and give it a name.

You should now be able to activate Siri and say the name of your Shortcut to have it send a request to your server, which will then send a prompt to the OpenAI API and return the response.

Let us understand the “Get contents of” with easy postman screenshots –

As you can see that the newly exposed proxy-API will receive an input named prompt, which will be passed from “Dictate Text.”


So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement & especially in the prediction quality.

Demonstration of GPT-3 model tuning using Python for an upcoming PyPi-package

Today, I’m very excited to demonstrate an effortless & new way to fine-tune the GPT-3 model using Python with the help of my new build (unpublished) PyPi package. In this post, I plan to deal with the custom website link as a response from this website depending upon the user queries with the help of the OpenAI-based tuned model.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.

Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Isn’t it exciting? Finally, we can efficiently handle your custom website URL using OpenAI tuned model.


What is ChatGPT?

ChatGPT is an advanced artificial intelligence language model developed by OpenAI based on the GPT-4 architecture. As an AI model, it is designed to understand and generate human-like text-based on the input it receives. ChatGPT can engage in various tasks, such as answering questions, providing recommendations, creating content, and simulating conversation. While it is highly advanced and versatile, it’s important to note that ChatGPT’s knowledge is limited to the data it was trained on, with a cutoff date of September 2021.

When to tune GPT model?

Tuning a GPT or any AI model might be necessary for various reasons. Here are some common scenarios when you should consider adjusting or fine-tuning a GPT model:

  1. Domain-specific knowledge: If you need your model to have a deeper understanding of a specific domain or industry, you can fine-tune it with domain-specific data to improve its performance.
  2. New or updated data: If new or updated information is not part of the original training data, you should fine-tune the model to ensure it has the most accurate and up-to-date knowledge.
  3. Customization: If you require the model to have a specific style, tone, or focus, you can fine-tune it with data that reflects those characteristics.
  4. Ethical or safety considerations: To make the model safer and more aligned with human values, you should fine-tune it to reduce biased or harmful outputs.
  5. Improve performance: If the base model’s performance is unsatisfactory for a particular task or application, you can fine-tune it on a dataset more relevant to the job, often leading to better results.

Remember that tuning or fine-tuning a GPT model requires access to appropriate data and computational resources and an understanding of the model’s architecture and training techniques. Additionally, monitoring and evaluating the model’s performance after fine-tuning is essential to ensure that the desired improvements have been achieved.


FLOW OF EVENTS:

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

The initial Python-based client interacts with the tuned OpenAI models. This process enables it to get a precise response with custom data in a very convenient way. So that anyone can understand.


SOURCE DATA:

Let us understand how to feed the source data as it will deal with your website URL link.

The first data that we are going to talk about is the one that contains the hyperlink. Let us explore the sample here.

From the above diagram, one can easily understand that the application will interpret a unique hash number associated with a specific URL. This data will be used to look up the URL after the OpenAI response from the tuned model as a result of any user query.

Now, let us understand the actual source data.

If we closely check, we’ll see the source file contains two columns – prompt & completion. And the website reference is put inside the curly braces as shown – “{Hash Code that represents your URL}.”

During the response, the newly created library replaces the hash value with the correct URL after the successful lookup & presents the complete answer.

CODE:

Why don’t we go through the code made accessible due to this new library for this particular use case?

  • clsConfigClient.py (This is the main calling Python script for the input parameters.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 21-Feb-2023 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### OpenAI fine-tune projects. ####
#### ####
################################################
import os
import platform as pl
class clsConfigClient(object):
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
conf = {
'APP_ID': 1,
'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
'LOG_PATH': Curr_Path + sep + 'log' + sep,
'DATA_PATH': Curr_Path + sep + 'data' + sep,
'TEMP_PATH': Curr_Path + sep + 'temp' + sep,
'MODEL_DIR': 'model',
'APP_DESC_1': 'ChatGPT Training!',
'DEBUG_IND': 'N',
'INIT_PATH': Curr_Path,
'FILE_NAME': '2023-4-14-WP.csv',
'LKP_FILE_NAME': 'HyperDetails.csv',
'TEMP_FILE_NAME': 'chatGPTData.jsonl',
'TITLE': "GPT-3 Training!",
'PATH' : Curr_Path,
'OUT_DIR': 'data',
'OPEN_API_KEY': 'sk-hdhrujfrkfjfjfjfhjfjfisososT&jsdgL6KIxx',
'MODEL_CD':'davinci',
'URL': 'https://api.openai.com/v1/fine-tunes/&#39;,
'EPOCH': 10,
'SUFFIX': 'py-saty',
'EXIT_KEYWORD': 'bye'
}

Some of the important entries that will require later are as follows –

'FILE_NAME': '2023-4-14-WP.csv',
'LKP_FILE_NAME': 'HyperDetails.csv',
'OPEN_API_KEY': 'sk-hdhrujfrkfjfjfjfhjfjfisososT&jsdgL6KIxx',
'MODEL_CD':'davinci',
'URL': 'https://api.openai.com/v1/fine-tunes/',
'EXIT_KEYWORD': 'bye'

We’ll discuss these entries later.

  • trainChatGPTModel.py (This is the main calling Python script that will invoke the newly created fine-tune GPT-3 enabler.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 12-Feb-2023 ####
#### Modified On 16-Feb-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### newly created fine-tune GPT-3 enabler. ####
#### ####
#####################################################
import pandas as p
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import clsTrainModel3 as tm
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
#tModel = tm.clsTrainModel()
tModel = tm.clsTrainModel3()
# Initiating Logging Instances
clog = cl.clsL()
data_path = cf.conf['DATA_PATH']
data_file_name = cf.conf['FILE_NAME']
######################################
#### Global Flag ########
######################################
######################################
### Wrapper functions to invoke ###
### the desired class from newly ###
### built class. ###
######################################
######################################
### End of wrapper functions. ###
######################################
def main():
try:
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*'*120)
print('Start Time: ' + str(var))
print('*'*120)
FullFileName = data_path + data_file_name
r1 = tModel.trainModel(FullFileName)
if r1 == 0:
print('Successfully Trained!')
else:
print('Failed to Train!')
#clog.logr(OutPutFileName, debug_ind, df, subdir)
print('*'*120)
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

Following are the key snippet from the above script –

data_path = cf.conf['DATA_PATH']
data_file_name = cf.conf['FILE_NAME']

And, then –

tModel = tm.clsTrainModel3()
FullFileName = data_path + data_file_name
r1 = tModel.trainModel(FullFileName)

As one can see, the package needs only the source data file to fine-tune GPT-3 model.

  • checkFineTuneChatGPTModelStat.py (This is the main Python script that will check the status of the tuned process that will happen inside the OpenAI-cloud environment.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 12-Feb-2023 ####
#### Modified On 16-Feb-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### newly created fine-tune job status inside ####
#### the OpenAI environment. ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import clsTestModel3 as tm
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
# Initiating Logging Instances
clog = cl.clsL()
tmodel = tm.clsTestModel3()
url_part = cf.conf['URL']
open_api_key = cf.conf['OPEN_API_KEY']
######################################
#### Global Flag ########
######################################
def main():
try:
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*'*120)
print('Start Time: ' + str(var))
print('*'*120)
# Example usage
input_text = str(input("Please provide the fine tune Id (Start with ft-*): "))
url = url_part + input_text
print('URL: ', url)
r1 = tmodel.checkStat(url, open_api_key)
if r1 == 0:
print('Successfully checked the status of tuned GPT-3 model.')
else:
print('Failed to check the status of the tuned GPT-3 model.')
print('*'*120)
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

To check the status of the fine-tuned job inside the OpenAI environment, one needs to provide the fine tune id, which generally starts with -> “ft-*.” One would get this value after the train script’s successful run.

Some of the other key snippets are –

tmodel = tm.clsTestModel3()

url_part = cf.conf['URL']
open_api_key = cf.conf['OPEN_API_KEY']

And, then –

input_text = str(input("Please provide the fine tune Id (Start with ft-*): "))
url = url_part + input_text
print('URL: ', url)

r1 = tmodel.checkStat(url, open_api_key)

The above snippet is self-explanatory as one is passing the fine tune id along with the OpenAI API key.

  • testChatGPTModel.py (This is the main testing Python script that will invoke the newly created fine-tune GPT-3 enabler to get a response with custom data.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 12-Feb-2023 ####
#### Modified On 19-Apr-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### newly created class that will test the ####
#### tuned model output. ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import pandas as p
import clsTestModel3 as tm
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
# Initiating Logging Instances
clog = cl.clsL()
tmodel = tm.clsTestModel3()
open_api_key = cf.conf['OPEN_API_KEY']
lkpDataPath = cf.conf['DATA_PATH']
lkpFileName = cf.conf['LKP_FILE_NAME']
######################################
#### Global Flag ########
######################################
def main():
try:
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' * 120)
print('Start Time: ' + str(var))
print('*' * 120)
LookUpFileName = lkpDataPath + lkpFileName
r1 = tmodel.testModel(LookUpFileName, open_api_key)
if r1 == 0:
print('Successfully tested the tuned GPT-3 model.')
else:
print('Failed to test the tuned GPT-3 model.')
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

Some of the key entries from the above snippet are as follows –

tmodel = tm.clsTestModel3()

open_api_key = cf.conf['OPEN_API_KEY']
lkpDataPath = cf.conf['DATA_PATH']
lkpFileName = cf.conf['LKP_FILE_NAME']

And, then –

LookUpFileName = lkpDataPath + lkpFileName
r1 = tmodel.testModel(LookUpFileName, open_api_key)

In the above lines, the application gets the correct URL value from the look file we’ve prepared for this specific use case.

  • deleteChatGPTModel.py (This is the main Python script that will delete the old intended tuned model, which is no longer needed.)


#####################################################
#### Written By: SATYAKI DE ####
#### Written On: 12-Feb-2023 ####
#### Modified On 21-Feb-2023 ####
#### ####
#### Objective: This is the main calling ####
#### python script that will invoke the ####
#### newly created delete model methods for ####
#### OpenAI. ####
#####################################################
import clsL as cl
from clsConfigClient import clsConfigClient as cf
import datetime
import clsTestModel3 as tm
# Disbling Warning
def warn(*args, **kwargs):
pass
import warnings
warnings.warn = warn
######################################
### Get your global values ####
######################################
debug_ind = 'Y'
# Initiating Logging Instances
clog = cl.clsL()
tmodel = tm.clsTestModel3()
open_api_key = cf.conf['OPEN_API_KEY']
######################################
#### Global Flag ########
######################################
def main():
try:
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' * 120)
print('Start Time: ' + str(var))
print('*' * 120)
r1 = tmodel.delOldModel(open_api_key)
if r1 == 0:
print('Successfully checked the status of tuned GPT-3 model.')
else:
print('Failed to check the status of the tuned GPT-3 model.')
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var1))
except Exception as e:
x = str(e)
print('Error: ', x)
if __name__ == "__main__":
main()

Some of the key snippets from the above scripts are –

tmodel = tm.clsTestModel3()

open_api_key = cf.conf['OPEN_API_KEY']

And, then –

r1 = tmodel.delOldModel(open_api_key)

We’ve demonstrated that using a straightforward method, one can delete any old tuned model from OpenAI that is no longer required.

KEY FEATURES TO CONSIDER DURING TUNING:

  • Data quality: Ensure that the data used for fine-tuning is clean, relevant, and representative of the domain you want the model to understand. Check for biases, inconsistencies, and errors in the dataset.
  • Overfitting: Be cautious of overfitting, which occurs when the model performs exceptionally well on the training data but poorly on unseen data. You can address overfitting by using regularization techniques, early stopping, or cross-validation.
  • Model size and resource requirements: GPT models can be resource-intensive. Be mindful of the hardware limitations and computational resources available when selecting the model size and the time and cost associated with training.
  • Hyperparameter tuning: Select appropriate hyperparameters for your fine-tuning processes, such as learning rate, batch size, and the number of epochs. Experiment with different combinations to achieve the best results without overfitting.
  • Evaluation metrics: Choose suitable evaluation metrics to assess the performance of your fine-tuned model. Consider using multiple metrics to understand your model’s performance comprehensively.
  • Ethical considerations: Be aware of potential biases in your dataset and how the model’s predictions might impact users. Address ethical concerns during the fine-tuning process and consider using techniques such as data augmentation or adversarial training to mitigate these biases.
  • Monitoring and maintenance: Continuously monitor the model’s performance after deployment, and be prepared to re-tune or update it as needed. Regular maintenance ensures that the model remains relevant and accurate.
  • Documentation: Document your tuning process, including the data used, model architecture, hyperparameters, and evaluation metrics. This factor will facilitate easier collaboration, replication, and model maintenance.
  • Cost: OpenAI fine-tuning can be extremely expensive, even for a small volume of data. Hence, organization-wise, one needs to be extremely careful while using this feature.

COST FACTOR:

Before we discuss the actual spending, let us understand the tested data volume to train & tune the model.

So, we’re talking about a total size of 500 KB (at max). And, we did 10 epochs during the training as you can see from the config file mentioned above.

So, it is pretty expensive. Use it wisely.


So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement & especially in the prediction quality.