Today, we’ll share the second installment of our RAG implementation. If you are new here, please visit the previous post for the full context.
In this post, we’ll dig deeper into the Haystack framework. As usual, before getting into the main discussion, I want to present the demo here.
FLOW OF EVENTS:
Let us look at the flow diagram, which captures the sequence of events in the process and highlights where we’ll focus our attention today.

As you can see, today we’ll discuss the portion inside the red dotted line, which contextualizes the source data into the Vector DB.
Let us understand the flow of events here –
- The main Python application will consume the nested JSON by invoking the museum API in multiple threads.
- The application will clean the nested data & extract the relevant attributes after flattening the JSON.
- It will create the unstructured text-based context, which is later fed to the Vector DB framework (a minimal sketch of this flattening step follows this list).
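To make the flattening step concrete, here is a minimal, hedged sketch (not the actual project code; the record and field names are purely illustrative) of how a nested JSON record can be flattened & turned into a plain-text context:

import json
import pandas as pd

# A simplified, made-up museum-style record (illustrative only)
sample = '{"objectID": 1, "title": "Vase", "artistDisplayName": "Unknown", "measurements": {"height": 10.2}}'

record = json.loads(sample)
flat = pd.json_normalize(record)   # flatten nested keys into dotted columns

# Build an unstructured text context from selected attributes
context = ". ".join(f"{col}: {flat.at[0, col]}" for col in ["objectID", "title", "artistDisplayName"])
print(context)   # objectID: 1. title: Vase. artistDisplayName: Unknown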
IMPORTANT PACKAGES:
pip install farm-haystack==1.19.0
pip install Flask==2.2.5
pip install Flask-Cors==4.0.0
pip install Flask-JWT-Extended==4.5.2
pip install Flask-Session==0.5.0
pip install openai==0.27.8
pip install pandas==2.0.3
pip install tensorflow==2.11.1
We’re using the Metropolitan Museum API to feed data into our Vector DB. For more information, please visit the following link. The API is free to use, and we’re using it purely for educational purposes.
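For reference, here is a quick, illustrative sketch of the three public endpoints this flow relies on (no API key is required; the paths follow the museum’s public documentation, and the sample date & department choice are arbitrary):

import requests

BASE = "https://collectionapi.metmuseum.org/public/collection/v1"

# 1. All departments (used to build the list of departmentIds)
departments = requests.get(BASE + "/departments", timeout=30).json()["departments"]

# 2. Object IDs for one department, filtered by metadata date
dept_id = departments[0]["departmentId"]
objects = requests.get(BASE + "/objects",
                       params={"metadataDate": "2023-01-01", "departmentIds": dept_id},
                       timeout=30).json()

# 3. Full detail for a single object ID
detail = requests.get(BASE + "/objects/" + str(objects["objectIDs"][0]), timeout=30).json()
print(detail.get("title"), "-", detail.get("artistDisplayName"))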
CODE:
We’ll discuss the tokenization part highlighted by the red dotted line in the picture above.
Python:
We’ll discuss the scripts in the diagram as part of the flow mentioned above.
- clsExtractJSON.py (This is the main class that will extract the content from the museum API using parallel calls.)
def genData(self):
    try:
        base_url = self.base_url
        header_token = self.header_token
        basePath = self.basePath
        outputPath = self.outputPath
        mergedFile = self.mergedFile
        subdir = self.subdir
        Ind = self.Ind
        var_1 = datetime.now().strftime("%H.%M.%S")

        devVal = list()
        objVal = list()

        # Main Details
        headers = {'Cookie': header_token}
        payload = {}

        url = base_url + '/departments'
        date_ranges = self.generateFirstDayOfLastTenYears()

        # Getting all the departments
        try:
            print('Department URL:')
            print(str(url))

            response = requests.request("GET", url, headers=headers, data=payload)
            parsed_data = json.loads(response.text)

            print('Department JSON:')
            print(str(parsed_data))

            # Extract the "departmentId" values into a Python list
            for dept_det in parsed_data['departments']:
                for info in dept_det:
                    if info == 'departmentId':
                        devVal.append(dept_det[info])
        except Exception as e:
            x = str(e)
            print('Error: ', x)
            devVal = list()

        # List to hold thread objects
        threads = []

        # Calling the Data using threads
        for dep in devVal:
            t = threading.Thread(target=self.getDataThread, args=(dep, base_url, headers, payload, date_ranges, objVal, subdir, Ind,))
            threads.append(t)
            t.start()

        # Wait for all threads to complete
        for t in threads:
            t.join()

        res = self.mergeCsvFilesInDirectory(basePath, outputPath, mergedFile)

        if res == 0:
            print('Successful!')
        else:
            print('Failure!')

        return 0
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1
The above code translates into the following steps –
- The above method first fetches all the unique departments via a separate API call & then invokes the generateFirstDayOfLastTenYears() method to build the date ranges used to populate records for every department.
- Then, it calls the getDataThread() method in separate threads to hit all the relevant APIs in parallel, reducing the overall wait time & producing individual smaller files.
- Finally, the application invokes the mergeCsvFilesInDirectory() method to merge all the chunk files into one extensive historical dataset.
def generateFirstDayOfLastTenYears(self):
    yearRange = self.yearRange
    date_format = "%Y-%m-%d"
    current_year = datetime.now().year

    date_ranges = []
    for year in range(current_year - yearRange, current_year + 1):
        first_day_of_year_full = datetime(year, 1, 1)
        first_day_of_year = first_day_of_year_full.strftime(date_format)

        date_ranges.append(first_day_of_year)

    return date_ranges
This method generates the first day of each year for the last ten years, including the current year.
def getDataThread(self, dep, base_url, headers, payload, date_ranges, objVal, subdir, Ind):
    try:
        cnt = 0
        cnt_x = 1
        var_1 = datetime.now().strftime("%H.%M.%S")

        for x_start_date in date_ranges:
            try:
                urlM = base_url + '/objects?metadataDate=' + str(x_start_date) + '&departmentIds=' + str(dep)

                print('Nested URL:')
                print(str(urlM))

                response_obj = requests.request("GET", urlM, headers=headers, data=payload)
                objectDets = json.loads(response_obj.text)

                for obj_det in objectDets['objectIDs']:
                    objVal.append(obj_det)

                for objId in objVal:
                    urlS = base_url + '/objects/' + str(objId)

                    print('Final URL:')
                    print(str(urlS))

                    response_det = requests.request("GET", urlS, headers=headers, data=payload)
                    objDetJSON = response_det.text

                    retDB = self.createData(objDetJSON)
                    retDB['departmentId'] = str(dep)

                    if cnt == 0:
                        df_M = retDB
                    else:
                        d_frames = [df_M, retDB]
                        df_M = pd.concat(d_frames)

                    if cnt == 1000:
                        cnt = 0
                        clog.logr('df_M_' + var_1 + '_' + str(cnt_x) + '_' + str(dep) + '.csv', Ind, df_M, subdir)
                        cnt_x += 1
                        df_M = pd.DataFrame()

                    cnt += 1
            except Exception as e:
                x = str(e)
                print('Error X:', x)

        return 0
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1
The above method will invoke the individual API call to fetch the relevant artifact information.
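Note that createData() is referenced above but not listed here. As an assumption (not the author’s exact code), it most likely flattens one object’s JSON payload into a single-row DataFrame, roughly like this:

import json
import pandas as pd

def createDataSketch(objDetJSON: str) -> pd.DataFrame:
    # Parse the raw JSON string returned by the object-detail API
    parsed = json.loads(objDetJSON)
    # json_normalize flattens nested keys into dotted column names,
    # producing the single-row DataFrame that getDataThread() concatenates
    return pd.json_normalize(parsed)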
def mergeCsvFilesInDirectory(self, directory_path, output_path, output_file):
    try:
        csv_files = [file for file in os.listdir(directory_path) if file.endswith('.csv')]

        data_frames = []

        for file in csv_files:
            encodings_to_try = ['utf-8', 'utf-8-sig', 'latin-1', 'cp1252']

            for encoding in encodings_to_try:
                try:
                    FullFileName = directory_path + file
                    print('File Name: ', FullFileName)

                    df = pd.read_csv(FullFileName, encoding=encoding)
                    data_frames.append(df)

                    break  # Stop trying other encodings if the reading is successful
                except UnicodeDecodeError:
                    continue

        if not data_frames:
            raise Exception("Unable to read CSV files. Check encoding or file format.")

        merged_df = pd.concat(data_frames, ignore_index=True)

        merged_full_name = os.path.join(output_path, output_file)
        merged_df.to_csv(merged_full_name, index=False)

        for file in csv_files:
            os.remove(os.path.join(directory_path, file))

        return 0
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1
The above method merges all the small files into a single, extensive historical dataset covering ten years (one snapshot per year, taken on the first day of each year, to be precise).
For the complete code, please visit the GitHub repository.
- 1_ReadMuseumJSON.py (This is the main script that will invoke the class that extracts the content from the museum API using parallel calls.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### clsExtractJSON class to extract the             ####
#### content from the museum API using               ####
#### parallel calls.                                 ####
####                                                 ####
#########################################################

import datetime
from clsConfigClient import clsConfigClient as cf

import clsExtractJSON as cej

########################################################
################    Global Area    #####################
########################################################

cJSON = cej.clsExtractJSON()

basePath = cf.conf['DATA_PATH']
outputPath = cf.conf['OUTPUT_PATH']
mergedFile = cf.conf['MERGED_FILE']

########################################################
################  End Of Global Area  ##################
########################################################

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

def main():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        r1 = cJSON.genData()

        if r1 == 0:
            print()
            print('Successfully Scraped!')
        else:
            print()
            print('Failed to Scrape!')

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))
    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == '__main__':
    main()
The above script instantiates the clsExtractJSON class & then invokes its genData() method.
- clsCreateList.py (This is the main class that will extract the relevant attributes from the historical file & then create the right input text to build the documents that get contextualized into the Vector DB framework.)
def createRec(self):
    try:
        basePath = self.basePath
        fileName = self.fileName
        Ind = self.Ind
        subdir = self.subdir
        base_url = self.base_url
        outputPath = self.outputPath
        mergedFile = self.mergedFile
        cleanedFile = self.cleanedFile

        FullFileName = outputPath + mergedFile

        df = pd.read_csv(FullFileName)
        # listCol (the list of columns to keep) is defined elsewhere in the class
        df2 = df[listCol]
        dfFin = df2.drop_duplicates().reset_index(drop=True)

        dfFin['artist_URL'] = dfFin['artistWikidata_URL'].combine_first(dfFin['artistULAN_URL'])
        dfFin['object_URL'] = dfFin['objectURL'].combine_first(dfFin['objectWikidata_URL'])
        dfFin['Wiki_URL'] = dfFin['Wikidata_URL'].combine_first(dfFin['AAT_URL']).combine_first(dfFin['URL']).combine_first(dfFin['object_URL'])

        # Dropping the old Dtype Columns
        dfFin.drop(['artistWikidata_URL'], axis=1, inplace=True)
        dfFin.drop(['artistULAN_URL'], axis=1, inplace=True)
        dfFin.drop(['objectURL'], axis=1, inplace=True)
        dfFin.drop(['objectWikidata_URL'], axis=1, inplace=True)
        dfFin.drop(['AAT_URL'], axis=1, inplace=True)
        dfFin.drop(['Wikidata_URL'], axis=1, inplace=True)
        dfFin.drop(['URL'], axis=1, inplace=True)

        # Save the filtered DataFrame to a new CSV file
        #clog.logr(cleanedFile, Ind, dfFin, subdir)

        res = self.addHash(dfFin)

        if res == 0:
            print('Added Hash!')
        else:
            print('Failed to add hash!')

        # Generate the text for each row in the dataframe
        for _, row in dfFin.iterrows():
            x = self.genPrompt(row)
            self.addDocument(x, cleanedFile)

        # 'documents' is maintained outside this method & populated via addDocument()
        return documents
    except Exception as e:
        x = str(e)
        print('Record Error: ', x)

        return documents
The above code reads the data from the extensive historical file created in the earlier steps, cleans it by removing any duplicate records, & finally derives three consolidated URL columns covering artist, object & wiki.
Also, the application replaces each hyperlink with a specific hash value before the text is fed into the vector DB, since the vector DB does not handle raw URLs well. Hence, we store the URLs along with their associated hash values in a separate file & later resolve them via a lookup against the OpenAI response.
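Since addHash() is not listed here, below is a hedged sketch of the idea just described (the function & file names are illustrative, not the project’s actual ones): each URL is replaced by a short hash in the text that goes to the vector DB, while a hash-to-URL lookup file is kept for resolving the links later.

import hashlib
import pandas as pd

def hashUrl(url: str) -> str:
    # Short, deterministic hash used in place of the raw URL
    return hashlib.sha256(url.encode("utf-8")).hexdigest()[:16]

def buildUrlLookup(df: pd.DataFrame, urlCols, lookupFile: str) -> pd.DataFrame:
    mapping = {}
    for col in urlCols:
        for url in df[col].dropna().unique():
            mapping[url] = hashUrl(url)
        # Keep only the hash in the text that will be fed to the vector DB
        df[col] = df[col].map(mapping).fillna(df[col])
    # Persist the hash -> URL lookup for resolving links from the OpenAI response later
    pd.DataFrame([{"hashVal": h, "url": u} for u, h in mapping.items()]).to_csv(lookupFile, index=False)
    return df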
Then, the application generates prompts dynamically via genPrompt() & finally creates the documents for later vector DB consumption by invoking the addDocument() method.
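genPrompt() & addDocument() are shown only by name, so here is a hedged sketch of what they might look like (the column names are assumptions). Haystack 1.x accepts documents as plain dictionaries with a content key plus optional meta, which is the shape assumed here.

documents = []

def genPromptSketch(row) -> str:
    # Turn one cleaned row into a short, self-contained text passage
    return (f"Title: {row.get('title', '')}. "
            f"Artist: {row.get('artistDisplayName', '')}. "
            f"Department: {row.get('departmentId', '')}.")

def addDocumentSketch(text: str, sourceFile: str) -> None:
    # Haystack 1.x can ingest dicts with "content" & optional "meta"
    documents.append({"content": text, "meta": {"source": sourceFile}})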
For more details, please visit the GitHub link.
- 1_1_testCreateRec.py (This is the test script that will call the above class.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### clsCreateList class to build the sample         ####
#### documents for the Vector DB.                    ####
####                                                 ####
#########################################################

from clsConfigClient import clsConfigClient as cf
import clsL as log
import clsCreateList as ccl

from datetime import datetime, timedelta

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

###############################################
###           Global Section                ###
###############################################

# Initiating Logging Instances
clog = log.clsL()
cl = ccl.clsCreateList()

var = datetime.now().strftime(".%H.%M.%S")
documents = []

###############################################
###       End of Global Section             ###
###############################################

def main():
    try:
        var = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        print('*'*240)
        print('Creating Index store:: ')
        print('*'*240)

        documents = cl.createRec()

        print('Inserted Sample Records: ')
        print(str(documents))
        print('\n')

        r1 = len(documents)

        if r1 > 0:
            print()
            print('Successfully Indexed sample records!')
        else:
            print()
            print('Failed to index the sample records!')

        print('*'*120)
        var1 = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))
    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == '__main__':
    main()
The above script instantiates the main class & invokes its createRec() method to prepare the documents for the vector DB.
This test script is only used to verify the clsCreateList class; in the actual flow, the class is used inside another class.
- clsFeedVectorDB.py (This is the main class that will feed the documents into the vector DB.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Sep-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### haystack framework to contextualize the docs    ####
#### inside the vector DB.                           ####
####                                                 ####
#########################################################

from haystack.document_stores.faiss import FAISSDocumentStore
from haystack.nodes import DensePassageRetriever
import openai
import pandas as pd
import os

import clsCreateList as ccl
from clsConfigClient import clsConfigClient as cf
import clsL as log

from datetime import datetime, timedelta

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

###############################################
###           Global Section                ###
###############################################

Ind = cf.conf['DEBUG_IND']
openAIKey = cf.conf['OPEN_AI_KEY']

os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Initiating Logging Instances
clog = log.clsL()
cl = ccl.clsCreateList()

var = datetime.now().strftime(".%H.%M.%S")

# Encode your data to create embeddings
documents = []

var_1 = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*'*120)
print('Start Time: ' + str(var_1))
print('*'*120)

print('*'*240)
print('Creating Index store:: ')
print('*'*240)

documents = cl.createRec()

print('Inserted Sample Records: ')
print(documents[:5])
print('\n')

print('Type:')
print(type(documents))

r1 = len(documents)

if r1 > 0:
    print()
    print('Successfully Indexed records!')
else:
    print()
    print('Failed to index the records!')

print('*'*120)
var_2 = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var_2))

# Passing OpenAI API Key
openai.api_key = openAIKey

###############################################
###       End of Global Section             ###
###############################################

class clsFeedVectorDB:
    def __init__(self):
        self.basePath = cf.conf['DATA_PATH']
        self.modelFileName = cf.conf['CACHE_FILE']
        self.vectorDBPath = cf.conf['VECTORDB_PATH']
        self.vectorDBFileName = cf.conf['VECTORDB_FILE_NM']
        self.queryModel = cf.conf['QUERY_MODEL']
        self.passageModel = cf.conf['PASSAGE_MODEL']

    def retrieveDocuments(self, question, retriever, top_k=3):
        return retriever.retrieve(question, top_k=top_k)

    def generateAnswerWithGPT3(self, retrievedDocs, question):
        documents_text = " ".join([doc.content for doc in retrievedDocs])
        prompt = f"Given the following documents: {documents_text}, answer the question: {question}"

        response = openai.Completion.create(
            model="text-davinci-003",
            prompt=prompt,
            max_tokens=150
        )

        return response.choices[0].text.strip()

    def ragAnswerWithHaystackAndGPT3(self, question, retriever):
        retrievedDocs = self.retrieveDocuments(question, retriever)
        return self.generateAnswerWithGPT3(retrievedDocs, question)

    def genData(self, strVal):
        try:
            basePath = self.basePath
            modelFileName = self.modelFileName
            vectorDBPath = self.vectorDBPath
            vectorDBFileName = self.vectorDBFileName
            queryModel = self.queryModel
            passageModel = self.passageModel

            print('*'*120)
            print('Index Your Data for Retrieval:')
            print('*'*120)

            FullFileName = basePath + modelFileName
            FullVectorDBname = vectorDBPath + vectorDBFileName

            sqlite_path = "sqlite:///" + FullVectorDBname + '.db'
            print('Vector DB Path: ', str(sqlite_path))

            indexFile = "vectorDB/" + str(vectorDBFileName) + '.faiss'
            indexConfig = "vectorDB/" + str(vectorDBFileName) + ".json"

            print('File: ', str(indexFile))
            print('Config: ', str(indexConfig))

            # Initialize DocumentStore
            document_store = FAISSDocumentStore(sql_url=sqlite_path)

            libName = "vectorDB/" + str(vectorDBFileName) + '.faiss'

            document_store.write_documents(documents)

            # Initialize Retriever
            retriever = DensePassageRetriever(document_store=document_store,
                                              query_embedding_model=queryModel,
                                              passage_embedding_model=passageModel,
                                              use_gpu=False)

            document_store.update_embeddings(retriever=retriever)
            document_store.save(index_path=libName, config_path="vectorDB/" + str(vectorDBFileName) + ".json")

            print('*'*120)
            print('Testing with RAG & OpenAI...')
            print('*'*120)

            answer = self.ragAnswerWithHaystackAndGPT3(strVal, retriever)

            print('*'*120)
            print('Testing Answer:: ')
            print(answer)
            print('*'*120)

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1
In the above script, the following essential steps took place –
- First, the application calls the clsCreateList class to store all the documents inside a list.
- Then it writes the documents into the vector DB, computes the embeddings & saves the index, which will be reused later (if you remember, we used this as the model in our previous post; see the reload sketch after this list).
- Finally, it tests a few sample use cases by providing the proper context to OpenAI & confirming the response.
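As a hedged sketch of that reuse (the file names below are placeholders; the real values come from clsConfigClient), the saved FAISS index can be reloaded later without re-embedding the documents:

from haystack.document_stores.faiss import FAISSDocumentStore
from haystack.nodes import DensePassageRetriever

# Restore the store from the saved .faiss index & its .json config
loaded_store = FAISSDocumentStore.load(
    index_path="vectorDB/myVectorDB.faiss",
    config_path="vectorDB/myVectorDB.json"
)

# Rebuild the retriever against the loaded store (default DPR models shown as an example)
retriever = DensePassageRetriever(document_store=loaded_store,
                                  query_embedding_model="facebook/dpr-question_encoder-single-nq-base",
                                  passage_embedding_model="facebook/dpr-ctx_encoder-single-nq-base",
                                  use_gpu=False)

docs = retriever.retrieve("Which department holds the most paintings?", top_k=3)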
Here is a short clip of how the RAG models contextualize with the source data.
So, finally, we’ve done it.
I know this post is relatively longer than my earlier ones. But I think you’ll get all the details once you go through it.
You will get the complete codebase in the following GitHub link.
I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.
Till then, Happy Avenging! 🙂
Note: All the data & scenarios posted here are representational, available over the internet & intended for educational purposes only. Some of the images (except my photo) we’ve used are available over the net; we don’t claim ownership of these images. There is always room for improvement, especially in the prediction quality.