Creating a local LLM Cluster Server using Apple Silicon GPU

Today, we’re going to discuss creating a local LLM server and then using it to run various popular LLM models. We will club the local Apple GPUs together via a new framework that binds all the available Apple Silicon devices into one big LLM server. This enables people to run many large models that would otherwise be impossible due to the lack of GPUs.

This is certainly a new approach; one can create virtual computation layers by adding nodes to the resource pool, thereby increasing the computation capacity.

Why not witness a small demo to energize ourselves –

Let us understand the scenario. I have one MacBook Pro M4 & two Mac mini M4 Pro machines (base models). So, I want to join them & expose them as a cluster as follows –

As you can see, I’ve connected my MacBook Pro to both Mac minis using high-speed Thunderbolt cables for better data transmission. And, I’ll be using an open-source framework called “Exo” to create the cluster.

Also, you can see that my total computing capacity is 53.11 TFLOPS, which is slightly above the previous capability category.

“Exo” is an open-source framework that helps you merge all your available devices into one large pool of resources. This extracts all the computing juice needed to handle complex tasks, including big LLMs that would otherwise require very expensive GPU-based servers.

For more information on “Exo”, please refer to the following link.

In our previous diagram, we can see that the framework also offers endpoints.

  • One option is a local ChatGPT interface, where any question you ask will receive a response from models by combining all available computing power.
  • The other endpoint offers users a choice of any standard LLM API endpoint, which helps them integrate it into their solutions.

Let us see how the devices are connected together –


To proceed with this, you need to have at least Python 3.12, Anaconda or Miniconda & Xcode installed on all of your machines. Also, you need to install some Apple-specific MLX packages or libraries to get the best performance.

Depending on your choice, you need to use the following link to download Anaconda or Miniconda.

You can use the following link to download Python 3.12. However, I’ve used Python 3.13 on some machines & Python 3.12 on others, and it worked without any problem.

Sometimes, after installing Anaconda or Miniconda, the base environment may not be activated automatically. In that case, you may need to run the following command in the terminal -> source ~/.bash_profile

To verify whether conda has been successfully installed & activated, type the following command –

(base) satyaki_de@Satyakis-MacBook-Pro-Max Pandas % conda --version
conda 24.11.3
(base) satyaki_de@Satyakis-MacBook-Pro-Max Pandas % 
(base) satyaki_de@Satyakis-MacBook-Pro-Max Pandas % 

Once you’ve verified it, we need to install the following supplemental packages on all the machines –

satyaki_de@Satyakis-MacBook-Pro-Max Pandas % 
satyaki_de@Satyakis-MacBook-Pro-Max Pandas % 
satyaki_de@Satyakis-MacBook-Pro-Max Pandas % conda install anaconda::m4
Channels:
 - defaults
 - anaconda
Platform: osx-arm64
Collecting package metadata (repodata.json): done
Solving environment: done

## Package Plan ##

  environment location: /opt/anaconda3

  added / updated specs:
    - anaconda::m4


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    m4-1.4.18                  |       h1230e6a_1         202 KB  anaconda
    ------------------------------------------------------------
                                           Total:         202 KB

The following NEW packages will be INSTALLED:

  m4                 anaconda/osx-arm64::m4-1.4.18-h1230e6a_1 


Proceed ([y]/n)? y


Downloading and Extracting Packages:
                                                                                                                                                                                                                      
Preparing transaction: done
Verifying transaction: done
Executing transaction: done

Also, you need to install the MLX package on your machines –

(base) satyakidemini2@Satyakis-Mac-mini-2 exo % 
(base) satyakidemini2@Satyakis-Mac-mini-2 exo % pip install mlx
Collecting mlx
  Downloading mlx-0.23.2-cp312-cp312-macosx_14_0_arm64.whl.metadata (5.3 kB)
Downloading mlx-0.23.2-cp312-cp312-macosx_14_0_arm64.whl (27.6 MB)
   ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 27.6/27.6 MB 8.8 MB/s eta 0:00:00
Installing collected packages: mlx
Successfully installed mlx-0.23.2
(base) satyakidemini2@Satyakis-Mac-mini-2 exo % 
(base) satyakidemini2@Satyakis-Mac-mini-2 exo % 

So far, we’ve installed all the important packages. Now, we need to set up the “Exo” framework on all the machines, following the same steps as before.

Now, we’ll first clone the “Exo” framework with the following commands –

(base) satyaki_de@Satyakis-MacBook-Pro-Max Pandas % 
(base) satyaki_de@Satyakis-MacBook-Pro-Max Pandas % 
(base) satyaki_de@Satyakis-MacBook-Pro-Max Pandas % git clone https://github.com/exo-explore/exo.git
Cloning into 'exo'...
remote: Enumerating objects: 9736, done.
remote: Counting objects: 100% (411/411), done.
remote: Compressing objects: 100% (148/148), done.
remote: Total 9736 (delta 333), reused 263 (delta 263), pack-reused 9325 (from 3)
Receiving objects: 100% (9736/9736), 12.18 MiB | 8.41 MiB/s, done.
Resolving deltas: 100% (5917/5917), done.
Updating files: 100% (178/178), done.
Filtering content: 100% (9/9), 3.16 MiB | 2.45 MiB/s, done.
(base) satyaki_de@Satyakis-MacBook-Pro-Max Pandas % 
(base) satyaki_de@Satyakis-MacBook-Pro-Max Pandas % 

And, the content of the “Exo” folder should look like this –

total 28672
drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 docs
-rwx------  1 satyaki_de  staff     1337 Mar  9 17:06 configure_mlx.sh
-rwx------  1 satyaki_de  staff    11107 Mar  9 17:06 README.md
-rwx------  1 satyaki_de  staff    35150 Mar  9 17:06 LICENSE
drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 examples
drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 exo
drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 extra
drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 scripts
-rwx------  1 satyaki_de  staff      390 Mar  9 17:06 install.sh
-rwx------  1 satyaki_de  staff      792 Mar  9 17:06 format.py
drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 test
-rwx------  1 satyaki_de  staff     2476 Mar  9 17:06 setup.py
drwx------  1 satyaki_de  staff  1048576 Mar  9 17:10 build
drwx------  1 satyaki_de  staff  1048576 Mar  9 17:17 exo.egg-info

Similar commands need to be fired on the other devices. Here, I’m showing one Mac mini example –

(base) satyakidemini2@Satyakis-Mac-mini-2 Pandas % 
(base) satyakidemini2@Satyakis-Mac-mini-2 Pandas % git clone https://github.com/exo-explore/exo.git
Cloning into 'exo'...
remote: Enumerating objects: 9736, done.
remote: Counting objects: 100% (424/424), done.
remote: Compressing objects: 100% (146/146), done.
remote: Total 9736 (delta 345), reused 278 (delta 278), pack-reused 9312 (from 4)
Receiving objects: 100% (9736/9736), 12.18 MiB | 6.37 MiB/s, done.
Resolving deltas: 100% (5920/5920), done.
(base) satyakidemini2@Satyakis-Mac-mini-2 Pandas % 

After that, I’ll execute the following set of commands to install the framework –

(base) satyaki_de@Satyakis-MacBook-Pro-Max Pandas % cd exo
(base) satyaki_de@Satyakis-MacBook-Pro-Max exo % 
(base) satyaki_de@Satyakis-MacBook-Pro-Max exo % 
(base) satyaki_de@Satyakis-MacBook-Pro-Max exo % conda create --name exo1 python=3.13
WARNING: A conda environment already exists at '/opt/anaconda3/envs/exo1'

Remove existing environment?
This will remove ALL directories contained within this specified prefix directory, including any other conda environments.

 (y/[n])? y

Channels:
 - defaults
Platform: osx-arm64
Collecting package metadata (repodata.json): done
Solving environment: done

## Package Plan ##

  environment location: /opt/anaconda3/envs/exo1

  added / updated specs:
    - python=3.13


The following NEW packages will be INSTALLED:

  bzip2              pkgs/main/osx-arm64::bzip2-1.0.8-h80987f9_6 
  ca-certificates    pkgs/main/osx-arm64::ca-certificates-2025.2.25-hca03da5_0 
  expat              pkgs/main/osx-arm64::expat-2.6.4-h313beb8_0 
  libcxx             pkgs/main/osx-arm64::libcxx-14.0.6-h848a8c0_0 
  libffi             pkgs/main/osx-arm64::libffi-3.4.4-hca03da5_1 
  libmpdec           pkgs/main/osx-arm64::libmpdec-4.0.0-h80987f9_0 
  ncurses            pkgs/main/osx-arm64::ncurses-6.4-h313beb8_0 
  openssl            pkgs/main/osx-arm64::openssl-3.0.16-h02f6b3c_0 
  pip                pkgs/main/osx-arm64::pip-25.0-py313hca03da5_0 
  python             pkgs/main/osx-arm64::python-3.13.2-h4862095_100_cp313 
  python_abi         pkgs/main/osx-arm64::python_abi-3.13-0_cp313 
  readline           pkgs/main/osx-arm64::readline-8.2-h1a28f6b_0 
  setuptools         pkgs/main/osx-arm64::setuptools-75.8.0-py313hca03da5_0 
  sqlite             pkgs/main/osx-arm64::sqlite-3.45.3-h80987f9_0 
  tk                 pkgs/main/osx-arm64::tk-8.6.14-h6ba3021_0 
  tzdata             pkgs/main/noarch::tzdata-2025a-h04d1e81_0 
  wheel              pkgs/main/osx-arm64::wheel-0.45.1-py313hca03da5_0 
  xz                 pkgs/main/osx-arm64::xz-5.6.4-h80987f9_1 
  zlib               pkgs/main/osx-arm64::zlib-1.2.13-h18a0788_1 


Proceed ([y]/n)? y


Downloading and Extracting Packages:

Preparing transaction: done
Verifying transaction: done
Executing transaction: done
#
# To activate this environment, use
#
#     $ conda activate exo1
#
# To deactivate an active environment, use
#
#     $ conda deactivate

(base) satyaki_de@Satyakis-MacBook-Pro-Max exo % conda activate exo1
(exo1) satyaki_de@Satyakis-MacBook-Pro-Max exo % 
(exo1) satyaki_de@Satyakis-MacBook-Pro-Max exo % ls -lrt
total 24576
drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 docs
-rwx------  1 satyaki_de  staff     1337 Mar  9 17:06 configure_mlx.sh
-rwx------  1 satyaki_de  staff    11107 Mar  9 17:06 README.md
-rwx------  1 satyaki_de  staff    35150 Mar  9 17:06 LICENSE
drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 examples
drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 exo
drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 extra
drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 scripts
-rwx------  1 satyaki_de  staff      390 Mar  9 17:06 install.sh
-rwx------  1 satyaki_de  staff      792 Mar  9 17:06 format.py
drwx------  1 satyaki_de  staff  1048576 Mar  9 17:06 test
-rwx------  1 satyaki_de  staff     2476 Mar  9 17:06 setup.py
(exo1) satyaki_de@Satyakis-MacBook-Pro-Max exo % 
(exo1) satyaki_de@Satyakis-MacBook-Pro-Max exo % 
(exo1) satyaki_de@Satyakis-MacBook-Pro-Max exo % pip install .
Processing /Volumes/WD_BLACK/PythonCourse/Pandas/exo
  Preparing metadata (setup.py) ... done
Collecting tinygrad@ git+https://github.com/tinygrad/tinygrad.git@ec120ce6b9ce8e4ff4b5692566a683ef240e8bc8 (from exo==0.0.1)
  Cloning https://github.com/tinygrad/tinygrad.git (to revision ec120ce6b9ce8e4ff4b5692566a683ef240e8bc8) to /private/var/folders/26/dj11b57559b8r8rl6ztdpc840000gn/T/pip-install-q18fzk3r/tinygrad_7917114c483a4d9c83c795b69dbeb5c7
  Running command git clone --filter=blob:none --quiet https://github.com/tinygrad/tinygrad.git /private/var/folders/26/dj11b57559b8r8rl6ztdpc840000gn/T/pip-install-q18fzk3r/tinygrad_7917114c483a4d9c83c795b69dbeb5c7
  Running command git rev-parse -q --verify 'sha^ec120ce6b9ce8e4ff4b5692566a683ef240e8bc8'
  Running command git fetch -q https://github.com/tinygrad/tinygrad.git ec120ce6b9ce8e4ff4b5692566a683ef240e8bc8
  Running command git checkout -q ec120ce6b9ce8e4ff4b5692566a683ef240e8bc8
  Resolved https://github.com/tinygrad/tinygrad.git to commit ec120ce6b9ce8e4ff4b5692566a683ef240e8bc8
  Preparing metadata (setup.py) ... done
Collecting aiohttp==3.10.11 (from exo==0.0.1)
.
.
(Installed many more dependent packages)
.
.
Downloading propcache-0.3.0-cp313-cp313-macosx_11_0_arm64.whl (44 kB)
Building wheels for collected packages: exo, nuitka, numpy, uuid, tinygrad
  Building wheel for exo (setup.py) ... done
  Created wheel for exo: filename=exo-0.0.1-py3-none-any.whl size=901357 sha256=5665297f8ea09d06670c9dea91e40270acc4a3cf99a560bf8d268abb236050f7
  Stored in directory: /private/var/folders/26/dj118r8rl6ztdpc840000gn/T/pip-ephem-wheel-cache-0k8zloo3/wheels/b6/91/fb/c1c7d8ca90cf16b9cd8203c11bb512614bee7f6d34
  Building wheel for nuitka (pyproject.toml) ... done
  Created wheel for nuitka: filename=nuitka-2.5.1-cp313-cp313-macosx_11_0_arm64.whl size=3432720 sha256=ae5a280a1684fde98c334516ee8a99f9f0acb6fc2f625643b7f9c5c0887c2998
  Stored in directory: /Users/satyaki_de/Library/Caches/pip/wheels/f6/c9/53/9e37c6fb34c27e892e8357aaead46da610f82117ab2825
  Building wheel for numpy (pyproject.toml) ... done
  Created wheel for numpy: filename=numpy-2.0.0-cp313-cp313-macosx_15_0_arm64.whl size=4920701 sha256=f030b0aa51ec6628f708fab0af14ff765a46d210df89aa66dd8d9482e59b5
  Stored in directory: /Users/satyaki_de/Library/Caches/pip/wheels/e0/d3/66/30d07c18e56ac85e8d3ceaf22f093a09bae124a472b85d1
  Building wheel for uuid (setup.py) ... done
  Created wheel for uuid: filename=uuid-1.30-py3-none-any.whl size=6504 sha256=885103a90d1dc92d9a75707fc353f4154597d232f2599a636de1bc6d1c83d
  Stored in directory: /Users/satyaki_de/Library/Caches/pip/wheels/cc/9d/72/13ff6a181eacfdbd6d761a4ee7c5c9f92034a9dc8a1b3c
  Building wheel for tinygrad (setup.py) ... done
  Created wheel for tinygrad: filename=tinygrad-0.10.0-py3-none-any.whl size=1333964 sha256=1f08c5ce55aa3c87668675beb80810d609955a81b99d416459d2489b36a
  Stored in directory: /Users/satyaki_de/Library/Caches/pip/wheels/c7/bd/02/bd91c1303002619dad23f70f4c1f1c15d0c24c60b043e
Successfully built exo nuitka numpy uuid tinygrad
Installing collected packages: uuid, sentencepiece, nvidia-ml-py, zstandard, uvloop, urllib3, typing-extensions, tqdm, tinygrad, scapy, safetensors, regex, pyyaml, pygments, psutil, protobuf, propcache, prometheus-client, pillow, packaging, ordered-set, numpy, multidict, mlx, mdurl, MarkupSafe, idna, grpcio, fsspec, frozenlist, filelock, charset-normalizer, certifi, attrs, annotated-types, aiohappyeyeballs, aiofiles, yarl, requests, pydantic-core, opencv-python, nuitka, markdown-it-py, Jinja2, grpcio-tools, aiosignal, rich, pydantic, huggingface-hub, aiohttp, tokenizers, aiohttp_cors, transformers, mlx-lm, exo
Successfully installed Jinja2-3.1.4 MarkupSafe-3.0.2 aiofiles-24.1.0 aiohappyeyeballs-2.5.0 aiohttp-3.10.11 aiohttp_cors-0.7.0 aiosignal-1.3.2 annotated-types-0.7.0 attrs-25.1.0 certifi-2025.1.31 charset-normalizer-3.4.1 exo-0.0.1 filelock-3.17.0 frozenlist-1.5.0 fsspec-2025.3.0 grpcio-1.67.0 grpcio-tools-1.67.0 huggingface-hub-0.29.2 idna-3.10 markdown-it-py-3.0.0 mdurl-0.1.2 mlx-0.22.0 mlx-lm-0.21.1 multidict-6.1.0 nuitka-2.5.1 numpy-2.0.0 nvidia-ml-py-12.560.30 opencv-python-4.10.0.84 ordered-set-4.1.0 packaging-24.2 pillow-10.4.0 prometheus-client-0.20.0 propcache-0.3.0 protobuf-5.28.1 psutil-6.0.0 pydantic-2.9.2 pydantic-core-2.23.4 pygments-2.19.1 pyyaml-6.0.2 regex-2024.11.6 requests-2.32.3 rich-13.7.1 safetensors-0.5.3 scapy-2.6.1 sentencepiece-0.2.0 tinygrad-0.10.0 tokenizers-0.20.3 tqdm-4.66.4 transformers-4.46.3 typing-extensions-4.12.2 urllib3-2.3.0 uuid-1.30 uvloop-0.21.0 yarl-1.18.3 zstandard-0.23.0
(exo1) satyaki_de@Satyakis-MacBook-Pro-Max exo % 

And, you need to perform the same process on the other available devices as well.

Now, we’re ready to proceed with the final command –

(.venv) (exo1) satyaki_de@Satyakis-MacBook-Pro-Max exo % exo
/opt/anaconda3/envs/exo1/lib/python3.13/site-packages/google/protobuf/runtime_version.py:112: UserWarning: Protobuf gencode version 5.27.2 is older than the runtime version 5.28.1 at node_service.proto. Please avoid checked-in Protobuf gencode that can be obsolete.
  warnings.warn(
None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.
Selected inference engine: None

  _____  _____  
 / _ \ \/ / _ \ 
|  __/>  < (_) |
 \___/_/\_\___/ 
    
Detected system: Apple Silicon Mac
Inference engine name after selection: mlx
Using inference engine: MLXDynamicShardInferenceEngine with shard downloader: SingletonShardDownloader
[60771, 54631, 54661]
Chat interface started:
 - http://127.0.0.1:52415
 - http://XXX.XXX.XX.XX:52415
 - http://XXX.XXX.XXX.XX:52415
 - http://XXX.XXX.XXX.XXX:52415
ChatGPT API endpoint served at:
 - http://127.0.0.1:52415/v1/chat/completions
 - http://XXX.XXX.X.XX:52415/v1/chat/completions
 - http://XXX.XXX.XXX.XX:52415/v1/chat/completions
 - http://XXX.XXX.XXX.XXX:52415/v1/chat/completions
has_read=True, has_write=True
╭────────────────────────────────────────────────────────────────────────────────────────────── Exo Cluster (2 nodes) ───────────────────────────────────────────────────────────────────────────────────────────────╮
Received exit signal SIGTERM...
Thank you for using exo.

  _____  _____  
 / _ \ \/ / _ \ 
|  __/>  < (_) |
 \___/_/\_\___/ 
    

Note that I’ve masked the IP addresses for security reasons.
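
For a quick taste of the second endpoint, here is a minimal sketch of calling the ChatGPT-compatible API from Python once the cluster is up. Treat the model name below as a placeholder; use whichever model your exo installation actually lists.

import requests

# Hypothetical example of calling exo's OpenAI-compatible endpoint on the local node.
# The model id below is only a placeholder; pick one that your exo build supports.
EXO_URL = "http://127.0.0.1:52415/v1/chat/completions"

payload = {
    "model": "llama-3.2-3b",   # placeholder model id
    "messages": [{"role": "user", "content": "Explain Apple Silicon in one line."}],
    "temperature": 0.7,
}

resp = requests.post(EXO_URL, json=payload, timeout=300)
resp.raise_for_status()

# The response follows the standard OpenAI chat-completions shape.
print(resp.json()["choices"][0]["message"]["content"])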


At the beginning, if we trigger the main MacBook Pro Max, the “Exo” screen should look like this –

And if you open the URL, you will see the following ChatGPT-like interface –

Connecting without a Thunderbolt bridge on the relevant port, or going through a hub, may cause performance degradation. Hence, how you connect the devices will play a major role in the success of this setup. Nevertheless, this is certainly a great approach to pursue.


So, we’ve done it.

We’ll cover detailed performance testing, optimized configurations & many other useful details in our next post.

Till then, Happy Avenging! 🙂

Monitoring & evaluating the leading LLMs (both established & new) with a Python-based evaluator

As we leap further and further into the field of Generative AI, one of the frequent questions or challenges people face is performance & the other evaluation factors. Getting these factors right will eventually bring the fruit of this technology; otherwise, you will end up in technical debt.

This post will discuss the key snippets of the Python-based monitoring app. But before that, let us first view the demo.

Isn’t it exciting?


Let us dive deep into it. But first, here is the flow this solution will follow.

So, the current application will invoke the industry bigshots and some relatively unknown or new LLMs.

In this case, we’ll evaluate various models from Anthropic, OpenAI, DeepSeek, and Bharat GPT. However, Bharat GPT is open source, so we’ll use the Hugging Face library and execute it locally on my MacBook Pro M4 Max.

The following are the KPIs we’re going to evaluate:

Here is the list of dependent Python packages required to run this application –

pip install certifi==2024.8.30
pip install anthropic==0.42.0
pip install huggingface-hub==0.27.0
pip install nltk==3.9.1
pip install numpy==2.2.1
pip install moviepy==2.1.1
pip install numpy==2.1.3
pip install openai==1.59.3
pip install pandas==2.2.3
pip install pillow==11.1.0
pip install pip==24.3.1
pip install psutil==6.1.1
pip install requests==2.32.3
pip install rouge_score==0.1.2
pip install scikit-learn==1.6.0
pip install setuptools==70.2.0
pip install tokenizers==0.21.0
pip install torch==2.6.0.dev20250104
pip install torchaudio==2.6.0.dev20250104
pip install torchvision==0.22.0.dev20250104
pip install tqdm==4.67.1
pip install transformers==4.47.1
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    def get_claude_response(self, prompt: str) -> str:
        response = self.anthropic_client.messages.create(
            model=anthropic_model,
            max_tokens=maxToken,
            messages=[{"role": "user", "content": prompt}]
        )
        return response.content[0].text
  1. The Retry Mechanism
    • The @retry line means this function will automatically try again if it fails.
    • It will stop retrying after 3 attempts (stop_after_attempt(3)).
    • It will wait longer between retries, starting at 4 seconds and increasing up to 10 seconds (wait_exponential(multiplier=1, min=4, max=10)).
  2. The Function Purpose
    • The function takes a message, called prompt, as input (a string of text).
    • It uses a service (likely an AI system like Claude) to generate a response to this prompt.
  3. Sending the Message
    • Inside the function, the code self.anthropic_client.messages.create is the part that actually sends the prompt to the AI.
    • It specifies which AI model to use (e.g., anthropic_model).
    • The maximum length of the response (controlled by maxToken).
    • The input message for the AI has a “role” (user), as well as the content of the prompt.
  4. Getting the Response
    • Once the AI generates a response, it’s saved as response.
    • The code retrieves the first part of the response (response.content[0].text) and sends it back to whoever called the function.

Similarly, it will work for Open AI as well.
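
The OpenAI variant isn’t reproduced in full here, but a hedged sketch of get_gpt4_response might look like the following, assuming the official openai Python client and the same retry decorator (openai_model and maxToken are configuration values, as before) –

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    def get_gpt4_response(self, prompt: str) -> str:
        # Sketch only: assumes self.openai_client = OpenAI(api_key=...) was set up earlier.
        response = self.openai_client.chat.completions.create(
            model=openai_model,          # configured GPT-4 class model id
            max_tokens=maxToken,
            messages=[{"role": "user", "content": prompt}]
        )
        return response.choices[0].message.content

The DeepSeek variant below takes the raw-REST route instead, posting the request directly with the requests library –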

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    def get_deepseek_response(self, prompt: str) -> tuple:
        deepseek_api_key = self.deepseek_api_key

        headers = {
            "Authorization": f"Bearer {deepseek_api_key}",
            "Content-Type": "application/json"
            }
        
        payload = {
            "model": deepseek_model,  
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": maxToken
            }
        
        response = requests.post(DEEPSEEK_API_URL, headers=headers, json=payload)

        if response.status_code == 200:
            res = response.json()["choices"][0]["message"]["content"]
        else:
            res = "API request failed with status code " + str(response.status_code) + ":" + str(response.text)

        return res
  1. Retry Mechanism:
    • The @retry line ensures the function will try again if it fails.
    • It will stop retrying after 3 attempts (stop_after_attempt(3)).
    • It waits between retries, starting at 4 seconds and increasing up to 10 seconds (wait_exponential(multiplier=1, min=4, max=10)).

  2. What the Function Does:
    • The function takes one input, prompt, which is the message or question you want to send to the AI.
    • It returns the AI’s response or an error message.

  3. Preparing to Communicate with the API:
    • API Key: It gets the API key for the DeepSeek service from self.deepseek_api_key.
    • Headers: These tell the API that the request will use the API key (for security) and that the data format is JSON (structured text).
    • Payload: This is the information sent to the AI. It includes:
      • Model: Specifies which version of the AI to use (deepseek_model).
      • Messages: The input message with the role “user” and your prompt.
      • Max Tokens: Defines the maximum size of the AI’s response (maxToken).

  4. Sending the Request:
    • It uses the requests.post() method to send the payload and headers to the DeepSeek API using the URL DEEPSEEK_API_URL.

  5. Processing the Response:
    • If the API responds successfully (status_code == 200):
      • It extracts the AI’s reply from the response data.
      • Specifically, it gets the first choice’s message content: response.json()["choices"][0]["message"]["content"].
    • If there’s an error:
      • It constructs an error message with the status code and detailed error text from the API.

  6. Returning the Result:
    • The function outputs either the AI’s response or the error message.
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    def get_bharatgpt_response(self, prompt: str) -> tuple:
        try:
            messages = [[{"role": "user", "content": prompt}]]
            
            response = pipe(messages, max_new_tokens=maxToken,)

            # Extract 'content' field safely
            res = next((entry.get("content", "")
                        for entry in response[0][0].get("generated_text", [])
                        if isinstance(entry, dict) and entry.get("role") == "assistant"
                        ),
                        None,
                        )
            
            return res
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return ""
  1. Retry Mechanism:
    • The @retry ensures the function will try again if it fails.
    • It will stop retrying after 3 attempts (stop_after_attempt(3)).
    • The waiting time between retries starts at 4 seconds and increases exponentially up to 10 seconds (wait_exponential(multiplier=1, min=4, max=10)).
  2. What the Function Does:
    • The function takes one input, prompt, which is the message or question you want to send to BharatGPT.
    • It returns the AI’s response or an empty string if something goes wrong.
  3. Sending the Prompt:
    • Messages Structure: The function wraps the user’s prompt in a format that the BharatGPT AI understands:
    • messages = [[{"role": "user", "content": prompt}]]
    • This tells the AI that the prompt is coming from the “user.”
  4. Pipe Function: It uses a pipe() method to send the messages to the AI system.
    • max_new_tokens=maxToken: Limits how long the AI’s response can be.
  5. Extracting the Response:
    • The response from the AI is in a structured format. The code looks for the first piece of text where:
    • The role is “assistant” (meaning it’s the AI’s reply).
    • The text is in the “content” field.
    • The next() function safely extracts this “content” field or returns None if it can’t find it.
  6. Error Handling:
    • If something goes wrong (e.g., the AI doesn’t respond or there’s a technical issue), the code:
    • Captures the error message in e.
    • Prints the error message: print('Error: ', x).
    • Returns an empty string ("") instead of crashing.
  7. Returning the Result:
    • If everything works, the function gives you the AI’s response as plain text.
    • If there’s an error, it gives you an empty string, indicating no response was received.
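
One thing the snippet above assumes is a globally available pipe object, i.e., a Hugging Face transformers text-generation pipeline created once at start-up. A minimal sketch of that setup could look like this; the model id is a placeholder for whichever BharatGPT checkpoint you pull from Hugging Face –

import torch
from transformers import pipeline

# Sketch only: load the open-source model locally through the transformers pipeline.
# Replace the placeholder below with the actual BharatGPT repository id on Hugging Face.
pipe = pipeline(
    "text-generation",
    model="<bharatgpt-model-id>",
    torch_dtype=torch.float16,
    device_map="auto",   # let the library decide where to place the model
)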

    def get_model_response(self, model_name: str, prompt: str) -> ModelResponse:
        """Get response from specified model with metrics"""
        start_time = time.time()
        start_memory = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024

        try:
            if model_name == "claude-3":
                response_content = self.get_claude_response(prompt)
            elif model_name == "gpt4":
                response_content = self.get_gpt4_response(prompt)
            elif model_name == "deepseek-chat":
                response_content = self.get_deepseek_response(prompt)
            elif model_name == "bharat-gpt":
                response_content = self.get_bharatgpt_response(prompt)

            # Model-specific API calls 
            token_count = len(self.bert_tokenizer.encode(response_content))
            
            end_memory = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
            memory_usage = end_memory - start_memory
            
            return ModelResponse(
                content=response_content,
                response_time=time.time() - start_time,
                token_count=token_count,
                memory_usage=memory_usage
            )
        except Exception as e:
            logging.error(f"Error getting response from {model_name}: {str(e)}")
            return ModelResponse(
                content="",
                response_time=0,
                token_count=0,
                memory_usage=0,
                error=str(e)
            )

Start Tracking Time and Memory:

    • The function starts a timer (start_time) to measure how long it takes to get a response.
    • It also checks how much memory is being used at the beginning (start_memory).

    Choose the AI Model:

    • Based on the model_name provided, the function selects the appropriate method to get a response:
      • "claude-3" → Calls get_claude_response(prompt).
      • "gpt4" → Calls get_gpt4_response(prompt).
      • "deepseek-chat" → Calls get_deepseek_response(prompt).
      • "bharat-gpt" → Calls get_bharatgpt_response(prompt).

    Process the Response:

    • Once the response is received, the function calculates:
      • Token Count: The number of tokens (small chunks of text) in the response using a tokenizer.
      • Memory Usage: The difference between memory usage after the response (end_memory) and before it (start_memory).

    Return the Results:

    • The function bundles all the information into a ModelResponse object:
      • The AI’s reply (content).
      • How long the response took (response_time).
      • The number of tokens in the reply (token_count).
      • How much memory was used (memory_usage).

    Handle Errors:

    • If something goes wrong (e.g., the AI doesn’t respond), the function:
      • Logs the error message.
      • Returns an empty response with default values and the error message.
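
    For reference, the ModelResponse container returned above could be a simple dataclass along these lines (a sketch; the exact definition in the repository may differ slightly) –

    from dataclasses import dataclass
    from typing import Optional

    @dataclass
    class ModelResponse:
        content: str                    # the model's reply text
        response_time: float            # seconds taken to respond
        token_count: int                # tokens in the reply
        memory_usage: float             # additional memory used, in MB
        error: Optional[str] = None     # populated only when a call fails
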
        def evaluate_text_quality(self, generated: str, reference: str) -> Dict[str, float]:
            """Evaluate text quality metrics"""
            # BERTScore
            gen_embedding = self.sentence_model.encode([generated])
            ref_embedding = self.sentence_model.encode([reference])
            bert_score = cosine_similarity(gen_embedding, ref_embedding)[0][0]
    
            # BLEU Score
            generated_tokens = word_tokenize(generated.lower())
            reference_tokens = word_tokenize(reference.lower())
            bleu = sentence_bleu([reference_tokens], generated_tokens)
    
            # METEOR Score
            meteor = meteor_score([reference_tokens], generated_tokens)
    
            return {
                'bert_score': bert_score,
                'bleu_score': bleu,
                'meteor_score': meteor
            }

    Inputs:

    • generated: The text produced by the AI.
    • reference: The correct or expected version of the text.

    Calculating BERTScore:

    • Converts the generated and reference texts into numerical embeddings (mathematical representations) using a pre-trained model (self.sentence_model.encode).
    • Measures the similarity between the two embeddings using cosine similarity. This gives the bert_score, which ranges from -1 (completely different) to 1 (very similar).

    Calculating BLEU Score:

    • Breaks the generated and reference texts into individual words (tokens) using word_tokenize.
    • Converts both texts to lowercase for consistent comparison.
    • Calculates the BLEU Score (sentence_bleu), which checks how many words or phrases in the generated text overlap with the reference. BLEU values range from 0 (no match) to 1 (perfect match).

    Calculating METEOR Score:

    • Also uses the tokenized versions of generated and reference texts.
    • Calculates the METEOR Score (meteor_score), which considers exact matches, synonyms, and word order. Scores range from 0 (no match) to 1 (perfect match).

    Returning the Results:

    • Combines the three scores into a dictionary with the keys 'bert_score', 'bleu_score', and 'meteor_score'.

    Similarly, other functions are developed.
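
    As one illustration of those remaining evaluators, the technical-performance check can be as simple as repackaging the fields already captured in ModelResponse. This is a hedged sketch, not the exact code from the repository –

        def evaluate_technical_performance(self, response: ModelResponse) -> Dict[str, float]:
            """Sketch: expose latency, memory, and token usage as flat metrics"""
            return {
                'response_time': response.response_time,
                'memory_usage': response.memory_usage,
                'token_count': float(response.token_count)
            }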

        def run_comprehensive_evaluation(self, evaluation_data: List[Dict]) -> pd.DataFrame:
            """Run comprehensive evaluation on all metrics"""
            results = []
            
            for item in evaluation_data:
                prompt = item['prompt']
                reference = item['reference']
                task_criteria = item.get('task_criteria', {})
                
                for model_name in self.model_configs.keys():
                    # Get multiple responses to evaluate reliability
                    responses = [
                        self.get_model_response(model_name, prompt)
                        for _ in range(3)  # Get 3 responses for reliability testing
                    ]
                    
                    # Use the best response for other evaluations
                    best_response = max(responses, key=lambda x: len(x.content) if not x.error else 0)
                    
                    if best_response.error:
                        logging.error(f"Error in model {model_name}: {best_response.error}")
                        continue
                    
                    # Gather all metrics
                    metrics = {
                        'model': model_name,
                        'prompt': prompt,
                        'response': best_response.content,
                        **self.evaluate_text_quality(best_response.content, reference),
                        **self.evaluate_factual_accuracy(best_response.content, reference),
                        **self.evaluate_task_performance(best_response.content, task_criteria),
                        **self.evaluate_technical_performance(best_response),
                        **self.evaluate_reliability(responses),
                        **self.evaluate_safety(best_response.content)
                    }
                    
                    # Add business impact metrics using task performance
                    metrics.update(self.evaluate_business_impact(
                        best_response,
                        metrics['task_completion']
                    ))
                    
                    results.append(metrics)
            
            return pd.DataFrame(results)
    • Input:
      • evaluation_data: A list of test cases, where each case is a dictionary containing:
        • prompt: The question or input to the AI model.
        • reference: The ideal or expected answer.
        • task_criteria (optional): Additional rules or requirements for the task.
    • Initialize Results:
      • An empty list results is created to store the evaluation metrics for each model and test case.
    • Iterate Through Test Cases:
      • For each item in the evaluation_data:
        • Extract the prompt, reference, and task_criteria.
    • Evaluate Each Model:
      • Loop through all available AI models (self.model_configs.keys()).
      • Generate three responses for each model to test reliability.
    • Select the Best Response:
      • Out of the three responses, pick the one with the most content (best_response), ignoring responses with errors.
    • Handle Errors:
      • If a response has an error, log the issue and skip further evaluation for that model.
    • Evaluate Metrics:
      • Using the best_response, calculate a variety of metrics, including:
        • Text Quality: How similar the response is to the reference.
        • Factual Accuracy: Whether the response is factually correct.
        • Task Performance: How well it meets task-specific criteria.
        • Technical Performance: Evaluate time, memory, or other system-related metrics.
        • Reliability: Check consistency across multiple responses.
        • Safety: Ensure the response is safe and appropriate.
    • Evaluate Business Impact:
      • Add metrics for business impact (e.g., how well the task was completed, using task_completion as a key factor).
    • Store Results:
      • Add the calculated metrics for this model and prompt to the results list.
    • Return Results as a DataFrame:
      • Convert the results list into a structured table (a pandas DataFrame) for easy analysis and visualization.

    Great! So, now, we’ve explained the code.
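
    To make the flow concrete, a hypothetical driver for the evaluator could look like the snippet below. The evaluator class name and constructor are illustrative; see the GitHub repository for the real driver script –

    # Hypothetical driver: build a tiny evaluation set and run every configured model.
    evaluation_data = [
        {
            "prompt": "What are the four largest moons of Jupiter?",
            "reference": "The four largest moons of Jupiter are Io, Europa, Ganymede, and Callisto.",
            "task_criteria": {"must_mention": ["Io", "Europa", "Ganymede", "Callisto"]}
        }
    ]

    evaluator = LLMEvaluator()   # illustrative class name
    results_df = evaluator.run_comprehensive_evaluation(evaluation_data)
    print(results_df[["model", "bert_score", "bleu_score", "meteor_score"]])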

    Let us understand the final outcome of this run & what we can conclude from that.

    1. BERT Score (Semantic Understanding):
      • GPT4 leads slightly at 0.8322 (83.22%)
      • Bharat-GPT close second at 0.8118 (81.18%)
      • Claude-3 at 0.8019 (80.19%)
      • DeepSeek-Chat at 0.7819 (78.19%)
      Think of this like a “comprehension score” – how well the models understand the context. All models show strong understanding, with only a 5% difference between best and worst.
    2. BLEU Score (Word-for-Word Accuracy):
      • Bharat-GPT leads at 0.0567 (5.67%)
      • Claude-3 at 0.0344 (3.44%)
      • GPT4 at 0.0306 (3.06%)
      • DeepSeek-Chat lowest at 0.0189 (1.89%)
      These low scores suggest models use different wording than references, which isn’t necessarily bad.
    3. METEOR Score (Meaning Preservation):
      • Bharat-GPT leads at 0.4684 (46.84%)
      • Claude-3 close second at 0.4507 (45.07%)
      • GPT4 at 0.2960 (29.60%)
      • DeepSeek-Chat at 0.2652 (26.52%)
      This shows how well models maintain meaning while using different words.
    4. Response Time (Speed):
      • Claude-3 fastest: 4.40 seconds
      • Bharat-GPT: 6.35 seconds
      • GPT4: 6.43 seconds
      • DeepSeek-Chat slowest: 8.52 seconds
    5. Safety and Reliability:
      • Error Rate: Perfect 0.0 for all models
      • Toxicity: All very safe (below 0.15%) 
        • Claude-3 safest at 0.0007
        • GPT4 at 0.0008
        • Bharat-GPT at 0.0012
        • DeepSeek-Chat at 0.0014
    6. Cost Efficiency:
      • Claude-3 most economical: $0.0019 per response
      • Bharat-GPT close: $0.0021
      • GPT4: $0.0038
      • DeepSeek-Chat highest: $0.0050

    Key Takeaways by Model:

    1. Claude-3: ✓ Fastest responses ✓ Most cost-effective ✓ Excellent meaning preservation ✓ Lowest toxicity
    2. Bharat-GPT: ✓ Best BLEU and METEOR scores ✓ Strong semantic understanding ✓ Cost-effective ✗ Moderate response time
    3. GPT4: ✓ Best semantic understanding ✓ Good safety metrics ✗ Higher cost ✗ Moderate response time
    4. DeepSeek-Chat: ✗ Generally lower performance ✗ Slowest responses ✗ Highest cost ✗ Slightly higher toxicity

    Reliability of These Statistics:

    Strong Points:

    • Comprehensive metric coverage
    • Consistent patterns across evaluations
    • Zero error rates show reliability
    • Clear differentiation between models

    Limitations:

    • BLEU scores are quite low across all models
    • Doesn’t measure creative or innovative responses
    • May not reflect specific use case performance
    • Single snapshot rather than long-term performance

    Final Observation:

    1. Best Overall Value: Claude-3
      • Fast, cost-effective, safe, good performance
    2. Best for Accuracy: Bharat-GPT
      • Highest meaning preservation and precision
    3. Best for Understanding: GPT4
      • Strongest semantic comprehension
    4. Consider Your Priorities: 
      • Speed → Choose Claude-3
      • Cost → Choose Claude-3 or Bharat-GPT
      • Accuracy → Choose Bharat-GPT
      • Understanding → Choose GPT4

    These statistics provide reliable comparative data but should be part of a broader decision-making process that includes your specific needs, budget, and use cases.


    For the Bharat GPT model, we’ve tested this locally on my MacBook Pro M4 Max. And, the configuration is as follows –

    I’ve also tried the hosted API version, & it provided similar performance to the stats we received by running the model locally. Unfortunately, they haven’t made the API version public yet.

    So, apart from Anthropic & OpenAI, I’ll keep watching this new LLM (Bharat GPT) for overall stats in the coming days.


    So, we’ve done it.

    You can find the detailed code at the GitHub link.

    I’ll bring some more exciting topics in the coming days from the Python verse.

    Till then, Happy Avenging! 🙂

    Enabling & Exploring Stable Diffusion – Part 3

    Before we dive into the details of this post, let us provide the previous two links that precede it.

    Enabling & Exploring Stable Diffusion – Part 1

    Enabling & Exploring Stable Diffusion – Part 2

    For reference, we’ll share the demo before diving into the actual follow-up analysis in the section below –


    Now, let us continue our discussions from where we left.

    class clsText2Image:
        def __init__(self, pipe, output_path, filename):
    
            self.pipe = pipe
            
            # More aggressive attention slicing
            self.pipe.enable_attention_slicing(slice_size=1)
    
            self.output_path = f"{output_path}{filename}"
            
            # Warm up the pipeline
            self._warmup()
        
        def _warmup(self):
            """Warm up the pipeline to optimize memory allocation"""
            with torch.no_grad():
                _ = self.pipe("warmup", num_inference_steps=1, height=512, width=512)
            torch.mps.empty_cache()
            gc.collect()
        
        def generate(self, prompt, num_inference_steps=12, guidance_scale=3.0):
            try:
                torch.mps.empty_cache()
                gc.collect()
                
                with torch.autocast(device_type="mps"):
                    with torch.no_grad():
                        image = self.pipe(
                            prompt,
                            num_inference_steps=num_inference_steps,
                            guidance_scale=guidance_scale,
                            height=1024,
                            width=1024,
                        ).images[0]
                
                image.save(self.output_path)
                return 0
            except Exception as e:
                print(f'Error: {str(e)}')
                return 1
            finally:
                torch.mps.empty_cache()
                gc.collect()
    
        def genImage(self, prompt):
            try:
    
                # Initialize generator
                x = self.generate(prompt)
    
                if x == 0:
                    print('Successfully processed first pass!')
                else:
                    print('Failed to complete first pass!')
                    # Raise an explicit error so the outer handler reports the failure
                    raise RuntimeError('Failed to complete first pass!')
    
                return 0
    
            except Exception as e:
                print(f"\nAn unexpected error occurred: {str(e)}")
    
                return 1

    This is the initialization method for the clsText2Image class:

    • Takes a pre-configured pipe (text-to-image pipeline), an output_path, and a filename.
    • Enables more aggressive memory optimization by setting “attention slicing.”
    • Prepares the full file path for saving generated images.
    • Calls a _warmup method to pre-load the pipeline and optimize memory allocation.

    This private method warms up the pipeline:

    • Sends a dummy “warmup” request with basic parameters to allocate memory efficiently.
    • Clears any cached memory (torch.mps.empty_cache()) and performs garbage collection (gc.collect()).
    • Ensures smoother operation for future image generation tasks.

    This method generates an image from a text prompt:

    • Clears memory cache and performs garbage collection before starting.
    • Uses the text-to-image pipeline (pipe) to generate an image:
      • Takes the prompt, number of inference steps, and guidance scale as input.
      • Outputs an image at 1024×1024 resolution.
    • Saves the generated image to the specified output path.
    • Returns 0 on success or 1 on failure.
    • Ensures cleanup by clearing memory and collecting garbage, even in case of errors.

    This method simplifies image generation:

    • Calls the generate method with the given prompt.
    • Prints a success message if the image is generated (0 return value).
    • On failure, logs the error and raises an exception.
    • Returns 0 on success or 1 on failure.
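
    As a quick illustration, once a diffusers pipeline has been loaded (the loading itself is covered in Part 2), the class above could be driven roughly like this – a sketch that assumes pipe is an already-initialized Stable Diffusion pipeline and that the output directory exists –

    # Sketch: pipe is assumed to be a pre-loaded Stable Diffusion pipeline on MPS.
    t2i = clsText2Image(pipe, output_path='/tmp/', filename='scene.png')

    if t2i.genImage('A watercolor painting of a lighthouse at dawn') == 0:
        print('Intermediate image written to /tmp/scene.png')

    The next class, clsImage2Video, handles the second pass that turns this intermediate image into video frames –
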
    class clsImage2Video:
        def __init__(self, pipeline):
            
            # Optimize model loading
            torch.mps.empty_cache()
            self.pipeline = pipeline
    
        def generate_frames(self, pipeline, init_image, prompt, duration_seconds=10):
            try:
                torch.mps.empty_cache()
                gc.collect()
    
                base_frames = []
                img = Image.open(init_image).convert("RGB").resize((1024, 1024))
                
                for _ in range(10):
                    result = pipeline(
                        prompt=prompt,
                        image=img,
                        strength=0.45,
                        guidance_scale=7.5,
                        num_inference_steps=25
                    ).images[0]
    
                    base_frames.append(np.array(result))
                    img = result
                    torch.mps.empty_cache()
    
                frames = []
                for i in range(len(base_frames)-1):
                    frame1, frame2 = base_frames[i], base_frames[i+1]
                    for t in np.linspace(0, 1, int(duration_seconds*24/10)):
                        frame = (1-t)*frame1 + t*frame2
                        frames.append(frame.astype(np.uint8))
                
                return frames
            except Exception as e:
                frames = []
                print(f'Error: {str(e)}')
    
                return frames
            finally:
                torch.mps.empty_cache()
                gc.collect()
    
        # Main method
        def genVideo(self, prompt, inputImage, targetVideo, fps):
            try:
                print("Starting animation generation...")
                
                init_image_path = inputImage
                output_path = targetVideo
                fps = fps
                
                frames = self.generate_frames(
                    pipeline=self.pipeline,
                    init_image=init_image_path,
                    prompt=prompt,
                    duration_seconds=20
                )
                
                # Save the interpolated frames using the caller-supplied frame rate
                imageio.mimsave(output_path, frames, fps=fps)
    
                print("Animation completed successfully!")
    
                return 0
            except Exception as e:
                x = str(e)
                print('Error: ', x)
    
                return 1

    This initializes the clsImage2Video class:

    • Clears the GPU cache to optimize memory before loading.
    • Sets up the pipeline for generating frames, which uses an image-to-video transformation model.

    This function generates frames for a video:

    • Starts by clearing GPU memory and running garbage collection.
    • Loads the init_image, resizes it to 1024×1024 pixels, and converts it to RGB format.
    • Iteratively applies the pipeline to transform the image:
      • Uses the prompt and specified parameters like strength, guidance_scale, and num_inference_steps.
      • Stores the resulting frames in a list.
    • Interpolates between consecutive frames to create smooth transitions:
      • Uses linear blending for smooth animation across a specified duration and frame rate (24 fps for 10 segments).
    • Returns the final list of generated frames or an empty list if an error occurs.
    • Always clears memory after execution.

    This is the main function for creating a video from an image and text prompt:

    • Logs the start of the animation generation process.
    • Calls generate_frames() with the given pipeline, inputImage, and prompt to create frames.
    • Saves the generated frames as a video using the imageio library, setting the specified frame rate (fps).
    • Logs a success message and returns 0 if the process is successful.
    • On error, logs the issue and returns 1.

    Now, let us understand the performance. But before that, let us explore the device on which we’ve performed these stress tests, which involve both the GPU & the CPUs.

    And, here is the performance stats –

    From the above snapshot, we can clearly see that the GPU is 100% utilized, whereas the CPU still shows a significant percentage of available capacity.

    As you can see, the first pass converts the input prompt to intermediate images within 1 min 30 sec. The second pass then consists of multiple hops (11 hops) averaging about 22 seconds each. Overall, the application finishes in 5 minutes 36 seconds for a 10-second video clip.


    So, we’ve done it.

    You can find the detailed code at the GitHub link.

    I’ll bring some more exciting topics in the coming days from the Python verse.

    Till then, Happy Avenging! 🙂

    Enabling & Exploring Stable Diffusion – Part 2

    As we started explaining the importance & usage of Stable Diffusion in our previous post:

    Enabling & Exploring Stable Diffusion – Part 1

    In today’s post, we’ll discuss another approach, where we build a custom Python-based solution on top of the Hugging Face libraries that generates a video out of the supplied prompt.

    But, before that, let us view the demo generated from a custom solution.

    Isn’t it exciting? Let us dive deep into the details.


    Let us understand the basic flow of events for the custom solution –

    So, the application will use Hugging Face models such as “stable-diffusion-3.5-large” & “dreamshaper-xl-1-0” through the Python SDK. As part of the process, these large models will be downloaded onto the local laptop, which takes some time depending on the bandwidth of your internet connection.

    Before we even deep dive into the code, let us understand the flow of Python scripts as shown below:

    From the above diagram, we can understand that the main application will be triggered by “generateText2Video.py”. As you can see, “clsConfigClient.py” holds all the necessary parameter information that will be supplied to all the scripts.

    “generateText2Video.py” will trigger the main class named “clsText2Video.py”, which then calls all the subsequent classes.

    Great! Since we now have better visibility of the script flow, let’s examine the key snippets individually.


    class clsText2Video:
        def __init__(self, model_id_1, model_id_2, output_path, filename, vidfilename, fps, force_cpu=False):
            self.model_id_1 = model_id_1
            self.model_id_2 = model_id_2
            self.output_path = output_path
            self.filename = filename
            self.vidfilename = vidfilename
            self.force_cpu = force_cpu
            self.fps = fps
    
            # Initialize in main process
            os.environ["TOKENIZERS_PARALLELISM"] = "true"
            self.r1 = cm.clsMaster(force_cpu)
            self.torch_type = self.r1.getTorchType()
            
            torch.mps.empty_cache()
            self.pipe = self.r1.getText2ImagePipe(self.model_id_1, self.torch_type)
            self.pipeline = self.r1.getImage2VideoPipe(self.model_id_2, self.torch_type)
    
            self.text2img = cti.clsText2Image(self.pipe, self.output_path, self.filename)
            self.img2vid = civ.clsImage2Video(self.pipeline)
    
        def getPrompt2Video(self, prompt):
            try:
                input_image = self.output_path + self.filename
                target_video = self.output_path + self.vidfilename
    
                if self.text2img.genImage(prompt) == 0:
                    print('Pass 1: Text to intermediate images generated!')
                    
                    if self.img2vid.genVideo(prompt, input_image, target_video, self.fps) == 0:
                        print('Pass 2: Successfully generated!')
                        return 0
                return 1
            except Exception as e:
                print(f"\nAn unexpected error occurred: {str(e)}")
                return 1

    Now, let us interpret:

    This is the initialization method for the class. It does the following:

    • Sets up configurations like model IDs, output paths, filenames, video filename, frames per second (fps), and whether to use the CPU (force_cpu).
    • Configures an environment variable for tokenizer parallelism.
    • Initializes helper classes (clsMaster) to manage system resources and retrieve appropriate PyTorch settings.
    • Creates two pipelines:
      • pipe: For converting text to images using the first model.
      • pipeline: For converting images to video using the second model.
    • Initializes text2img and img2vid objects:
      • text2img handles text-to-image conversions.
      • img2vid handles image-to-video conversions.

    This method generates a video from a text prompt in two steps:

    1. Text-to-Image Conversion:
      • Calls genImage(prompt) using the text2img object to create an intermediate image file.
      • If successful, it prints confirmation.
    2. Image-to-Video Conversion:
      • Uses the img2vid object to convert the intermediate image into a video file.
      • Includes the input image path, target video path, and frames per second (fps).
      • If successful, it prints confirmation.
    • If either step fails, the method returns 1.
    • Logs any unexpected errors and returns 1 in such cases.
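
    Putting it together, the calling script (“generateText2Video.py”) essentially does something like the following. This is a simplified sketch with illustrative model ids and paths; the real values come from “clsConfigClient.py” –

    # Sketch of the driver flow; actual parameters live in clsConfigClient.py.
    t2v = clsText2Video(
        model_id_1='stabilityai/stable-diffusion-3.5-large',   # text-to-image model
        model_id_2='Lykon/dreamshaper-xl-1-0',                 # image-to-image model
        output_path='/tmp/',
        filename='frame.png',
        vidfilename='clip.mp4',
        fps=30,
        force_cpu=False
    )

    if t2v.getPrompt2Video('A time-lapse of clouds over a mountain lake') == 0:
        print('Video generated successfully!')

    The helper class clsMaster referenced in the constructor above is shown next –
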
    # Set device for Apple Silicon GPU
    def setup_gpu(force_cpu=False):
        if not force_cpu and torch.backends.mps.is_available() and torch.backends.mps.is_built():
            print('Running on Apple Silicon MPS GPU!')
            return torch.device("mps")
        return torch.device("cpu")
    
    ######################################
    ####         Global Flag      ########
    ######################################
    
    class clsMaster:
        def __init__(self, force_cpu=False):
            self.device = setup_gpu(force_cpu)
    
        def getTorchType(self):
            try:
                # Check if MPS (Apple Silicon GPU) is available
                if not torch.backends.mps.is_available():
                    torch_dtype = torch.float32
                    raise RuntimeError("MPS (Metal Performance Shaders) is not available on this system.")
                else:
                    torch_dtype = torch.float16
                
                return torch_dtype
            except Exception as e:
                torch_dtype = torch.float16
                print(f'Error: {str(e)}')
    
                return torch_dtype
    
        def getText2ImagePipe(self, model_id, torchType):
            try:
                device = self.device
    
                torch.mps.empty_cache()
                self.pipe = StableDiffusion3Pipeline.from_pretrained(model_id, torch_dtype=torchType, use_safetensors=True, variant="fp16",).to(device)
    
                return self.pipe
            except Exception as e:
                x = str(e)
                print('Error: ', x)
    
                torch.mps.empty_cache()
                self.pipe = StableDiffusion3Pipeline.from_pretrained(model_id, torch_dtype=torchType,).to(device)
    
                return self.pipe
            
        def getImage2VideoPipe(self, model_id, torchType):
            try:
                device = self.device
    
                torch.mps.empty_cache()
                self.pipeline = StableDiffusionXLImg2ImgPipeline.from_pretrained(model_id, torch_dtype=torchType, use_safetensors=True, use_fast=True).to(device)
    
                return self.pipeline
            except Exception as e:
                x = str(e)
                print('Error: ', x)
    
                torch.mps.empty_cache()
                self.pipeline = StableDiffusionXLImg2ImgPipeline.from_pretrained(model_id, torch_dtype=torchType).to(device)
    
                return self.pipeline

    Let us interpret:

    This function determines whether to use the Apple Silicon GPU (MPS) or the CPU:

    • If force_cpu is False and the MPS GPU is available, it sets the device to “mps” (Apple GPU) and prints a message.
    • Otherwise, it defaults to the CPU.

    This is the initializer for the clsMaster class:

    • It sets the device to either GPU or CPU using the setup_gpu function (mentioned above) based on the force_cpu flag.

    This method determines the PyTorch data type to use:

    • Checks if MPS GPU is available:
      • If available, uses torch.float16 for optimized performance.
      • If unavailable, defaults to torch.float32 and raises a warning.
    • Handles errors gracefully by defaulting to torch.float16 and printing the error.

    This method initializes a text-to-image pipeline:

    • Loads the Stable Diffusion model with the given model_id and torchType.
    • Configures it for MPS GPU or CPU, based on the device.
    • Clears the GPU cache before loading the model to optimize memory usage.
    • If an error occurs, attempts to reload the pipeline without safetensors.

    This method initializes an image-to-video pipeline:

    • Similar to getText2ImagePipe, it loads the Stable Diffusion XL Img2Img pipeline with the specified model_id and torchType.
    • Configures it for MPS GPU or CPU and clears the cache before loading.
    • On error, reloads the pipeline without additional optimization settings and prints the error.

    Let us continue this in the next post:

    Enabling & Exploring Stable Diffusion – Part 3

    Till then, Happy Avenging! 🙂

    Building a real-time Gen AI Improvement Matrices (GAIIM) using Python, UpTrain, Open AI & React

    How does RAG work better for various enterprise-level Gen AI use cases? What needs to be in place to make the LLM work more efficiently & to check and validate its responses, covering bias, hallucination & many more concerns?

    This post (after a slight gap) will capture and discuss some of the burning issues that many AI architects are trying to address. In it, I’ve considered a newly formed AI start-up from India that developed an open-source framework, which can easily evaluate the challenges one faces with LLMs & integrate with your existing models for a better understanding of their limitations. You will get plenty of insights about it.

    But, before we dig deep, why not see the demo first –

    Isn’t it exciting? Let’s deep dive into the flow of events.


    Let’s explore the broad-level architecture/flow –

    Let us understand the steps of the above architecture. First, our Python application needs to trigger and expose the API, which will interact with Open AI and UpTrain AI to fetch all the LLM KPIs based on the input from the React app named “Evaluation.”

    Once the response is received from UpTrain AI, the Python application organizes the results in a more readable manner, without changing the core details coming out of their APIs, & then shares that back with the React interface.

    Let’s examine the React app’s sample inputs to better understand what will be passed to the Python-based API solution. This API acts as a wrapper that calls multiple UpTrain APIs, accumulates them under one response by parsing & reorganizing the data with the help of Open AI, & then shares that back.

    Highlighted in RED are some of the critical inputs you need to provide to get most of the KPIs. And, here are the sample text inputs for your reference –

    Q. Enter input question.
    A. What are the four largest moons of Jupiter?
    Q. Enter the context document.
    A. Jupiter, the largest planet in our solar system, boasts a fascinating array of moons. Among these, the four largest are collectively known as the Galilean moons, named after the renowned astronomer Galileo Galilei, who first observed them in 1610. These four moons, Io, Europa, Ganymede, and Callisto, hold significant scientific interest due to their unique characteristics and diverse geological features.
    Q. Enter LLM response.
    A. The four largest moons of Jupiter, known as the Galilean moons, are Io, Europa, Ganymede, and Marshmello.
    Q. Enter the persona response.
    A. strict and methodical teacher
    Q. Enter the guideline.
    A. Response shouldn’t contain any specific numbers
    Q. Enter the ground truth.
    A. The Jupiter is the largest & gaseous planet in the solar system.
    Q. Choose the evaluation method.
    A. llm

    Once you fill in the details, the app should look like the below screenshot –


    Let us understand the sample packages that are required for this task.

    pip install Flask==3.0.3
    pip install Flask-Cors==4.0.0
    pip install numpy==1.26.4
    pip install openai==1.17.0
    pip install pandas==2.2.2
    pip install uptrain==0.6.13

    Note that we’re not going to discuss the entire script here, only the relevant parts. However, you can get the complete scripts in the GitHub repository.

    def askFeluda(context, question):
        try:
            # Combine the context and the question into a single prompt.
            prompt_text = f"{context}\n\n Question: {question}\n Answer:"
    
            # Retrieve conversation history from the session or database
            conversation_history = []
    
            # Add the new message to the conversation history
            conversation_history.append(prompt_text)
    
        # Call the OpenAI API with the combined prompt (the history list is kept but not sent)
            response = client.with_options(max_retries=0).chat.completions.create(
                messages=[
                    {
                        "role": "user",
                        "content": prompt_text,
                    }
                ],
                model=cf.conf['MODEL_NAME'],
                max_tokens=150,  # You can adjust this based on how long you expect the response to be
                temperature=0.3,  # Adjust for creativity. Lower values make responses more focused and deterministic
                top_p=1,
                frequency_penalty=0,
                presence_penalty=0
            )
    
            # Extract the content from the first choice's message
            chat_response = response.choices[0].message.content
    
            # Print the generated response text
            return chat_response.strip()
        except Exception as e:
            return f"An error occurred: {str(e)}"

    This function either asks the supplied question against the given context or passes the UpTrain results through to summarize the JSON into more easily readable plain text. For our test, we’ve used “gpt-3.5-turbo”.
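
    For context, the client object used above is assumed to be the standard OpenAI 1.x client, created roughly like this (the key lookup is illustrative; the real script reads it from its configuration module):

    import os

    from openai import OpenAI

    # Illustrative initialization; the actual key comes from the project configuration (cf.conf)
    client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))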

    def evalContextRelevance(question, context, resFeluda, personaResponse):
        try:
            data = [{
                'question': question,
                'context': context,
                'response': resFeluda
            }]
    
            results = eval_llm.evaluate(
                data=data,
                checks=[Evals.CONTEXT_RELEVANCE, Evals.FACTUAL_ACCURACY, Evals.RESPONSE_COMPLETENESS, Evals.RESPONSE_RELEVANCE, CritiqueTone(llm_persona=personaResponse), Evals.CRITIQUE_LANGUAGE, Evals.VALID_RESPONSE, Evals.RESPONSE_CONCISENESS]
            )
    
            return results
        except Exception as e:
            x = str(e)
    
            return x

    The above method initiates the UpTrain evaluation to get all the stats that will be helpful for assessing your LLM response. In this post, we’ve captured the following KPIs –

    - Context Relevance Explanation
    - Factual Accuracy Explanation
    - Guideline Adherence Explanation
    - Response Completeness Explanation
    - Response Fluency Explanation
    - Response Relevance Explanation
    - Response Tonality Explanation
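
    For completeness, the eval_llm object used in evalContextRelevance above is assumed to be initialized along these lines (a hedged sketch based on UpTrain’s documented quick-start; the exact wiring lives in the full script on GitHub):

    import os

    from uptrain import EvalLLM, Evals, CritiqueTone

    # Illustrative initialization; the actual key would come from the configuration module
    eval_llm = EvalLLM(openai_api_key=os.environ.get("OPENAI_API_KEY"))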
    # Function to extract and print all the keys and their values
    def extractPrintedData(data):
        # Initialize defaults so missing keys in the UpTrain response do not
        # raise a NameError when the results dictionary is built below
        s_1_key_val = s_2_key_val = s_3_key_val = s_4_key_val = 0
        s_5_key_val = s_6_key_val = s_7_key_val = s_8_key_val = 0
        s_1_val = s_2_val = s_3_val = s_4_val = s_5_val = s_6_val = s_8_val = ''

        for entry in data:
            print("Parsed Data:")
            for key, value in entry.items():

                if key == 'score_context_relevance':
                    s_1_key_val = value
                elif key == 'explanation_context_relevance':
                    cleaned_value = preprocessParseData(value)
                    print(f"{key}: {cleaned_value}\n")
                    s_1_val = cleaned_value
                elif key == 'score_factual_accuracy':
                    s_2_key_val = value
                elif key == 'explanation_factual_accuracy':
                    cleaned_value = preprocessParseData(value)
                    print(f"{key}: {cleaned_value}\n")
                    s_2_val = cleaned_value
                elif key == 'score_response_completeness':
                    s_3_key_val = value
                elif key == 'explanation_response_completeness':
                    cleaned_value = preprocessParseData(value)
                    print(f"{key}: {cleaned_value}\n")
                    s_3_val = cleaned_value
                elif key == 'score_response_relevance':
                    s_4_key_val = value
                elif key == 'explanation_response_relevance':
                    cleaned_value = preprocessParseData(value)
                    print(f"{key}: {cleaned_value}\n")
                    s_4_val = cleaned_value
                elif key == 'score_critique_tone':
                    s_5_key_val = value
                elif key == 'explanation_critique_tone':
                    cleaned_value = preprocessParseData(value)
                    print(f"{key}: {cleaned_value}\n")
                    s_5_val = cleaned_value
                elif key == 'score_fluency':
                    s_6_key_val = value
                elif key == 'explanation_fluency':
                    cleaned_value = preprocessParseData(value)
                    print(f"{key}: {cleaned_value}\n")
                    s_6_val = cleaned_value
                elif key == 'score_valid_response':
                    s_7_key_val = value
                elif key == 'score_response_conciseness':
                    s_8_key_val = value
                elif key == 'explanation_response_conciseness':
                    print('Raw Value: ', value)
                    cleaned_value = preprocessParseData(value)
                    print(f"{key}: {cleaned_value}\n")
                    s_8_val = cleaned_value
    
        print('$'*200)
    
        results = {
            "Factual_Accuracy_Score": s_2_key_val,
            "Factual_Accuracy_Explanation": s_2_val,
            "Context_Relevance_Score": s_1_key_val,
            "Context_Relevance_Explanation": s_1_val,
            "Response_Completeness_Score": s_3_key_val,
            "Response_Completeness_Explanation": s_3_val,
            "Response_Relevance_Score": s_4_key_val,
            "Response_Relevance_Explanation": s_4_val,
            "Response_Fluency_Score": s_6_key_val,
            "Response_Fluency_Explanation": s_6_val,
            "Response_Tonality_Score": s_5_key_val,
            "Response_Tonality_Explanation": s_5_val,
            "Guideline_Adherence_Score": s_8_key_val,
            "Guideline_Adherence_Explanation": s_8_val,
            "Response_Match_Score": s_7_key_val
            # Add other evaluations similarly
        }
    
        return results

    The above method parses the initial data from UpTrain before sending it to OpenAI for a better summary, without changing any text returned by it.

    @app.route('/evaluate', methods=['POST'])
    def evaluate():
        data = request.json
    
        if not data:
            return jsonify({'error': 'No data provided'}), 400
    
        # Extracting input data for processing (just an example of logging received data)
        question = data.get('question', '')
        context = data.get('context', '')
        llmResponse = ''
        personaResponse = data.get('personaResponse', '')
        guideline = data.get('guideline', '')
        groundTruth = data.get('groundTruth', '')
        evaluationMethod = data.get('evaluationMethod', '')
    
        print('question:')
        print(question)
    
        llmResponse = askFeluda(context, question)
        print('='*200)
        print('Response from Feluda::')
        print(llmResponse)
        print('='*200)
    
        # Getting Context LLM
        cLLM = evalContextRelevance(question, context, llmResponse, personaResponse)
    
        print('&'*200)
        print('cLLM:')
        print(cLLM)
        print(type(cLLM))
        print('&'*200)
    
        results = extractPrintedData(cLLM)
    
        print('JSON::')
        print(results)
    
        resJson = jsonify(results)
    
        return resJson

    The above function is the main method. It first receives all the input parameters from the React app, then invokes the functions one by one to get the LLM response and the LLM performance metrics, & finally summarizes them before sending the result back to the React app.
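
    To make the request contract concrete, here is a hypothetical client-side call to this endpoint (the host/port are the Flask defaults and the payload reuses the sample inputs above, with the context abridged):

    import requests

    # Illustrative request; the React app posts an equivalent JSON payload
    payload = {
        "question": "What are the four largest moons of Jupiter?",
        "context": "Jupiter's four largest moons, the Galilean moons, are Io, Europa, Ganymede, and Callisto.",
        "personaResponse": "strict and methodical teacher",
        "guideline": "Response shouldn't contain any specific numbers",
        "groundTruth": "The Jupiter is the largest & gaseous planet in the solar system.",
        "evaluationMethod": "llm"
    }

    resp = requests.post("http://127.0.0.1:5000/evaluate", json=payload)
    print(resp.json())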

    For any other scripts, please refer to the above-mentioned GitHub link.


    Let us see some of the screenshots of the test run –


    So, we’ve done it.

    I’ll bring some more exciting topics in the coming days from the Python verse.

    Till then, Happy Avenging! 🙂

    Building a real-time streamlit app by consuming events from Ably channels

    Today, I’ll bring an exciting Streamlit app that renders a real-time dashboard by consuming all the events from the Ably channel.

    Once more, I’ll be utilizing my IoT emulator, which will feed real-time events based on user inputs into the Ably channel, to which the Streamlit-based app subscribes.

    However, I would like to share the run before we dig deep into this.


    Demo

    Isn’t this exciting? We can use our custom-built IoT emulator to capture real-time events into the Ably queue & then transform those raw events into more meaningful KPIs. Let’s deep dive then.

    Let’s explore the broad-level architecture/flow –

    As you can see, the green box is a demo IoT application that generates events & pushes them into the Ably Queue. At the same time, the streamlit-based Dashboard app consumes the events & transforms them into more meaningful metrics.
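
    Before we get into the dashboard code, here is a hypothetical sketch of how such a consumer could pull recent events from an Ably channel and shape them into a DataFrame (this is not the actual getData implementation; the key, channel name, and field names are illustrative, assuming the async API of ably 2.x):

    import asyncio
    import json

    import pandas as pd
    from ably import AblyRest

    ABLY_API_KEY = "<your-ably-api-key>"   # illustrative placeholder
    CHANNEL_NAME = "iot-events"            # illustrative channel name

    async def fetchRecentReadings():
        # Pull the most recent messages from the Ably channel history
        async with AblyRest(ABLY_API_KEY) as client:
            channel = client.channels.get(CHANNEL_NAME)
            page = await channel.history()
            rows = [json.loads(msg.data) for msg in page.items]  # assumes JSON-encoded payloads

        # Each event is assumed to carry Timestamp, Temperature, Humidity & Pressure fields
        df = pd.DataFrame(rows)
        df['Timestamp'] = pd.to_datetime(df['Timestamp'])
        return df.sort_values('Timestamp')

    if __name__ == "__main__":
        print(asyncio.run(fetchRecentReadings()).tail())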

    Let us understand the sample packages that are required for this task.

    pip install ably==2.0.3
    pip install numpy==1.26.3
    pip install pandas==2.2.0
    pip install plotly==5.19.0
    pip install requests==2.31.0
    pip install streamlit==1.30.0
    pip install streamlit-autorefresh==1.0.1
    pip install streamlit-echarts==0.4.0

    Since this is an extension of our previous post, we’re not going to revisit the scripts already discussed there. Instead, we will talk about the enhanced & new scripts required for this use case.

    1. app.py (This script consumes real-time streaming data coming from a hosted API source via another popular third-party service named Ably. Ably implements the pub-sub streaming concept, which can be extremely useful for any start-up. The script then translates the events into meaningful KPIs in a Streamlit-based dashboard app.)

    Note that we’re not going to discuss the entire script here, only the relevant parts. However, you can get the complete scripts in the GitHub repository.

    def createHumidityGauge(humidity_value):
        fig = go.Figure(go.Indicator(
            mode = "gauge+number",
            value = humidity_value,
            domain = {'x': [0, 1], 'y': [0, 1]},
            title = {'text': "Humidity", 'font': {'size': 24}},
            gauge = {
                'axis': {'range': [None, 100], 'tickwidth': 1, 'tickcolor': "darkblue"},
                'bar': {'color': "darkblue"},
                'bgcolor': "white",
                'borderwidth': 2,
                'bordercolor': "gray",
                'steps': [
                    {'range': [0, 50], 'color': 'cyan'},
                    {'range': [50, 100], 'color': 'royalblue'}],
                'threshold': {
                    'line': {'color': "red", 'width': 4},
                    'thickness': 0.75,
                    'value': humidity_value}
            }
        ))
    
        fig.update_layout(height=220, paper_bgcolor = "white", font = {'color': "darkblue", 'family': "Arial"}, margin=dict(t=0, l=5, r=5, b=0))
    
        return fig

    The above function creates a customized humidity gauge that visually represents a given humidity value, making it easy to read and understand at a glance.

    This code defines a function createHumidityGauge that creates a visual gauge (like a meter) to display a humidity value. Here’s a simple breakdown of what it does:

    1. Function Definition: It starts by defining a function named createHumidityGauge that takes one parameter, humidity_value, which is the humidity level you want to display on the gauge.
    2. Creating the Gauge: Inside the function, it creates a figure using Plotly (a plotting library) with a specific type of chart called an Indicator. This Indicator is set to display in “gauge+number” mode, meaning it shows both a gauge visual and the numeric value of the humidity.
    3. Setting Gauge Properties:
      • The value is set to the humidity_value parameter, so the gauge shows this humidity level.
      • The domain sets the position of the gauge on the plot, which is set to fill the available space ([0, 1] for both x and y axes).
      • The title is set to “Humidity” with a font size of 24, labeling the gauge.
      • The gauge section defines the appearance and behavior of the gauge, including:
        • An axis that goes from 0 to 100 (assuming humidity is measured as a percentage from 0% to 100%).
        • The color and style of the gauge’s bar and background.
        • Colored steps indicating different ranges of humidity (cyan for 0-50% and royal blue for 50-100%).
        • A threshold line that appears at the value of the humidity, marked in red to stand out.
    4. Finalizing the Gauge Appearance: The function then updates the layout of the figure to set its height, background color, font style, and margins to make sure the gauge looks nice and is visible.
    5. Returning the Figure: Finally, the function returns the fig object, which is the fully configured gauge, ready to be displayed.

    Other similar functions will repeat the same steps.

    def createTemperatureLineChart(data):
        # Assuming 'data' is a DataFrame with a 'Timestamp' index and a 'Temperature' column
        fig = px.line(data, x=data.index, y='Temperature', title='Temperature Vs Time')
        fig.update_layout(height=270)  # Specify the desired height here
        return fig

    The above function takes a set of temperature data indexed by timestamp and creates a line chart that visually represents how the temperature changes over time.

    This code defines a function “createTemperatureLineChart” that creates a line chart to display temperature data over time. Here’s a simple summary of what it does:

    1. Function Definition: It starts with defining a function named createTemperatureLineChart that takes one parameter, data, which is expected to be a DataFrame (a type of data structure used in pandas, a Python data analysis library). This data frame should have a ‘Timestamp’ as its index (meaning each row represents a different point in time) and a ‘Temperature’ column containing temperature values.
    2. Creating the Line Chart: The function uses Plotly Express (a plotting library) to create a line chart with the following characteristics:
      • The x-axis represents time, taken from the DataFrame’s index (‘Timestamp’).
      • The y-axis represents temperature, taken from the ‘Temperature’ column in the DataFrame.
      • The chart is titled ‘Temperature Vs Time’, clearly indicating what the chart represents.
    3. Customizing the Chart: It then updates the layout of the chart to set a specific height (270 pixels) for the chart, making it easier to view.
    4. Returning the Chart: Finally, the function returns the fig object, which is the fully prepared line chart, ready to be displayed.

    Similar functions will repeat for other KPIs.

        st.sidebar.header("KPIs")
        selected_kpis = st.sidebar.multiselect(
            "Select KPIs", options=["Temperature", "Humidity", "Pressure"], default=["Temperature"]
        )

    The above code creates a sidebar with a multi-select drop-down that lists the available KPIs ("Temperature", "Humidity", "Pressure"), with "Temperature" selected by default.

    # Split the layout into columns for KPIs and graphs
        gauge_col, kpi_col, graph_col = st.columns(3)
    
        # Auto-refresh setup
        st_autorefresh(interval=7000, key='data_refresh')
    
        # Fetching real-time data
        data = getData(var1, DInd)
    
        st.markdown(
            """
            <style>
            .stEcharts { margin-bottom: -50px; }  /* Class might differ, inspect the HTML to find the correct class name */
            </style>
            """,
            unsafe_allow_html=True
        )
    
        # Display gauges at the top of the page
        gauges = st.container()
    
        with gauges:
            col1, col2, col3 = st.columns(3)
            with col1:
                humidity_value = round(data['Humidity'].iloc[-1], 2)
                humidity_gauge_fig = createHumidityGauge(humidity_value)
                st.plotly_chart(humidity_gauge_fig, use_container_width=True)
    
            with col2:
                temp_value = round(data['Temperature'].iloc[-1], 2)
                temp_gauge_fig = createTempGauge(temp_value)
                st.plotly_chart(temp_gauge_fig, use_container_width=True)
    
            with col3:
                pressure_value = round(data['Pressure'].iloc[-1], 2)
                pressure_gauge_fig = createPressureGauge(pressure_value)
                st.plotly_chart(pressure_gauge_fig, use_container_width=True)
    
    
        # Next row for actual readings and charts side-by-side
        readings_charts = st.container()
    
    
        # Display KPIs and their trends
        with readings_charts:
            readings_col, graph_col = st.columns([1, 2])
    
            with readings_col:
                st.subheader("Latest Readings")
                if "Temperature" in selected_kpis:
                    st.metric("Temperature", f"{temp_value:.2f}%")
    
                if "Humidity" in selected_kpis:
                    st.metric("Humidity", f"{humidity_value:.2f}%")
    
                if "Pressure" in selected_kpis:
                    st.metric("Pressure", f"{pressure_value:.2f}%")
    
    
            # Graph placeholders for each KPI
            with graph_col:
                if "Temperature" in selected_kpis:
                    temperature_fig = createTemperatureLineChart(data.set_index("Timestamp"))
    
                    # Display the Plotly chart in Streamlit with specified dimensions
                    st.plotly_chart(temperature_fig, use_container_width=True)
    
                if "Humidity" in selected_kpis:
                    humidity_fig = createHumidityLineChart(data.set_index("Timestamp"))
    
                    # Display the Plotly chart in Streamlit with specified dimensions
                    st.plotly_chart(humidity_fig, use_container_width=True)
    
                if "Pressure" in selected_kpis:
                    pressure_fig = createPressureLineChart(data.set_index("Timestamp"))
    
                    # Display the Plotly chart in Streamlit with specified dimensions
                    st.plotly_chart(pressure_fig, use_container_width=True)
    1. The code begins by splitting the Streamlit web page layout into three columns to separately display Key Performance Indicators (KPIs), gauges, and graphs.
    2. It sets up an auto-refresh feature with a 7-second interval, ensuring the data displayed is regularly updated without manual refreshes.
    3. Real-time data is fetched using a function called getData, which takes unspecified parameters var1 and DInd.
    4. A CSS style is injected into the Streamlit page to adjust the margin of Echarts elements, which may be used to improve the visual layout of the page.
    5. A container for gauges is created at the top of the page, with three columns inside it dedicated to displaying humidity, temperature, and pressure gauges.
    6. Each gauge (humidity, temperature, and pressure) is created by rounding the last value from the fetched data to two decimal places and then visualized using respective functions that create Plotly gauge charts.
    7. Below the gauges, another container is set up for displaying the latest readings and their corresponding graphs in a side-by-side layout, using two columns.
    8. The left column under “Latest Readings” displays the latest values for selected KPIs (temperature, humidity, pressure) as metrics.
    9. In the right column, for each selected KPI, a line chart is created using data with timestamps as indices and displayed using Plotly charts, allowing for a visual trend analysis.
    10. This structured approach enables a dynamic and interactive dashboard within Streamlit, offering real-time insights into temperature, humidity, and pressure with both numeric metrics and graphical trends, optimized for regular data refreshes and user interactivity.

    Let us understand some of the important screenshots of this application –


    So, we’ve done it.

    I’ll bring some more exciting topics in the coming days from the Python verse.

    Till then, Happy Avenging! 🙂

    Text2SQL Data Extractor (T2SDE) using Python & Open AI LLM

    Today, I will share a new post that contextualizes the source files, reads the data into a pandas data frame, dynamically generates the SQL & executes it, and then fetches the data from the sources based on the dynamically generated query. This project is for the advanced Python developer and the data science newbie.

    In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.

    Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

    Demo

    Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.


    The application dynamically captures the metadata from the source data. It blends that metadata into an enhanced prompt, which it passes to the Flask server. The Flask server holds all the context-related limits.

    Once the application receives the correctly generated SQL, it applies the SQL using the SQLAlchemy package to get the desired results.
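
    For reference, the engine object used later in clsDynamicSQLProcess.py is assumed to be a plain SQLAlchemy engine, for example an in-memory SQLite database (a minimal sketch; the actual connection string lives in the full script):

    from sqlalchemy import create_engine

    # Illustrative engine; the source CSV files are loaded into it as tables at runtime
    engine = create_engine('sqlite:///:memory:', echo=False)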

    The following are the important packages that are essential to this project –

    pip install openai==1.6.1
    pip install pandas==2.1.4
    pip install Flask==3.0.0
    pip install SQLAlchemy==2.0.23

    We’ll have both the server and the main application. Today, we’ll go in reverse order: first discussing the main script & then explaining all the other class scripts.

    • 1_invokeSQLServer.py (This is the main calling Python script to invoke the OpenAI-Server.)

    Please find some of the key snippets from this discussion –

    @app.route('/message', methods=['POST'])
    def message():
        input_text = request.json.get('input_text', None)
        session_id = request.json.get('session_id', None)
    
        print('*' * 240)
        print('User Input:')
        print(str(input_text))
        print('*' * 240)
    
        # Retrieve conversation history from the session or database
        conversation_history = session.get(session_id, [])
    
        # Add the new message to the conversation history
        conversation_history.append(input_text)
    
        # Call the OpenAI API with the latest user message (the history is stored separately in the session)
        response = client.with_options(max_retries=0).chat.completions.create(
            messages=[
                {
                    "role": "user",
                    "content": input_text,
                }
            ],
            model=cf.conf['MODEL_NAME'],
        )
    
        # Extract the content from the first choice's message
        chat_response = response.choices[0].message.content
        print('*' * 240)
        print('Response::')
        print(chat_response)
        print('*' * 240)
    
        conversation_history.append(chat_response)
    
        # Store the updated conversation history in the session or database
        session[session_id] = conversation_history
    
        return chat_response

    This code defines a web application route that handles POST requests sent to the /message endpoint:

    1. Route Declaration: The @app.route('/message', methods=['POST']) part specifies that the function message() is executed when the server receives a POST request at the /message URL.
    2. Function Definition: Inside the message() function:
      • It retrieves two pieces of data from the request’s JSON body: input_text (the user’s input message) and session_id (a unique identifier for the user’s session).
      • It prints the user’s input message, surrounded by lines of asterisks for emphasis.
    3. Conversation History Management:
      • The code retrieves the conversation history associated with the given session_id. This history is a list of messages.
      • It then adds the new user message (input_text) to this conversation history.
    4. OpenAI API Call:
      • The function makes a call to the OpenAI API, passing the user’s message. It specifies not to retry the request if it fails (max_retries=0).
      • The model used for the OpenAI API call is taken from some configurations (cf.conf['MODEL_NAME']).
    5. Processing API Response:
      • The response from the OpenAI API is processed to extract the content of the chat response.
      • This chat response is printed.
    6. Updating Conversation History:
      • The chat response is added to the conversation history.
      • The updated conversation history is then stored back in the session or database, associated with the session_id.
    7. Returning the Response: Finally, the function returns the chat response.

    • clsDynamicSQLProcess.py (This Python class generates the SQL & then executes the flask server to invoke the OpenAI-Server.)

    Now, let us understand a few important snippets –

    def text2SQLBegin(self, DBFileNameList, fileDBPath, srcQueryPrompt, joinCond, debugInd='N'):
    
            question = srcQueryPrompt
            create_table_statement = ''
            jStr = ''
    
            print('DBFileNameList::', DBFileNameList)
            print('prevSessionDBFileNameList::', self.prevSessionDBFileNameList)
    
            if set(self.prevSessionDBFileNameList) == set(DBFileNameList):
                self.flag = 'Y'
            else:
                self.flag = 'N'
    
            if self.flag == 'N':
    
                for i in DBFileNameList:
                    DBFileName = i
    
                    FullDBname = fileDBPath + DBFileName
                    print('File: ', str(FullDBname))
    
                    tabName, _ = DBFileName.split('.')
    
                    # Reading the source data
                    df = pd.read_csv(FullDBname)
    
                    # Convert all string columns to lowercase
                    df = df.apply(lambda x: x.str.lower() if x.dtype == "object" else x)
    
                    # Convert DataFrame to SQL table
                    df.to_sql(tabName, con=engine, index=False)
    
                    # Create a MetaData object and reflect the existing database
                    metadata = MetaData()
                    metadata.reflect(bind=engine)
    
                    # Access the 'users' table from the reflected metadata
                    table = metadata.tables[tabName]
    
                    # Generate the CREATE TABLE statement
                    create_table_statement = create_table_statement + str(CreateTable(table)) + '; \n'
    
                    tabName = ''
    
                for joinS in joinCond:
                    jStr = jStr + joinS + '\n'
    
                self.prevSessionDBFileNameList = DBFileNameList
                self.prev_create_table_statement = create_table_statement
    
                masterSessionDBFileNameList = self.prevSessionDBFileNameList
                mast_create_table_statement = self.prev_create_table_statement
    
            else:
                masterSessionDBFileNameList = self.prevSessionDBFileNameList
                mast_create_table_statement = self.prev_create_table_statement
    
            inputPrompt = (templateVal_1 + mast_create_table_statement + jStr + templateVal_2).format(question=question)
    
            if debugInd == 'Y':
                print('INPUT PROMPT::')
                print(inputPrompt)
    
            print('*' * 240)
            print('Find the Generated SQL:')
            print()
    
            DBFileNameList = []
            create_table_statement = ''
    
            return inputPrompt
    1. Function Overview: The text2SQLBegin function processes a list of database file names (DBFileNameList), a file path (fileDBPath), a query prompt (srcQueryPrompt), join conditions (joinCond), and a debug indicator (debugInd) to generate SQL commands.
    2. Initial Setup: It starts by initializing variables for the question, the SQL table creation statement, and a string for join conditions.
    3. Debug Prints: The function prints the current and previous session database file names for debugging purposes.
    4. Flag Setting: A flag is set to ‘Y’ if the current session’s database file names match the previous session’s; otherwise, it’s set to ‘N’.
    5. Processing New Session Data: If the flag is ‘N’, indicating new session data:
      • For each database file, it reads the data, converts string columns to lowercase, and creates a corresponding SQL table in a database using the pandas library.
      • Metadata is generated for each table and a CREATE TABLE SQL statement is created.
    6. Join Conditions and Statement Aggregation: Join conditions are concatenated, and previous session information is updated with the current session’s data.
    7. Handling Repeated Sessions: If the session data is repeated (flag is ‘Y’), it uses the previous session’s SQL table creation statements and database file names.
    8. Final Input Prompt Creation: It constructs the final input prompt by combining template values with the create table statement, join conditions, and the original question.
    9. Debug Printing: If debug mode is enabled, it prints the final input prompt.
    10. Conclusion: The function clears the DBFileNameList and create_table_statement variables, and returns the constructed input prompt.

    def text2SQLEnd(self, srcContext, debugInd='N'):
        url = self.url

        payload = json.dumps({"input_text": srcContext, "session_id": ""})
        headers = {'Content-Type': 'application/json', 'Cookie': cf.conf['HEADER_TOKEN']}

        response = requests.request("POST", url, headers=headers, data=payload)

        return response.text

    The text2SQLEnd function sends an HTTP POST request to a specified URL and returns the response. It takes two parameters: srcContext which contains the input text, and an optional debugInd for debugging purposes. The function constructs the request payload by converting the input text and an empty session ID to JSON format. It sets the request headers, including a content type of ‘application/json’ and a token from the configuration file. The function then sends the POST request using the requests library and returns the text content of the response.

      def sql2Data(self, srcSQL):
          # Executing the query on top of your data
          resultSQL = pd.read_sql_query(srcSQL, con=engine)
    
          return resultSQL

    The sql2Data function is designed to execute a SQL query on a database and return the result. It takes a single parameter, srcSQL, which contains the SQL query to be executed. The function uses the pandas library to run the provided SQL query (srcSQL) against a database connection (engine). It then returns the result of this query, which is typically a DataFrame object containing the data retrieved from the database.

    def genData(self, srcQueryPrompt, fileDBPath, DBFileNameList, joinCond, debugInd='N'):
        try:
            authorName = self.authorName
            website = self.website
            var = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    
            print('*' * 240)
            print('SQL Start Time: ' + str(var))
            print('*' * 240)
    
            print('*' * 240)
            print()
    
            if debugInd == 'Y':
                print('Author Name: ', authorName)
                print('For more information, please visit the following Website: ', website)
                print()
    
                print('*' * 240)
            print('Your Data for Retrieval:')
            print('*' * 240)
    
            if debugInd == 'Y':
    
                print()
                print('Converted File to Dataframe Sample:')
                print()
    
            else:
                print()
    
            context = self.text2SQLBegin(DBFileNameList, fileDBPath, srcQueryPrompt, joinCond, debugInd)
            srcSQL = self.text2SQLEnd(context, debugInd)
    
            print(srcSQL)
            print('*' * 240)
            print()
            resDF = self.sql2Data(srcSQL)
    
            varEnd = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

            print('*' * 240)
            print('SQL End Time: ' + str(varEnd))
            print('*' * 240)
    
            return resDF
    
        except Exception as e:
            x = str(e)
            print('Error: ', x)
    
            df = pd.DataFrame()
    
            return df
    1. Initialization and Debug Information: The function begins by initializing variables like authorName, website, and a timestamp (var). It then prints the start time of the SQL process. If the debug indicator (debugInd) is ‘Y’, it prints additional information like the author’s name and website.
    2. Generating SQL Context and Query: The function calls text2SQLBegin with various parameters (file paths, database file names, query prompt, join conditions, and the debug indicator) to generate an SQL context. Then it calls text2SQLEnd with this context and the debug indicator to generate the actual SQL query.
    3. Executing the SQL Query: It prints the generated SQL query for visibility, especially in debug mode. The query is then executed by calling sql2Data, which returns the result as a data frame (resDF).
    4. Finalization and Error Handling: After executing the query, it prints the SQL end time. In case of any exceptions during the process, it catches the error, prints it, and returns an empty DataFrame.
    5. Return Value: The function returns the DataFrame (resDF) containing the results of the executed SQL query. If an error occurs, it returns an empty DataFrame instead.
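
    Tying it together, a hypothetical invocation of this class from the main script might look like the sketch below (the constructor call, file names, and join condition are illustrative only):

    # Illustrative usage; the real constructor arguments and paths live in the main calling script
    from clsDynamicSQLProcess import clsDynamicSQLProcess

    t2sql = clsDynamicSQLProcess()

    resDF = t2sql.genData(
        srcQueryPrompt="Show the total order value by region for 2023",
        fileDBPath="data/",
        DBFileNameList=["orders.csv", "regions.csv"],
        joinCond=["orders.region_id = regions.region_id"],
        debugInd="Y"
    )

    print(resDF.head())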

    Let us explore the directory structure, starting from the parent folder down to some of the important child folders –

    Let us understand the important screenshots of this entire process –


    So, finally, we’ve done it.

    You will get the complete codebase in the following GitHub link.

    I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

    Till then, Happy Avenging! 🙂

    Validating source data against RAG-response using Open AI, GloVe, FAISS using Python

    Today, I’ll be presenting another exciting architectural capability in the world of LLMs, where you need to answer one crucial question: how valid is the response generated by these LLMs against your own data? This validation is critical when you are discussing business growth & need to take the right action at the right time.

    Why not view the demo before going through it?

    Demo

    Isn’t it exciting? Great! Let us understand this in detail.

    The first dotted box (extreme-left) represents the area that talks about the data ingestion from different sources, including third-party PDFs. It is expected that organizations should have ready-to-digest data sources. Examples: Data Lake, Data Mart, One Lake, or any other equivalent platforms. Those PDFs will provide additional insights beyond the conventional advanced analytics.

    You need to have some kind of OCR solution that will extract all the relevant information in the form of text from the documents. 

    The next important part is how you define the chunking & embedding of data chunks into the Vector DB. Chunking & indexing strategies, along with chunk overlap, play a crucial role in tying those segregated pieces of context into a single context that is fed as the source for your preferred LLMs.

    This system employs a vector similarity search to browse through unstructured information and concurrently accesses the database to retrieve the context, ensuring that the responses are not only comprehensive but also anchored in validated knowledge.

    This approach is particularly vital for addressing multi-hop questions, where a single query can be broken down into multiple sub-questions and may require information from numerous documents to generate an accurate answer.


    pip install openai==0.27.8
    pip install pandas==2.0.3
    pip install tensorflow==2.11.1
    pip install faiss-cpu==1.7.4
    pip install gensim==4.3.2

    Let us understand the key class & snippets.

    • clsFeedVectorDB.py (This is the main class that will invoke the FAISS framework to contextualize the docs inside the vector DB, along with the source file names, to validate the answer from Gen AI using the GloVe.6B embedding model.)

    Let us understand some of the key snippets from the above script (Full scripts will be available in the GitHub Repo) –

    # Sample function to convert text to a vector
    def text2Vector(self, text):
        # Keep only the words from the text that exist in the GloVe vocabulary
        words = [word for word in text.lower().split() if word in self.model]
    
        # If no words in the model, return a zero vector
        if not words:
            return np.zeros(self.model.vector_size)
    
        # Compute the average of the word vectors
        vector = np.mean([self.model[word] for word in words], axis=0)
        return vector.reshape(1, -1)

    This code is for a function called “text2Vector” that takes some text as input and converts it into a numerical vector. Let me break it down step by step:

    • It starts by taking some text as input, and this text is expected to be a sentence or a piece of text.
    • The text is then split into individual words, and each word is converted to lowercase.
    • It checks if each word is present in a pre-trained language model (probably a word embedding model like Word2Vec or GloVe). If a word is not in the model, it’s ignored.
    • If none of the words from the input text are found in the model, the function returns a vector filled with zeros. This vector has the same size as the word vectors in the model.
    • If there are words from the input text in the model, the function calculates the average vector of these words. It does this by taking the word vectors for each word found in the model and computing their mean (average). This results in a single vector that represents the input text.
    • Finally, the function reshapes this vector into a 2D array with one row and as many columns as there are elements in the vector. The reason for this reshaping is often related to compatibility with other parts of the code or libraries used in the project.

    So, in simple terms, this function takes a piece of text, looks up the word vectors for the words in that text, and calculates the average of those vectors to create a single numerical representation of the text. If none of the words are found in the model, it returns a vector of zeros.
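
    For context, self.model above is assumed to be a gensim KeyedVectors instance loaded from the downloaded GloVe.6B files, roughly like this (the file path is illustrative):

    from gensim.models import KeyedVectors

    # GloVe text files ship without a header row, hence no_header=True (gensim 4.x)
    modelPath = 'glove.6B.100d.txt'  # illustrative path to the downloaded GloVe vectors
    model = KeyedVectors.load_word2vec_format(modelPath, binary=False, no_header=True)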

        def genData(self):
            try:
                basePath = self.basePath
                modelFileName = self.modelFileName
                vectorDBPath = self.vectorDBPath
                vectorDBFileName = self.vectorDBFileName
    
                # Create a FAISS index
                dimension = int(cf.conf['NO_OF_MODEL_DIM'])  # Assuming 100-dimensional vectors 
                index = faiss.IndexFlatL2(dimension)
    
                print('*' * 240)
                print('Vector Index Your Data for Retrieval:')
                print('*' * 240)
    
                FullVectorDBname = vectorDBPath + vectorDBFileName
                indexFile = str(vectorDBPath) + str(vectorDBFileName) + '.index'
    
                print('File: ', str(indexFile))
    
                data = {}
                # List all files in the specified directory
                files = os.listdir(basePath)
    
                # Filter out files that are not text files
                text_files = [file for file in files if file.endswith('.txt')]
    
                # Read each text file
                for file in text_files:
                    file_path = os.path.join(basePath, file)
                    print('*' * 240)
                    print('Processing File:')
                    print(str(file_path))
                    try:
                        # Attempt to open with utf-8 encoding
                        with open(file_path, 'r', encoding='utf-8') as file:
                            for line_number, line in enumerate(file, start=1):
                                # Assume each line is a separate document
                                vector = self.text2Vector(line)
                                vector = vector.reshape(-1)
                                index_id = index.ntotal
    
                                index.add(np.array([vector]))  # Adding the vector to the index
                                data[index_id] = {'text': line, 'line_number': line_number, 'file_name': file_path}  # Storing the line and file name
                    except UnicodeDecodeError:
                        # If utf-8 fails, try a different encoding
                        try:
                            with open(file_path, 'r', encoding='ISO-8859-1') as file:
                                for line_number, line in enumerate(file, start=1):
                                    # Assume each line is a separate document
                                    vector = self.text2Vector(line)
                                    vector = vector.reshape(-1)
                                    index_id = index.ntotal
                                    index.add(np.array([vector]))  # Adding the vector to the index
                                    data[index_id] = {'text': line, 'line_number': line_number, 'file_name': file_path}  # Storing the line and file name
                        except Exception as e:
                            print(f"Could not read file {file}: {e}")
                            continue
    
                    print('*' * 240)
    
                # Save the data dictionary using pickle
                dataCache = vectorDBPath + modelFileName
                with open(dataCache, 'wb') as f:
                    pickle.dump(data, f)
    
                # Save the index and data for later use
                faiss.write_index(index, indexFile)
    
                print('*' * 240)
    
                return 0
    
            except Exception as e:
                x = str(e)
                print('Error: ', x)
    
                return 1

    This code defines a function called genData, and its purpose is to prepare and store data for later retrieval using a FAISS index. Let’s break down what it does step by step:

    • It starts by assigning several variables, such as basePath, modelFileName, vectorDBPath, and vectorDBFileName. These variables likely contain file paths and configuration settings.
    • It creates a FAISS index with a specified dimension (assuming 100-dimensional vectors in this case) using faiss.IndexFlatL2. FAISS is a library for efficient similarity search and clustering of high-dimensional data.
    • It prints the file name where the index will be stored and initializes an empty dictionary called data to store information about the processed text data.
    • It lists all the files in the directory specified by basePath and filters out only the files that have a ".txt" extension.
    • It then reads each of these text files one by one. For each file:
      • It attempts to open the file with UTF-8 encoding and reads it line by line.
      • For each line, it calls the text2Vector function to convert the text into a numerical vector representation, which is added to the FAISS index.
      • It also stores some information about the line, such as the line number and the file name, in the data dictionary.
      • If there is an issue with UTF-8 encoding, it retries with the "ISO-8859-1" encoding and continues the same process of reading and storing data.
    • If there are any exceptions (errors) during this process, it prints an error message but continues processing other files.
    • Once all the files are processed, it saves the data dictionary using the pickle library to the file specified by dataCache.
    • It also saves the FAISS index to the file specified by indexFile.
    • Finally, it returns 0 if the process completes successfully or 1 if there was an error during execution.

    In summary, this function reads text files, converts their contents into numerical vectors, and builds a FAISS index for efficient similarity search. It also saves the processed data and the index for later use. If there are any issues during the process, it prints error messages but continues processing other files.
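
    On the retrieval side, the persisted artifacts are assumed to be loaded back (for example inside clsRAGOpenAI.py) roughly like this, with paths mirroring the ones used in genData (all values below are illustrative):

    import pickle

    import faiss

    vectorDBPath = 'vectorDB/'       # illustrative; same values used in genData
    vectorDBFileName = 'docIndex'
    modelFileName = 'docData.pkl'

    # Reload the FAISS index and the pickled metadata dictionary created by genData
    index = faiss.read_index(vectorDBPath + vectorDBFileName + '.index')

    with open(vectorDBPath + modelFileName, 'rb') as f:
        data = pickle.load(f)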

    • clsRAGOpenAI.py (This is the main class that will invoke the RAG class, which will get the contexts with references including source files, line numbers, and source texts. This will help the customer to validate the source against the OpenAI response to understand & control the data bias & other potential critical issues.)

    Let us understand some of the key snippets from the above script (Full scripts will be available in the GitHub Repo) –

    def ragAnswerWithHaystackAndGPT3(self, queryVector, k, question):
        modelName = self.modelName
        maxToken = self.maxToken
        temp = self.temp
    
        # Assuming getTopKContexts is a method that returns the top K contexts
        contexts = self.getTopKContexts(queryVector, k)
        messages = []
    
        # Add contexts as system messages
        for file_name, line_number, text in contexts:
            messages.append({"role": "system", "content": f"Document: {file_name} \nLine Number: {line_number} \nContent: {text}"})
    
        prompt = self.generateOpenaiPrompt(queryVector, k)
        prompt = prompt + "Question: " + str(question) + ". \n Answer based on the above documents."
    
        # Add user question
        messages.append({"role": "user", "content": prompt})
    
        # Create chat completion
        completion = client.chat.completions.create(
            model=modelName,
            messages=messages,
            temperature=temp,
            max_tokens=maxToken
        )

        # Assuming the last message in the response is the answer
        last_response = completion.choices[0].message.content
        source_references = ['FileName: ' + str(context[0]) + ' - Line Numbers: ' + str(context[1]) + ' - Source Text (Reference): ' + str(context[2]) for context in contexts]

        return last_response, source_references

    This code defines a function called ragAnswerWithHaystackAndGPT3. Its purpose is to combine the FAISS-based context retrieval shown above with OpenAI’s GPT model to generate an answer to a user’s question. Let’s break down what it does step by step:

    • It starts by assigning several variables, such as modelName, maxToken, and temp. These variables likely contain model-specific information and settings for GPT-3.
    • It calls a method getTopKContexts to retrieve the top K contexts (which are likely documents or pieces of text) related to the user’s query. These contexts are stored in the contexts variable.
    • It initializes an empty list called messages to store messages that will be used in the conversation with the GPT-3 model.
    • It iterates through each context and adds them as system messages to the messages list. These system messages provide information about the documents or sources being used in the conversation.
    • It creates a prompt that combines the query, retrieved contexts, and the user’s question. This prompt is then added as a user message to the messages list. It effectively sets up the conversation for GPT-3, where the user’s question is followed by context.
    • It makes a request to the GPT-3 model using the client.chat.completions.create method, passing in the model name, the constructed messages, and other settings such as temperature and maximum tokens.
    • After receiving a response from GPT-3, it assumes that the last message in the response contains the answer generated by the model.
    • It also constructs source_references, which is a list of references to the documents or sources used in generating the answer. This information includes the file name, line numbers, and source text for each context.
    • Finally, it returns the generated answer (last_response) and the source references to the caller.

    In summary, this function takes a user’s query, retrieves relevant contexts or documents, sets up a conversation with GPT-3 that includes the query and contexts, and then uses GPT-3 to generate an answer. It also provides references to the sources used in generating the answer.

        def getTopKContexts(self, queryVector, k):
            try:
                distances, indices = index.search(queryVector, k)
                resDict = [(data[i]['file_name'], data[i]['line_number'], data[i]['text']) for i in indices[0]]
                return resDict
            except Exception as e:
                x = str(e)
                print('Error: ', x)
    
                return x

    This code defines a function called getTopKContexts. Its purpose is to retrieve the top K relevant contexts or pieces of information from a pre-built index based on a query vector. Here’s a breakdown of what it does:

    1. It takes two parameters as input: queryVector, which is a numerical vector representing a query, and k, which specifies how many relevant contexts to retrieve.
    2. Inside a try-except block, it attempts the following steps:
      • It uses the index.search method to find the top K closest contexts to the given queryVector. This method returns two arrays: distances (measuring how similar the contexts are to the query) and indices (indicating the positions of the closest contexts in the data).
      • It creates a list called “resDict”, which contains tuples for each of the top K contexts. Each tuple contains three pieces of information: the file name (file_name), the line number (line_number), and the text content (text) of the context. These details are extracted from a data dictionary.
    3. If the process completes successfully, it returns the list of top K contexts (resDict) to the caller.
    4. If there’s an exception (an error) during this process, it captures the error message as a string (x), prints the error message, and then returns the error message itself.

    In summary, this function takes a query vector and finds the K most relevant contexts or pieces of information based on their similarity to the query. It returns these contexts as a list of tuples containing file names, line numbers, and text content. If there’s an error, it prints an error message and returns the error message string.

    def generateOpenaiPrompt(self, queryVector, k):
        contexts = self.getTopKContexts(queryVector, k)
        template = ct.templateVal_1
        prompt = template
        for file_name, line_number, text in contexts:
            prompt += f"Document: {file_name}\n Line Number: {line_number} \n Content: {text}\n\n"
        return prompt

    This code defines a function called generateOpenaiPrompt. Its purpose is to create a prompt or a piece of text that combines a template with information from the top K relevant contexts retrieved earlier. Let’s break down what it does:

    1. It starts by calling the getTopKContexts function to obtain the top K relevant contexts based on a given queryVector.
    2. It initializes a variable called template with a predefined template value (likely defined elsewhere in the code).
    3. It sets the prompt variable to the initial template.
    4. Then, it enters a loop where it iterates through each of the relevant contexts retrieved earlier (contexts are typically documents or text snippets).
    5. For each context, it appends information to the prompt. Specifically, it adds lines to the prompt that include:
      • The document’s file name (Document: [file_name]).
      • The line number within the document (Line Number: [line_number]).
      • The content of the context itself (Content: [text]).
    6. It adds some extra spacing (newlines) between each context to ensure readability.
    7. Finally, it returns the complete prompt, which is a combination of the template and information from the relevant contexts.

    In summary, this function takes a query vector, retrieves relevant contexts, and creates a prompt by combining a template with information from these contexts. This prompt can then be used as input for an AI model or system, likely for generating responses or answers based on the provided context.
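
    Putting it all together, a hypothetical end-to-end call might look like the sketch below, assuming clsRAGOpenAI also exposes a text2Vector encoder like the one shown earlier (the question and constructor call are illustrative):

    # Illustrative end-to-end usage of the RAG class described above
    rag = clsRAGOpenAI()

    question = "Which moons of Jupiter did Galileo discover?"
    queryVector = rag.text2Vector(question)  # GloVe-based encoder, returns shape (1, dimension)

    answer, references = rag.ragAnswerWithHaystackAndGPT3(queryVector, 5, question)

    print(answer)
    for ref in references:
        print(ref)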

    Let us understand the directory structure of this entire application –


    To learn more about this package, please visit the following GitHub link.

    So, finally, we’ve done it. I know this post is relatively smaller than my earlier posts. But I think you can pick up a useful technique here for validating your LLM responses against your source data.

    I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

    Till then, Happy Avenging! 🙂

    RAG implementation of LLMs by using Python, Haystack & React (Part – 2)

    Today, we’ll share the second installment of the RAG implementation. If you are new here, please visit the previous post for full context.

    In this post, we’ll be discussing the Haystack framework more. Again, before discussing the main context, I want to present the demo here.

    Demo

    Let us look at the flow diagram, as it captures the sequence of events that unfold as part of the process, where we’ll pay our primary attention today.

    As you can see, today we’ll discuss the red dotted line, which covers contextualizing the source data into the Vector DB.

    Let us understand the flow of events here –

    1. The main Python application will consume the nested JSON by invoking the museum API in multiple threads.
    2. The application will clean the nested data & extract the relevant attributes after flattening the JSON.
    3. It will create the unstructured text-based context, which is later fed to the Vector DB framework.

    pip install farm-haystack==1.19.0
    pip install Flask==2.2.5
    pip install Flask-Cors==4.0.0
    pip install Flask-JWT-Extended==4.5.2
    pip install Flask-Session==0.5.0
    pip install openai==0.27.8
    pip install pandas==2.0.3
    pip install tensorflow==2.11.1

    We’re using the Metropolitan Museum API to feed the data to our Vector DB. For more information, please visit the following link. It is free to use & moreover, we’re using it only for educational scenarios.
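
    Before going into the full class, here is a small, standalone sketch of how a single museum record can be pulled & flattened. The object ID & base URL below are assumptions for this illustration only; the real application reads its values from the config & iterates over departments, as shown later.

    # Standalone sketch: pull one object from the museum API & flatten the nested JSON
    # into a single-row DataFrame. The base URL & object ID are illustrative assumptions.
    import requests
    import pandas as pd

    base_url = 'https://collectionapi.metmuseum.org/public/collection/v1'
    object_id = 45734

    response = requests.get(f'{base_url}/objects/{object_id}')
    parsed = response.json()

    # json_normalize flattens nested dictionaries into dotted columns; plain lists stay as-is
    df = pd.json_normalize(parsed)
    print(df[['objectID', 'title', 'department', 'objectURL']])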


    We’ll discuss the tokenization part highlighted in a red dotted line from the above picture.

    We’ll discuss the scripts in the diagram as part of the flow mentioned above.

    • clsExtractJSON.py (This is the main class that will extract the content from the museum API using parallel calls.)
    def genData(self):
        try:
            base_url = self.base_url
            header_token = self.header_token
            basePath = self.basePath
            outputPath = self.outputPath
            mergedFile = self.mergedFile
            subdir = self.subdir
            Ind = self.Ind
            var_1 = datetime.now().strftime("%H.%M.%S")
    
    
            devVal = list()
            objVal = list()
    
            # Main Details
            headers = {'Cookie':header_token}
            payload={}
    
            url = base_url + '/departments'
    
            date_ranges = self.generateFirstDayOfLastTenYears()
    
            # Getting all the departments
            try:
                print('Department URL:')
                print(str(url))
    
                response = requests.request("GET", url, headers=headers, data=payload)
                parsed_data = json.loads(response.text)
    
                print('Department JSON:')
                print(str(parsed_data))
    
                # Extract the "departmentId" values into a Python list
                for dept_det in parsed_data['departments']:
                    for info in dept_det:
                        if info == 'departmentId':
                            devVal.append(dept_det[info])
    
            except Exception as e:
                x = str(e)
                print('Error: ', x)
                devVal = list()
    
            # List to hold thread objects
            threads = []
    
            # Calling the Data using threads
            for dep in devVal:
                t = threading.Thread(target=self.getDataThread, args=(dep, base_url, headers, payload, date_ranges, objVal, subdir, Ind,))
                threads.append(t)
                t.start()
    
            # Wait for all threads to complete
            for t in threads:
                t.join()
    
            res = self.mergeCsvFilesInDirectory(basePath, outputPath, mergedFile)
    
            if res == 0:
                print('Successful!')
            else:
                print('Failure!')
    
            return 0
    
        except Exception as e:
            x = str(e)
            print('Error: ', x)
    
            return 1

    The above code translates into the following steps –

    1. The above method first calls the generateFirstDayOfLastTenYears() method to build the date ranges used to populate records for every department, after getting all the unique departments by calling another API.
    2. Then, it will call the getDataThread() method in multiple threads to fetch all the relevant APIs simultaneously, reducing the overall wait time & creating individual smaller files.
    3. Finally, the application will invoke the mergeCsvFilesInDirectory() method to merge all the chunk files into one extensive historical data file.
    def generateFirstDayOfLastTenYears(self):
        yearRange = self.yearRange
        date_format = "%Y-%m-%d"
        current_year = datetime.now().year
    
        date_ranges = []
        for year in range(current_year - yearRange, current_year + 1):
            first_day_of_year_full = datetime(year, 1, 1)
            first_day_of_year = first_day_of_year_full.strftime(date_format)
            date_ranges.append(first_day_of_year)
    
        return date_ranges

    The first method will generate the first day of each year for the last ten years, including the current year.

    def getDataThread(self, dep, base_url, headers, payload, date_ranges, objVal, subdir, Ind):
        try:
            cnt = 0
            cnt_x = 1
            var_1 = datetime.now().strftime("%H.%M.%S")
    
            for x_start_date in date_ranges:
                try:
                    urlM = base_url + '/objects?metadataDate=' + str(x_start_date) + '&departmentIds=' + str(dep)
    
                    print('Nested URL:')
                    print(str(urlM))
    
                    response_obj = requests.request("GET", urlM, headers=headers, data=payload)
                    objectDets = json.loads(response_obj.text)
    
                    for obj_det in objectDets['objectIDs']:
                        objVal.append(obj_det)
    
                    for objId in objVal:
                        urlS = base_url + '/objects/' + str(objId)
    
                        print('Final URL:')
                        print(str(urlS))
    
                        response_det = requests.request("GET", urlS, headers=headers, data=payload)
                        objDetJSON = response_det.text
    
                        retDB = self.createData(objDetJSON)
                        retDB['departmentId'] = str(dep)
    
                        if cnt == 0:
                            df_M = retDB
                        else:
                            d_frames = [df_M, retDB]
                            df_M = pd.concat(d_frames)
    
                        if cnt == 1000:
                            cnt = 0
                            clog.logr('df_M_' + var_1 + '_' + str(cnt_x) + '_' + str(dep) +'.csv', Ind, df_M, subdir)
                            cnt_x += 1
                            df_M = pd.DataFrame()
    
                        cnt += 1
    
                except Exception as e:
                    x = str(e)
                    print('Error X:', x)
            return 0
    
        except Exception as e:
            x = str(e)
            print('Error: ', x)
    
            return 1

    The above method will invoke the individual API call to fetch the relevant artifact information.

    def mergeCsvFilesInDirectory(self, directory_path, output_path, output_file):
        try:
            csv_files = [file for file in os.listdir(directory_path) if file.endswith('.csv')]
            data_frames = []
    
            for file in csv_files:
                encodings_to_try = ['utf-8', 'utf-8-sig', 'latin-1', 'cp1252']
                for encoding in encodings_to_try:
                    try:
                        FullFileName = directory_path + file
                        print('File Name: ', FullFileName)
                        df = pd.read_csv(FullFileName, encoding=encoding)
                        data_frames.append(df)
                        break  # Stop trying other encodings if the reading is successful
                    except UnicodeDecodeError:
                        continue
    
            if not data_frames:
                raise Exception("Unable to read CSV files. Check encoding or file format.")
    
            merged_df = pd.concat(data_frames, ignore_index=True)
    
            merged_full_name = os.path.join(output_path, output_file)
            merged_df.to_csv(merged_full_name, index=False)
    
            for file in csv_files:
                os.remove(os.path.join(directory_path, file))
    
            return 0
    
        except Exception as e:
            x = str(e)
            print('Error: ', x)
            return 1

    The above method will merge all the small files into a single, more extensive historical data file that contains roughly ten years of data (the first day of each of those years, to be precise).

    For the complete code, please visit the GitHub repository.

    • 1_ReadMuseumJSON.py (This is the main script that will invoke the class, which will extract the content from the museum API using parallel calls.)
    #########################################################
    #### Written By: SATYAKI DE                          ####
    #### Written On: 27-Jun-2023                         ####
    #### Modified On 28-Jun-2023                         ####
    ####                                                 ####
    #### Objective: This is the main calling             ####
    #### python script that will invoke the              ####
    #### clsExtractJSON class to extract the             ####
    #### museum API content using parallel calls.        ####
    ####                                                 ####
    #########################################################
    import datetime
    from clsConfigClient import clsConfigClient as cf
    
    import clsExtractJSON as cej
    
    ########################################################
    ################    Global Area   ######################
    ########################################################
    
    cJSON = cej.clsExtractJSON()
    
    basePath = cf.conf['DATA_PATH']
    outputPath = cf.conf['OUTPUT_PATH']
    mergedFile = cf.conf['MERGED_FILE']
    
    ########################################################
    ################  End Of Global Area   #################
    ########################################################
    
    # Disabling Warning
    def warn(*args, **kwargs):
        pass
    
    import warnings
    warnings.warn = warn
    
    def main():
        try:
            var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
            print('*'*120)
            print('Start Time: ' + str(var))
            print('*'*120)
    
            r1 = cJSON.genData()
    
            if r1 == 0:
                print()
                print('Successfully Scraped!')
            else:
                print()
                print('Failed to Scrape!')
    
            print('*'*120)
            var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
            print('End Time: ' + str(var1))
    
        except Exception as e:
            x = str(e)
            print('Error: ', x)
    
    if __name__ == '__main__':
        main()
    

    The above script instantiates the class & then invokes its genData() method.

    • clsCreateList.py (This is the main class that will extract the relevant attributes from the historical files & then create the right input text to build the documents for contextualizing into the Vector DB framework.)
    def createRec(self):
        try:
            basePath = self.basePath
            fileName = self.fileName
            Ind = self.Ind
            subdir = self.subdir
            base_url = self.base_url
            outputPath = self.outputPath
            mergedFile = self.mergedFile
            cleanedFile = self.cleanedFile
    
            FullFileName = outputPath + mergedFile
    
            df = pd.read_csv(FullFileName)
            df2 = df[listCol]
            dfFin = df2.drop_duplicates().reset_index(drop=True)
    
            dfFin['artist_URL'] = dfFin['artistWikidata_URL'].combine_first(dfFin['artistULAN_URL'])
            dfFin['object_URL'] = dfFin['objectURL'].combine_first(dfFin['objectWikidata_URL'])
            dfFin['Wiki_URL'] = dfFin['Wikidata_URL'].combine_first(dfFin['AAT_URL']).combine_first(dfFin['URL']).combine_first(dfFin['object_URL'])
    
            # Dropping the old Dtype Columns
            dfFin.drop(['artistWikidata_URL'], axis=1, inplace=True)
            dfFin.drop(['artistULAN_URL'], axis=1, inplace=True)
            dfFin.drop(['objectURL'], axis=1, inplace=True)
            dfFin.drop(['objectWikidata_URL'], axis=1, inplace=True)
            dfFin.drop(['AAT_URL'], axis=1, inplace=True)
            dfFin.drop(['Wikidata_URL'], axis=1, inplace=True)
            dfFin.drop(['URL'], axis=1, inplace=True)
    
            # Save the filtered DataFrame to a new CSV file
            #clog.logr(cleanedFile, Ind, dfFin, subdir)
            res = self.addHash(dfFin)
    
            if res == 0:
                print('Added Hash!')
            else:
                print('Failed to add hash!')
    
            # Generate the text for each row in the dataframe
            for _, row in dfFin.iterrows():
                x = self.genPrompt(row)
                self.addDocument(x, cleanedFile)
    
            return documents
    
        except Exception as e:
            x = str(e)
            print('Record Error: ', x)
    
            return documents

    The above code will read the data from the extensive historical file created in the earlier steps, then clean it by removing all the duplicate records (if any) & finally create three consolidated URLs that constitute artist, object & wiki.

    Also, this application will replace the hyperlinks with a specific hash value before the text is fed into the vector DB, as vector DBs do not handle raw URLs well. Hence, we will store the URLs in a separate file keyed by the associated hash value & later fetch them via a lookup from the OpenAI response.
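
    The addHash() method itself is not reproduced here. The following is a minimal sketch of the hashing idea only, where the hashing scheme (zlib.crc32 over the three URL columns) is an assumption for illustration; the real implementation may differ & also persists the URL-to-hash lookup file used later by the Flask server.

    def addHash(self, dfFin):
        try:
            import zlib

            # Build a reproducible integer hash per row from its URL columns, so the raw
            # hyperlinks can stay out of the vector DB text & be looked up again later.
            def rowHash(row):
                key = str(row['artist_URL']) + '|' + str(row['object_URL']) + '|' + str(row['Wiki_URL'])
                return zlib.crc32(key.encode('utf-8'))

            dfFin['Total_Hash'] = dfFin.apply(rowHash, axis=1)

            return 0
        except Exception as e:
            print('Error: ', str(e))
            return 1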

    Then, this application will generate prompts dynamically & finally create the documents for the later vector DB consumption steps by invoking the addDocument() method.

    For more details, please visit the GitHub link.

    • 1_1_testCreateRec.py (This is the test script that will call the above class.)
    #########################################################
    #### Written By: SATYAKI DE                          ####
    #### Written On: 27-Jun-2023                         ####
    #### Modified On 28-Jun-2023                         ####
    ####                                                 ####
    #### Objective: This is the main calling             ####
    #### python script that will invoke the              ####
    #### clsCreateList class to test the document        ####
    #### creation for the vector DB contextualization.   ####
    ####                                                 ####
    #########################################################
    
    from clsConfigClient import clsConfigClient as cf
    import clsL as log
    import clsCreateList as ccl
    
    from datetime import datetime, timedelta
    
    # Disabling Warning
    def warn(*args, **kwargs):
        pass
    
    import warnings
    warnings.warn = warn
    
    ###############################################
    ###           Global Section                ###
    ###############################################
    
    #Initiating Logging Instances
    clog = log.clsL()
    cl = ccl.clsCreateList()
    
    var = datetime.now().strftime(".%H.%M.%S")
    
    documents = []
    
    ###############################################
    ###    End of Global Section                ###
    ###############################################
    def main():
        try:
            var = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
            print('*'*120)
            print('Start Time: ' + str(var))
            print('*'*120)
    
            print('*'*240)
            print('Creating Index store:: ')
            print('*'*240)
    
            documents = cl.createRec()
    
            print('Inserted Sample Records: ')
            print(str(documents))
            print('\n')
    
            r1 = len(documents)
    
            if r1 > 0:
                print()
                print('Successfully Indexed sample records!')
            else:
                print()
                print('Failed to index sample records!')
    
            print('*'*120)
            var1 = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
            print('End Time: ' + str(var1))
    
        except Exception as e:
            x = str(e)
            print('Error: ', x)
    
    if __name__ == '__main__':
        main()
    

    The above script instantiates the main class & then invokes the createRec() method to prepare the documents that will be tokenized into the vector DB.

    The above test script will be used to validate the clsCreateList class. However, in the actual flow, the class will be used inside another class.

    • clsFeedVectorDB.py (This is the main class that will feed the documents into the vector DB.)
    #########################################################
    #### Written By: SATYAKI DE                          ####
    #### Written On: 27-Jun-2023                         ####
    #### Modified On 28-Sep-2023                         ####
    ####                                                 ####
    #### Objective: This is the main calling             ####
    #### python script that will invoke the              ####
    #### haystack framework to contextualize the docs    ####
    #### inside the vector DB.                           ####
    ####                                                 ####
    #########################################################
    
    from haystack.document_stores.faiss import FAISSDocumentStore
    from haystack.nodes import DensePassageRetriever
    import openai
    import pandas as pd
    import os
    import clsCreateList as ccl
    
    from clsConfigClient import clsConfigClient as cf
    import clsL as log
    
    from datetime import datetime, timedelta
    
    # Disabling Warning
    def warn(*args, **kwargs):
        pass
    
    import warnings
    warnings.warn = warn
    
    ###############################################
    ###           Global Section                ###
    ###############################################
    
    Ind = cf.conf['DEBUG_IND']
    openAIKey = cf.conf['OPEN_AI_KEY']
    
    os.environ["TOKENIZERS_PARALLELISM"] = "false"
    
    #Initiating Logging Instances
    clog = log.clsL()
    cl = ccl.clsCreateList()
    
    var = datetime.now().strftime(".%H.%M.%S")
    
    # Encode your data to create embeddings
    documents = []
    
    var_1 = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    print('*'*120)
    print('Start Time: ' + str(var_1))
    print('*'*120)
    
    print('*'*240)
    print('Creating Index store:: ')
    print('*'*240)
    
    documents = cl.createRec()
    
    print('Inserted Sample Records: ')
    print(documents[:5])
    print('\n')
    print('Type:')
    print(type(documents))
    
    r1 = len(documents)
    
    if r1 > 0:
        print()
        print('Successfully Indexed records!')
    else:
        print()
        print('Failed to index records!')
    
    print('*'*120)
    var_2 = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    print('End Time: ' + str(var_2))
    
    # Passing OpenAI API Key
    openai.api_key = openAIKey
    
    ###############################################
    ###    End of Global Section                ###
    ###############################################
    
    class clsFeedVectorDB:
        def __init__(self):
            self.basePath = cf.conf['DATA_PATH']
            self.modelFileName = cf.conf['CACHE_FILE']
            self.vectorDBPath = cf.conf['VECTORDB_PATH']
            self.vectorDBFileName = cf.conf['VECTORDB_FILE_NM']
            self.queryModel = cf.conf['QUERY_MODEL']
            self.passageModel = cf.conf['PASSAGE_MODEL']
    
        def retrieveDocuments(self, question, retriever, top_k=3):
            return retriever.retrieve(question, top_k=top_k)
    
        def generateAnswerWithGPT3(self, retrievedDocs, question):
            documents_text = " ".join([doc.content for doc in retrievedDocs])
            prompt = f"Given the following documents: {documents_text}, answer the question: {question}"
    
            response = openai.Completion.create(
                model="text-davinci-003",
                prompt=prompt,
                max_tokens=150
            )
            return response.choices[0].text.strip()
    
        def ragAnswerWithHaystackAndGPT3(self, question, retriever):
            retrievedDocs = self.retrieveDocuments(question, retriever)
            return self.generateAnswerWithGPT3(retrievedDocs, question)
    
        def genData(self, strVal):
            try:
                basePath = self.basePath
                modelFileName = self.modelFileName
                vectorDBPath = self.vectorDBPath
                vectorDBFileName = self.vectorDBFileName
                queryModel = self.queryModel
                passageModel = self.passageModel
    
                print('*'*120)
                print('Index Your Data for Retrieval:')
                print('*'*120)
    
                FullFileName = basePath + modelFileName
                FullVectorDBname = vectorDBPath + vectorDBFileName
    
                sqlite_path = "sqlite:///" + FullVectorDBname + '.db'
                print('Vector DB Path: ', str(sqlite_path))
    
                indexFile = "vectorDB/" + str(vectorDBFileName) + '.faiss'
                indexConfig = "vectorDB/" + str(vectorDBFileName) + ".json"
    
                print('File: ', str(indexFile))
                print('Config: ', str(indexConfig))
    
                # Initialize DocumentStore
                document_store = FAISSDocumentStore(sql_url=sqlite_path)
    
                libName = "vectorDB/" + str(vectorDBFileName) + '.faiss'
    
                document_store.write_documents(documents)
    
                # Initialize Retriever
                retriever = DensePassageRetriever(document_store=document_store,
                                                  query_embedding_model=queryModel,
                                                  passage_embedding_model=passageModel,
                                                  use_gpu=False)
    
                document_store.update_embeddings(retriever=retriever)
    
                document_store.save(index_path=libName, config_path="vectorDB/" + str(vectorDBFileName) + ".json")
    
                print('*'*120)
                print('Testing with RAG & OpenAI...')
                print('*'*120)
    
                answer = self.ragAnswerWithHaystackAndGPT3(strVal, retriever)
    
                print('*'*120)
                print('Testing Answer:: ')
                print(answer)
                print('*'*120)
    
                return 0
    
            except Exception as e:
                x = str(e)
                print('Error: ', x)
    
                return 1
    

    In the above script, the following essential steps took place –

    1. First, the application calls the clsCreateList class to store all the generated documents inside a list.
    2. Then it stores the data inside the vector DB & creates & saves the model, which will be reused later (if you remember, we’ve used this as a model in our previous post).
    3. Finally, it tests with some sample use cases by providing the proper context to OpenAI & confirms the response.
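
    As a quick usage sketch (the question string below is an arbitrary example; the module & class names follow the file shown above), the class can be driven like this:

    # Minimal driver sketch for clsFeedVectorDB; the question is only an example.
    import clsFeedVectorDB as cfvd

    # Instantiating the class & letting genData() index the documents, then answer a test question
    vdb = cfvd.clsFeedVectorDB()
    ret = vdb.genData('Tell me about the paintings of Vincent van Gogh.')

    if ret == 0:
        print('Vector DB populated & sample question answered!')
    else:
        print('Vector DB load failed!')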

    Here is a short clip of how the RAG model contextualizes the source data.

    RAG-Model Contextualization

    So, finally, we’ve done it.

    I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

    You will get the complete codebase in the following GitHub link.

    I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

    Till then, Happy Avenging! 🙂

    RAG implementation of LLMs by using Python, Haystack & React (Part – 1)

    Today, I will share a new post in a multi-part series about creating end-to-end LLM applications that feed source data through a RAG implementation. I’ll also use the OpenAI Python SDK and Haystack embeddings in this case.

    In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.

    Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

    Demo

    Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

    As you can see, to enable this large & complex solution, we must first establish the capabilities to build applications powered by LLMs, Transformer models, vector search, and more. You can use state-of-the-art NLP models to perform question-answering, answer generation, semantic document search, or build tools capable of complex decision-making and query resolution. Hence, steps no. 1 & 2 showcase the data embedding & the creation of that informed repository. We’ll be discussing that in our second part.

    Once you have the informed repository, the system can interact with the end-users. As part of the query (shown in step 3), the prompt & the question are shared with the process engine, which then reduces the volume by pulling only the relevant context from our informed repository & returns the tuned context as part of the response (shown in steps 4, 5 & 6).

    Then, this tuned context is shared with OpenAI for a better response, summary & concluding remarks that are user-friendly & easier for end-users to understand (shown in steps 8 & 9).
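
    To make steps 8 & 9 concrete, here is a simplified sketch of that last hop. The model name, key placeholder & context string are assumptions for illustration only; the real values come from the config used by clsRAGOpenAI, shown later in this post.

    import openai

    openai.api_key = 'YOUR-OPENAI-KEY'

    # The tuned context arrives from the informed repository (steps 4-6); OpenAI turns it
    # into a user-friendly answer (steps 8 & 9).
    tunedContext = "Van Gogh, Wheat Field with Cypresses, 1889, Oil on canvas, European Paintings."
    question = "Tell me about Wheat Field with Cypresses."

    response = openai.ChatCompletion.create(
        model='gpt-3.5-turbo',
        messages=[
            {"role": "system", "content": "Answer accurately based only on the supplied context."},
            {"role": "user", "content": tunedContext + "\n\nQuestion: " + question}
        ],
        temperature=0.2,
        max_tokens=150
    )

    print(response.choices[0].message['content'].strip())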

    The following are the important packages that are essential to this project –

    pip install farm-haystack==1.19.0
    pip install Flask==2.2.5
    pip install Flask-Cors==4.0.0
    pip install Flask-JWT-Extended==4.5.2
    pip install Flask-Session==0.5.0
    pip install openai==0.27.8
    pip install pandas==2.0.3
    pip install tensorflow==2.11.1

    We’ve built the front-end using React, the back-end APIs with Python Flask, and used OpenAI to create this experience.

    Today, we’ll be going in reverse order. We’ll first discuss the main script & then explain all the other class scripts.

    • flaskServer.py (This is the main calling Python script to invoke the RAG-Server.)
    #########################################################
    #### Written By: SATYAKI DE                          ####
    #### Written On: 27-Jun-2023                         ####
    #### Modified On 28-Jun-2023                         ####
    ####                                                 ####
    #### Objective: This is the main calling             ####
    #### python script that will invoke the              ####
    #### flask-based RAG server exposing the login,      ####
    #### chat & data endpoints for the React app.        ####
    ####                                                 ####
    #########################################################
    
    from flask import Flask, jsonify, request, session
    from flask_cors import CORS
    from werkzeug.security import check_password_hash, generate_password_hash
    from flask_jwt_extended import JWTManager, jwt_required, create_access_token
    import pandas as pd
    from clsConfigClient import clsConfigClient as cf
    import clsL as log
    import clsContentScrapper as csc
    import clsRAGOpenAI as crao
    import csv
    from datetime import timedelta
    import os
    import re
    import json
    
    ########################################################
    ################    Global Area   ######################
    ########################################################
    #Initiating Logging Instances
    clog = log.clsL()
    
    admin_key = cf.conf['ADMIN_KEY']
    secret_key = cf.conf['SECRET_KEY']
    session_path = cf.conf['SESSION_PATH']
    sessionFile = cf.conf['SESSION_CACHE_FILE']
    
    app = Flask(__name__)
    CORS(app)  # This will enable CORS for all routes
    app.config['JWT_SECRET_KEY'] = admin_key  # Change this!
    app.secret_key = secret_key
    
    jwt = JWTManager(app)
    
    users = cf.conf['USER_NM']
    passwd = cf.conf['USER_PWD']
    
    cCScrapper = csc.clsContentScrapper()
    cr = crao.clsRAGOpenAI()
    
    # Disabling Warning
    def warn(*args, **kwargs):
        pass
    
    import warnings
    warnings.warn = warn
    
    # Define the aggregation functions
    def join_unique(series):
        unique_vals = series.drop_duplicates().astype(str)
        return ', '.join(filter(lambda x: x != 'nan', unique_vals))
    
    # Building the preaggregate cache
    def groupImageWiki():
        try:
            base_path = cf.conf['OUTPUT_PATH']
            inputFile = cf.conf['CLEANED_FILE']
            outputFile = cf.conf['CLEANED_FILE_SHORT']
            subdir = cf.conf['SUBDIR_OUT']
            Ind = cf.conf['DEBUG_IND']
    
            inputCleanedFileLookUp = base_path + inputFile
    
            #Opening the file in dataframe
            df = pd.read_csv(inputCleanedFileLookUp)
            hash_values = df['Total_Hash'].unique()
    
            dFin = df[['primaryImage','Wiki_URL','Total_Hash']]
    
            # Ensure columns are strings and not NaN
            # Convert columns to string and replace 'nan' with an empty string
            dFin['primaryImage'] = dFin['primaryImage'].astype(str).replace('nan', '')
            dFin['Wiki_URL'] = dFin['Wiki_URL'].astype(str).replace('nan', '')
    
            # Drop duplicate rows (drop_duplicates is not in-place, so reassign the result)
            dFin = dFin.drop_duplicates()
    
            # Group by 'Total_Hash' and aggregate
            dfAgg = dFin.groupby('Total_Hash').agg({'primaryImage': join_unique,'Wiki_URL': join_unique}).reset_index()
    
            return dfAgg
    
        except Exception as e:
            x = str(e)
            print('Error: ', x)
    
            df = pd.DataFrame()
    
            return df
    
    resDf = groupImageWiki()
    
    ########################################################
    ################  End  Global Area  ####################
    ########################################################
    
    def extractRemoveUrls(hash_value):
        image_urls = ''
        wiki_urls = ''
        # Parse the inner message JSON string
        try:
    
            resDf['Total_Hash'] = resDf['Total_Hash'].astype(int)
            filtered_df = resDf[resDf['Total_Hash'] == int(hash_value)]
    
            if not filtered_df.empty:
                image_urls = filtered_df['primaryImage'].values[0]
                wiki_urls = filtered_df['Wiki_URL'].values[0]
    
            return image_urls, wiki_urls
    
        except Exception as e:
            x = str(e)
            print('extractRemoveUrls Error: ', x)
            return image_urls, wiki_urls
    
    def isIncomplete(line):
        """Check if a line appears to be incomplete."""
    
        # Check if the line ends with certain patterns indicating it might be incomplete.
        incomplete_patterns = [': [Link](', ': Approximately ', ': ']
        return any(line.endswith(pattern) for pattern in incomplete_patterns)
    
    def filterData(data):
        """Return only the complete lines from the data."""
    
        lines = data.split('\n')
        complete_lines = [line for line in lines if not isIncomplete(line)]
    
        return '\n'.join(complete_lines)
    
    def updateCounter(sessionFile):
        try:
            counter = 0
    
            # Check if the CSV file exists
            if os.path.exists(sessionFile):
                with open(sessionFile, 'r') as f:
                    reader = csv.reader(f)
                    for row in reader:
                        # Assuming the counter is the first value in the CSV
                        counter = int(row[0])
    
            # Increment counter
            counter += 1
    
            # Write counter back to CSV
            with open(sessionFile, 'w', newline='') as f:
                writer = csv.writer(f)
                writer.writerow([counter])
    
            return counter
        except Exception as e:
            x = str(e)
            print('Error: ', x)
    
            return 1
    
    def getPreviousResult():
        try:
            fullFileName = session_path + sessionFile
            newCounterValue = updateCounter(fullFileName)
    
            return newCounterValue
        except Exception as e:
            x = str(e)
            print('Error: ', x)
    
            return 1
    
    @app.route('/login', methods=['POST'])
    def login():
        username = request.json.get('username', None)
        password = request.json.get('password', None)
    
        print('User Name: ', str(username))
        print('Password: ', str(password))
    
        #if username not in users or not check_password_hash(users.get(username), password):
        if ((username not in users) or (password not in passwd)):
            return jsonify({'login': False}), 401
    
        access_token = create_access_token(identity=username)
        return jsonify(access_token=access_token)
    
    @app.route('/chat', methods=['POST'])
    def get_chat():
        try:
            #session["key"] = "1D98KI"
            #session_id = session.sid
            #print('Session Id: ', str(session_id))
    
            cnt = getPreviousResult()
            print('Running Session Count: ', str(cnt))
    
            username = request.json.get('username', None)
            message = request.json.get('message', None)
    
            print('User: ', str(username))
            print('Content: ', str(message))
    
            if cnt == 1:
                retList = cCScrapper.extractCatalog()
            else:
                hashValue, cleanedData = cr.getData(str(message))
                print('Main Hash Value:', str(hashValue))
    
                imageUrls, wikiUrls = extractRemoveUrls(hashValue)
                print('Image URLs: ', str(imageUrls))
                print('Wiki URLs: ', str(wikiUrls))
                print('Clean Text:')
                print(str(cleanedData))
                retList = '{"records":[{"Id":"' + str(cleanedData) + '", "Image":"' + str(imageUrls) + '", "Wiki": "' + str(wikiUrls) + '"}]}'
    
            response = {
                'message': retList
            }
    
            print('JSON: ', str(response))
            return jsonify(response)
    
        except Exception as e:
            x = str(e)
    
            response = {
                'message': 'Error: ' + x
            }
            return jsonify(response)
    
    @app.route('/api/data', methods=['GET'])
    @jwt_required()
    def get_data():
        response = {
            'message': 'Hello from Flask!'
        }
        return jsonify(response)
    
    if __name__ == '__main__':
        app.run(debug=True)
    

    Let us understand some of the important sections of the above script –

    Function – login():

    The login function retrieves a ‘username’ and ‘password’ from a JSON request and prints them. It checks if the provided credentials are missing from users or password lists, returning a failure JSON response if so. It creates and returns an access token in a JSON response if valid.

    Function – get_chat():

    The get_chat function retrieves the running session count and the user input from a JSON request. Based on the session count, it either extracts the catalog data or processes the user’s message through the RAG framework, which finally receives the refined response from OpenAI, extracting hash values, image URLs, and wiki URLs. If an error arises, the function captures and returns the error as a JSON message.

    Function – updateCounter():

    The updateCounter function checks if a given CSV file exists and retrieves its counter value. It then increments the counter and writes it back to the CSV. If any errors occur, an error message is printed, and the function returns a value of 1.

    Function – extractRemoveUrls():

    The extractRemoveUrls function attempts to filter a data frame, resDf, based on a provided hash value to extract image and wiki URLs. If the data frame contains matching entries, it retrieves the corresponding URLs. Any errors encountered are printed, but the function always returns the image and wiki URLs, even if they are empty.
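
    To make the request/response contract concrete, here is a minimal client-side sketch using the requests library. The credentials are placeholders & the server is assumed to run locally on port 5000, as it does for the React app shown below.

    import requests

    BASE = 'http://localhost:5000'

    # 1. Authenticate & collect the JWT (used by the protected /api/data route).
    login = requests.post(BASE + '/login', json={'username': 'satyaki', 'password': 'secret'})
    token = login.json().get('access_token')

    # 2. Chat with the bot; the first call of a session returns the department catalog,
    #    subsequent calls return RAG-refined answers with image & wiki URLs.
    chat = requests.post(BASE + '/chat', json={'username': 'satyaki', 'message': 'Yes, show me the catalog.'})
    print(chat.json()['message'])

    # 3. Call the JWT-protected sample route.
    data = requests.get(BASE + '/api/data', headers={'Authorization': 'Bearer ' + str(token)})
    print(data.json())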

    • clsContentScrapper.py (This is the main class that brings the default options for the users if they agree with the initial prompt by the bot.)
    #####################################################
    #### Written By: SATYAKI DE                      ####
    #### Written On: 27-May-2023                     ####
    #### Modified On 28-May-2023                     ####
    ####                                             ####
    #### Objective: This is the main calling         ####
    #### python class that will invoke the           ####
    #### LangChain package to extract                ####
    #### the transcript from the YouTube videos &    ####
    #### then answer the questions based on the      ####
    #### topics selected by the users.               ####
    ####                                             ####
    #####################################################
    
    from langchain.document_loaders import YoutubeLoader
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from langchain.embeddings.openai import OpenAIEmbeddings
    from langchain.vectorstores import FAISS
    from langchain.chat_models import ChatOpenAI
    from langchain.chains import LLMChain
    
    from langchain.prompts.chat import (
        ChatPromptTemplate,
        SystemMessagePromptTemplate,
        HumanMessagePromptTemplate,
    )
    
    from googleapiclient.discovery import build
    
    import clsTemplate as ct
    from clsConfigClient import clsConfigClient as cf
    
    import os
    
    from flask import jsonify
    import requests
    
    ###############################################
    ###           Global Section                ###
    ###############################################
    open_ai_Key = cf.conf['OPEN_AI_KEY']
    os.environ["OPENAI_API_KEY"] = open_ai_Key
    embeddings = OpenAIEmbeddings(openai_api_key=open_ai_Key)
    
    YouTube_Key = cf.conf['YOUTUBE_KEY']
    youtube = build('youtube', 'v3', developerKey=YouTube_Key)
    
    # Disabling Warning
    def warn(*args, **kwargs):
        pass
    
    import warnings
    warnings.warn = warn
    
    ###############################################
    ###    End of Global Section                ###
    ###############################################
    
    class clsContentScrapper:
        def __init__(self):
            self.model_name = cf.conf['MODEL_NAME']
            self.temp_val = cf.conf['TEMP_VAL']
            self.max_cnt = int(cf.conf['MAX_CNT'])
            self.url = cf.conf['BASE_URL']
            self.header_token = cf.conf['HEADER_TOKEN']
    
        def extractCatalog(self):
            try:
                base_url = self.url
                header_token = self.header_token
    
                url = base_url + '/departments'
    
                print('Full URL: ', str(url))
    
                payload={}
                headers = {'Cookie': header_token}
    
                response = requests.request("GET", url, headers=headers, data=payload)
    
                x = response.text
    
                return x
            except Exception as e:
                discussedTopic = []
                x = str(e)
                print('Error: ', x)
    
                return x
    

    Let us understand the core part that we require from this class.

    Function – extractCatalog():

    The extractCatalog function uses specific headers to make a GET request to a constructed URL. The URL is derived by appending ‘/departments’ to a base_url, and a header token is used in the request headers. If successful, it returns the text of the response; if there’s an exception, it prints the error and returns the error message.

    • clsRAGOpenAI.py (This is the main class that brings the RAG-enabled context that is fed to OpenAI for fine-tuned response with less cost.)
    #########################################################
    #### Written By: SATYAKI DE                          ####
    #### Written On: 27-Jun-2023                         ####
    #### Modified On 28-Jun-2023                         ####
    ####                                                 ####
    #### Objective: This is the main calling             ####
    #### python script that will invoke the              ####
    #### haystack retriever & OpenAI to build the        ####
    #### RAG-based contextualized answers.               ####
    ####                                                 ####
    #########################################################
    
    from haystack.document_stores.faiss import FAISSDocumentStore
    from haystack.nodes import DensePassageRetriever
    import openai
    
    from clsConfigClient import clsConfigClient as cf
    import clsL as log
    
    # Disabling Warning
    def warn(*args, **kwargs):
        pass
    
    import warnings
    warnings.warn = warn
    
    import os
    import re
    ###############################################
    ###           Global Section                ###
    ###############################################
    Ind = cf.conf['DEBUG_IND']
    queryModel = cf.conf['QUERY_MODEL']
    passageModel = cf.conf['PASSAGE_MODEL']
    
    #Initiating Logging Instances
    clog = log.clsL()
    
    os.environ["TOKENIZERS_PARALLELISM"] = "false"
    
    vectorDBFileName = cf.conf['VECTORDB_FILE_NM']
    
    indexFile = "vectorDB/" + str(vectorDBFileName) + '.faiss'
    indexConfig = "vectorDB/" + str(vectorDBFileName) + ".json"
    
    print('File: ', str(indexFile))
    print('Config: ', str(indexConfig))
    
    # Also, provide `config_path` parameter if you set it when calling the `save()` method:
    new_document_store = FAISSDocumentStore.load(index_path=indexFile, config_path=indexConfig)
    
    # Initialize Retriever
    retriever = DensePassageRetriever(document_store=new_document_store,
                                      query_embedding_model=queryModel,
                                      passage_embedding_model=passageModel,
                                      use_gpu=False)
    
    
    ###############################################
    ###    End of Global Section                ###
    ###############################################
    
    class clsRAGOpenAI:
        def __init__(self):
            self.basePath = cf.conf['DATA_PATH']
            self.fileName = cf.conf['FILE_NAME']
            self.Ind = cf.conf['DEBUG_IND']
            self.subdir = str(cf.conf['OUT_DIR'])
            self.base_url = cf.conf['BASE_URL']
            self.outputPath = cf.conf['OUTPUT_PATH']
            self.vectorDBPath = cf.conf['VECTORDB_PATH']
            self.openAIKey = cf.conf['OPEN_AI_KEY']
            self.temp = cf.conf['TEMP_VAL']
            self.modelName = cf.conf['MODEL_NAME']
            self.maxToken = cf.conf['MAX_TOKEN']
    
        def extractHash(self, text):
            try:
                # Regular expression pattern to match 'Ref: {' followed by a number and then '}'
                pattern = r"Ref: \{'(\d+)'\}"
                match = re.search(pattern, text)
    
                if match:
                    return match.group(1)
                else:
                    return None
            except Exception as e:
                x = str(e)
                print('Error: ', x)
    
                return None
    
        def removeSentencesWithNaN(self, text):
            try:
                # Split text into sentences using regular expression
                sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text)
                # Filter out sentences containing 'nan'
                filteredSentences = [sentence for sentence in sentences if 'nan' not in sentence]
                # Rejoin the sentences
                return ' '.join(filteredSentences)
            except Exception as e:
                x = str(e)
                print('Error: ', x)
    
                return ''
    
        def retrieveDocumentsReader(self, question, top_k=9):
            return retriever.retrieve(question, top_k=top_k)
    
        def generateAnswerWithGPT3(self, retrieved_docs, question):
            try:
                openai.api_key = self.openAIKey
                temp = self.temp
                modelName = self.modelName
                maxToken = self.maxToken
    
                documentsText = " ".join([doc.content for doc in retrieved_docs])
    
                filteredDocs = self.removeSentencesWithNaN(documentsText)
                hashValue = self.extractHash(filteredDocs)
    
                print('RAG Docs:: ')
                print(filteredDocs)
                #prompt = f"Given the following documents: {documentsText}, answer the question accurately based on the above data with the supplied http urls: {question}"
    
                # Set up a chat-style prompt with your data
                messages = [
                    {"role": "system", "content": "You are a helpful assistant, answer the question accurately based on the above data with the supplied http urls. Only relevant content needs to publish. Please do not provide the facts or the texts that results crossing the max_token limits."},
                    {"role": "user", "content": filteredDocs}
                ]
    
                # Chat style invoking the latest model
                response = openai.ChatCompletion.create(
                    model=modelName,
                    messages=messages,
                    temperature = temp,
                    max_tokens=maxToken
                )
                return hashValue, response.choices[0].message['content'].strip().replace('\n','\\n')
            except Exception as e:
                x = str(e)
                print('failed to get from OpenAI: ', x)
                return 'Not Available!'
    
        def ragAnswerWithHaystackAndGPT3(self, question):
            retrievedDocs = self.retrieveDocumentsReader(question)
            return self.generateAnswerWithGPT3(retrievedDocs, question)
    
        def getData(self, strVal):
            try:
                print('*'*120)
                print('Index Your Data for Retrieval:')
                print('*'*120)
    
                print('Response from New Docs: ')
                print()
    
                hashValue, answer = self.ragAnswerWithHaystackAndGPT3(strVal)
    
                print('GPT3 Answer::')
                print(answer)
                print('Hash Value:')
                print(str(hashValue))
    
                print('*'*240)
                print('End Of Use RAG to Generate Answers:')
                print('*'*240)
    
                return hashValue, answer
            except Exception as e:
                x = str(e)
                print('Error: ', x)
                answer = x
                hashValue = 1
    
                return hashValue, answer
    

    Let us understand some of the important blocks –

    Function – ragAnswerWithHaystackAndGPT3():

    The ragAnswerWithHaystackAndGPT3 function retrieves relevant documents for a given question using the retrieveDocumentsReader method. It then generates an answer for the query using GPT-3 with the retrieved documents via the generateAnswerWithGPT3 method. The final response is returned.

    Function – generateAnswerWithGPT3():

    The generateAnswerWithGPT3 function, given a list of retrieved documents and a question, communicates with OpenAI’s GPT-3 to generate an answer. It first processes the documents, filtering and extracting a hash value. Using a chat-style format, it prompts GPT-3 with the processed documents and captures its response. If an error occurs, an error message is printed, and “Not Available!” is returned.
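
    For clarity, extractHash() expects the retrieved context to carry a reference marker of the form Ref: {'<number>'}. Here is a small standalone illustration, with a made-up context string:

    import re

    # Made-up context text carrying the hash marker that extractHash() searches for
    sampleContext = "Van Gogh, Wheat Field with Cypresses, 1889. Ref: {'2337771225'}"

    match = re.search(r"Ref: \{'(\d+)'\}", sampleContext)
    print(match.group(1) if match else None)   # prints: 2337771225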

    Function – retrieveDocumentsReader():

    The retrieveDocumentsReader function takes in a question and an optional parameter, top_k (defaulting to 9). It calls the retriever.retrieve method with the given parameters. The retrieval will return at most nine documents from the RAG engine, which will then be fed to OpenAI.

    • App.js (This is the main React script that will create the interface & parse the data, apart from handling the authentication.)
    // App.js
    import React, { useState } from 'react';
    import axios from 'axios';
    import './App.css';
    
    const App = () => {
      const [isLoggedIn, setIsLoggedIn] = useState(false);
      const [username, setUsername] = useState('');
      const [password, setPassword] = useState('');
      const [message, setMessage] = useState('');
      const [chatLog, setChatLog] = useState([{ sender: 'MuBot', message: 'Welcome to MuBot! Please explore the world of History from our brilliant collections! Do you want to proceed to see the catalog?'}]);
    
      const handleLogin = async (e) => {
        e.preventDefault();
        try {
          const response = await axios.post('http://localhost:5000/login', { username, password });
          if (response.status === 200) {
            setIsLoggedIn(true);
          }
        } catch (error) {
          console.error('Login error:', error);
        }
      };
    
      const sendMessage = async () => {
        if (message.trim() === '') return;
    
        // Create a new chat entry
        const newChatEntry = {
          sender: 'user',
          message: message.trim(),
        };
    
        // Clear the input field
        setMessage('');
    
        try {
          // Make API request to Python-based API
          const response = await axios.post('http://localhost:5000/chat', { message: newChatEntry.message }); // Replace with your API endpoint URL
          const responseData = response.data;
    
          // Print the response to the console for debugging
          console.log('API Response:', responseData);
    
          // Parse the nested JSON from the 'message' attribute
          const jsonData = JSON.parse(responseData.message);
    
          // Check if the data contains 'departments'
          if (jsonData.departments) {
    
            // Extract the 'departments' attribute from the parsed data
            const departments = jsonData.departments;
    
            // Extract the department names and create a single string with line breaks
            const botResponseText = departments.reduce((acc, department) => {return acc + department.departmentId + ' ' + department.displayName + '\n';}, '');
    
            // Update the chat log with the bot's response
            setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message: message }, { sender: 'bot', message: botResponseText },]);
          }
          else if (jsonData.records)
          {
            // Data structure 2: Artwork information
            const records = jsonData.records;
    
            // Prepare chat entries
            const chatEntries = [];
    
            // Iterate through records and extract text, image, and wiki information
            records.forEach((record) => {
              const textInfo = Object.entries(record).map(([key, value]) => {
                if (key !== 'Image' && key !== 'Wiki') {
                  return `${key}: ${value}`;
                }
                return null;
              }).filter((info) => info !== null).join('\n');
    
              const imageLink = record.Image;
              //const wikiLinks = JSON.parse(record.Wiki.replace(/'/g, '"'));
              //const wikiLinks = record.Wiki;
              const wikiLinks = record.Wiki.split(',').map(link => link.trim());
    
              console.log('Wiki:', wikiLinks);
    
              // Check if there is a valid image link
              const hasValidImage = imageLink && imageLink !== '[]';
    
              const imageElement = hasValidImage ? (
                <img src={imageLink} alt="Artwork" style={{ maxWidth: '100%' }} />
              ) : null;
    
              // Create JSX elements for rendering the wiki links (if available)
              const wikiElements = wikiLinks.map((link, index) => (
                <div key={index}>
                  <a href={link} target="_blank" rel="noopener noreferrer">
                    Wiki Link {index + 1}
                  </a>
                </div>
              ));
    
              if (textInfo) {
                chatEntries.push({ sender: 'bot', message: textInfo });
              }
    
              if (imageElement) {
                chatEntries.push({ sender: 'bot', message: imageElement });
              }
    
              if (wikiElements.length > 0) {
                chatEntries.push({ sender: 'bot', message: wikiElements });
              }
            });
    
            // Update the chat log with the bot's response
            setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message }, ...chatEntries, ]);
          }
    
        } catch (error) {
          console.error('Error sending message:', error);
        }
      };
    
      if (!isLoggedIn) {
        return (
          <div className="login-container">
            <h2>Welcome to the MuBot</h2>
            <form onSubmit={handleLogin} className="login-form">
              <input
                type="text"
                placeholder="Enter your name"
                value={username}
                onChange={(e) => setUsername(e.target.value)}
                required
              />
              <input
                type="password"
                placeholder="Enter your password"
                value={password}
                onChange={(e) => setPassword(e.target.value)}
                required
              />
              <button type="submit">Login</button>
            </form>
          </div>
        );
      }
    
      return (
        <div className="chat-container">
          <div className="chat-header">
            <h2>Hello, {username}</h2>
            <h3>Chat with MuBot</h3>
          </div>
          <div className="chat-log">
            {chatLog.map((chatEntry, index) => (
              <div
                key={index}
                className={`chat-entry ${chatEntry.sender === 'user' ? 'user' : 'bot'}`}
              >
                <span className="user-name">{chatEntry.sender === 'user' ? username : 'MuBot'}</span>
                <p className="chat-message">{chatEntry.message}</p>
              </div>
            ))}
          </div>
          <div className="chat-input">
            <input
              type="text"
              placeholder="Type your message..."
              value={message}
              onChange={(e) => setMessage(e.target.value)}
              onKeyPress={(e) => {
                if (e.key === 'Enter') {
                  sendMessage();
                }
              }}
            />
            <button onClick={sendMessage}>Send</button>
          </div>
        </div>
      );
    };
    
    export default App;
    

    Please find some of the important logic –

    Function – handleLogin():

    The handleLogin asynchronous function responds to an event by preventing its default action. It attempts to post a login request with a username and password to a local server endpoint. If the response is successful with a status of 200, it updates a state variable to indicate a successful login; otherwise, it logs any encountered errors.

    Function – sendMessage():

    The sendMessage asynchronous function is designed to handle the user’s chat interaction:

    1. If the message is empty (after trimming spaces), the function exits without further action.
    2. A chat entry object is created with the sender set as ‘user’ and the trimmed message.
    3. The input field’s message is cleared, and an API request is made to a local server endpoint with the chat message.
    4. If the API responds with a ‘departments’ attribute in its JSON, a bot response is crafted by iterating over department details.
    5. If the API responds with ‘records’ indicating artwork information, the bot crafts responses for each record, extracting text, images, and wiki links, and generating JSX elements for rendering them.
    6. After processing the API response, the chat log state is updated with the user’s original message and the bot’s responses.
    7. Errors, if encountered, are logged to the console.

    This function enables interactive chat with bot responses that vary based on the nature of the data received from the API.
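
    For reference, the two response shapes that sendMessage() distinguishes look roughly like the following. The values are made up for illustration; the real payloads come from the Flask /chat endpoint shown earlier.

    # Shape 1: catalog response, where the nested JSON inside 'message' carries a 'departments' list.
    departmentsPayload = {
        "message": '{"departments": [{"departmentId": 1, "displayName": "American Decorative Arts"}]}'
    }

    # Shape 2: artwork response, where the nested JSON inside 'message' carries a 'records' list.
    recordsPayload = {
        "message": '{"records": [{"Id": "Wheat Field with Cypresses, 1889, Oil on canvas", '
                   '"Image": "https://images.example.com/wheat-field.jpg", '
                   '"Wiki": "https://en.wikipedia.org/wiki/Wheat_Field_with_Cypresses"}]}'
    }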


    Let us explore the directory structure, starting from the parent down to some of the important child folders. It should look like this –


    So, finally, we’ve done it.

    I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

    You will get the complete codebase in the following GitHub link.

    I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

    Till then, Happy Avenging! 🙂