Building & deploying a RAG architecture rapidly using Langflow & Python

I’ve been looking for a way to deploy Python-based RAG solutions faster, ideally with a UI that speeds up delivery. And here comes the tool that does exactly what I needed: “LangFlow.”

Before delving into the details, I strongly recommend taking a look at the demo. It’s a great way to get a comprehensive understanding of LangFlow and its capabilities in deploying RAG architecture rapidly.

Demo

This describes the entire architecture; hence, I’ll share the architecture components I used to build the solution.

To know more about RAG-Architecture, please refer to the following link.

In this flow, we parse the data from a source website URL (in this case, my photography website, to extract the text of one of my blog posts) and then embed it into a newly created Astra DB collection, where I will be storing the vector embeddings.

As you can see from the above diagram, I configured the flow within 5 minutes. In no time, it delivers the full functionality of a complete solution (an underlying Python application) that extracts chunks, converts them into embeddings, and finally stores them inside the Astra DB.
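To make the three steps above (chunk, embed, store) concrete, here is a minimal sketch in plain Python. It is not what LangFlow generates: the `toy_embed` function is a hypothetical stand-in for a real embedding model, and the returned list stands in for the Astra DB collection. All names and default sizes are my own assumptions for illustration.

```python
import hashlib
import math
from typing import Dict, List


def chunk_text(text: str, chunk_size: int = 200, overlap: int = 50) -> List[str]:
    """Split text into overlapping character windows."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    step = chunk_size - overlap
    return [text[i:i + chunk_size] for i in range(0, len(text), step)]


def toy_embed(text: str, dims: int = 16) -> List[float]:
    """Hypothetical stand-in for an embedding model: hashed bag of words."""
    vec = [0.0] * dims
    for word in text.lower().split():
        digest = hashlib.md5(word.encode("utf-8")).digest()
        vec[digest[0] % dims] += 1.0
    norm = math.sqrt(sum(v * v for v in vec))
    # Normalise to unit length so vectors are comparable by cosine similarity.
    return [v / norm for v in vec] if norm else vec


def ingest(text: str) -> List[Dict[str, object]]:
    """Chunk the text, embed each chunk, and return records to be stored."""
    return [{"text": chunk, "vector": toy_embed(chunk)} for chunk in chunk_text(text)]


records = ingest("hello world " * 50)
print(len(records), "chunks embedded")
```

In the real flow, the embedding component and the Astra DB component replace `toy_embed` and the in-memory list.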

Now, let us understand the next phase: based on the question asked through the chatbot, I need to convert that question into a vector embedding & then run a similarity search against the vector DB to bring back the relevant vectors, as shown below –
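The similarity search itself can be sketched in a few lines. This is only an illustration that assumes cosine similarity as the metric and a small in-memory list of `(text, vector)` pairs; in the actual flow, the Astra DB component performs the search, and the function names here are hypothetical.

```python
import math
from typing import List, Sequence, Tuple


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k(
    query_vec: Sequence[float],
    records: List[Tuple[str, Sequence[float]]],
    k: int = 2,
) -> List[Tuple[float, str]]:
    """Return the k stored chunks most similar to the query vector."""
    scored = [(cosine_similarity(query_vec, vec), text) for text, vec in records]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:k]


stored = [("chunk a", [1.0, 0.0]), ("chunk b", [0.0, 1.0]), ("chunk c", [1.0, 1.0])]
for score, text in top_k([1.0, 0.0], stored):
    print(f"{score:.3f}  {text}")
```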

You need to configure this entire flow by dragging the necessary widgets from the left-side panel as marked in the Blue-Box shown below –

For this specific use case, we’ve created an instance of Astra DB & then created an empty vector collection. Also, we need to generate the API key (token) & make sure the right roles are assigned to it. After successfully creating the token, you need to copy the endpoint, token & collection details & paste them into the corresponding fields of the Astra DB components inside LangFlow. Think of it as a framework where one needs to provide all the necessary information to build & run the entire flow successfully.
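If you later move these three values out of the UI and into your own Python code, one option is to read them from environment variables and fail early when something is missing. The sketch below assumes hypothetical variable names (`ASTRA_DB_API_ENDPOINT`, `ASTRA_DB_APPLICATION_TOKEN`, `ASTRA_DB_COLLECTION`); use whatever names your setup defines.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class AstraSettings:
    api_endpoint: str
    token: str
    collection_name: str


def load_astra_settings(env: Mapping[str, str] = os.environ) -> AstraSettings:
    """Read the three connection values, raising if any is missing or empty."""
    # Environment variable names are assumptions for this sketch.
    names = {
        "api_endpoint": "ASTRA_DB_API_ENDPOINT",
        "token": "ASTRA_DB_APPLICATION_TOKEN",
        "collection_name": "ASTRA_DB_COLLECTION",
    }
    missing = [var for var in names.values() if not env.get(var)]
    if missing:
        raise ValueError(f"Missing settings: {', '.join(missing)}")
    return AstraSettings(**{field: env[var] for field, var in names.items()})
```

Keeping the token out of the source code this way also mirrors the `"password": True` flag that the component uses to hide it in the UI.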

Following are some of the important snapshots from the Astra-DB –

Step – 1

Step – 2

Once you run the vector DB population, it will extract the text, convert it into vectors & insert them into the collection, as shown in the following screenshot –

You can see the sample vectors along with the text chunks inside the Astra DB data explorer as shown below –

Some of the critical components are highlighted in the blue box; they are important for monitoring the vector embeddings.

Now, here is how you can modify the current Python code of any available widgets or build your own widget by using the custom widget.

The first step is to click the code button highlighted in the Red-box as shown below –

Clicking that button opens the detailed Python code representing the entire widget build & its functionality. This is the place where you can add to the code, modify it, or keep it as it is, depending upon your need, as shown below –

Once you have built the entire solution, you must click the final compile button (shown in the red box), which will compile all the individual widgets. However, you can also compile the individual widgets one by one while you are building the solution, so you can pinpoint any potential problem at that very step.

Let us understand one sample code of a widget. In this case, we will take vector embedding insertion into the Astra DB. Let us see the code –

from typing import List, Optional, Union
from langchain_astradb import AstraDBVectorStore
from langchain_astradb.utils.astradb import SetupMode

from langflow.custom import CustomComponent
from langflow.field_typing import Embeddings, VectorStore
from langflow.schema import Record
from langchain_core.retrievers import BaseRetriever


class AstraDBVectorStoreComponent(CustomComponent):
    display_name = "Astra DB"
    description = "Builds or loads an Astra DB Vector Store."
    icon = "AstraDB"
    field_order = ["token", "api_endpoint", "collection_name", "inputs", "embedding"]

    def build_config(self):
        return {
            "inputs": {
                "display_name": "Inputs",
                "info": "Optional list of records to be processed and stored in the vector store.",
            },
            "embedding": {"display_name": "Embedding", "info": "Embedding to use"},
            "collection_name": {
                "display_name": "Collection Name",
                "info": "The name of the collection within Astra DB where the vectors will be stored.",
            },
            "token": {
                "display_name": "Token",
                "info": "Authentication token for accessing Astra DB.",
                "password": True,
            },
            "api_endpoint": {
                "display_name": "API Endpoint",
                "info": "API endpoint URL for the Astra DB service.",
            },
            "namespace": {
                "display_name": "Namespace",
                "info": "Optional namespace within Astra DB to use for the collection.",
                "advanced": True,
            },
            "metric": {
                "display_name": "Metric",
                "info": "Optional distance metric for vector comparisons in the vector store.",
                "advanced": True,
            },
            "batch_size": {
                "display_name": "Batch Size",
                "info": "Optional number of records to process in a single batch.",
                "advanced": True,
            },
            "bulk_insert_batch_concurrency": {
                "display_name": "Bulk Insert Batch Concurrency",
                "info": "Optional concurrency level for bulk insert operations.",
                "advanced": True,
            },
            "bulk_insert_overwrite_concurrency": {
                "display_name": "Bulk Insert Overwrite Concurrency",
                "info": "Optional concurrency level for bulk insert operations that overwrite existing records.",
                "advanced": True,
            },
            "bulk_delete_concurrency": {
                "display_name": "Bulk Delete Concurrency",
                "info": "Optional concurrency level for bulk delete operations.",
                "advanced": True,
            },
            "setup_mode": {
                "display_name": "Setup Mode",
                "info": "Configuration mode for setting up the vector store, with options like 'Sync', 'Async', or 'Off'.",
                "options": ["Sync", "Async", "Off"],
                "advanced": True,
            },
            "pre_delete_collection": {
                "display_name": "Pre Delete Collection",
                "info": "Boolean flag to determine whether to delete the collection before creating a new one.",
                "advanced": True,
            },
            "metadata_indexing_include": {
                "display_name": "Metadata Indexing Include",
                "info": "Optional list of metadata fields to include in the indexing.",
                "advanced": True,
            },
            "metadata_indexing_exclude": {
                "display_name": "Metadata Indexing Exclude",
                "info": "Optional list of metadata fields to exclude from the indexing.",
                "advanced": True,
            },
            "collection_indexing_policy": {
                "display_name": "Collection Indexing Policy",
                "info": "Optional dictionary defining the indexing policy for the collection.",
                "advanced": True,
            },
        }

    def build(
        self,
        embedding: Embeddings,
        token: str,
        api_endpoint: str,
        collection_name: str,
        inputs: Optional[List[Record]] = None,
        namespace: Optional[str] = None,
        metric: Optional[str] = None,
        batch_size: Optional[int] = None,
        bulk_insert_batch_concurrency: Optional[int] = None,
        bulk_insert_overwrite_concurrency: Optional[int] = None,
        bulk_delete_concurrency: Optional[int] = None,
        setup_mode: str = "Sync",
        pre_delete_collection: bool = False,
        metadata_indexing_include: Optional[List[str]] = None,
        metadata_indexing_exclude: Optional[List[str]] = None,
        collection_indexing_policy: Optional[dict] = None,
    ) -> Union[VectorStore, BaseRetriever]:
        try:
            setup_mode_value = SetupMode[setup_mode.upper()]
        except KeyError:
            raise ValueError(f"Invalid setup mode: {setup_mode}")
        if inputs:
            documents = [_input.to_lc_document() for _input in inputs]

            vector_store = AstraDBVectorStore.from_documents(
                documents=documents,
                embedding=embedding,
                collection_name=collection_name,
                token=token,
                api_endpoint=api_endpoint,
                namespace=namespace,
                metric=metric,
                batch_size=batch_size,
                bulk_insert_batch_concurrency=bulk_insert_batch_concurrency,
                bulk_insert_overwrite_concurrency=bulk_insert_overwrite_concurrency,
                bulk_delete_concurrency=bulk_delete_concurrency,
                setup_mode=setup_mode_value,
                pre_delete_collection=pre_delete_collection,
                metadata_indexing_include=metadata_indexing_include,
                metadata_indexing_exclude=metadata_indexing_exclude,
                collection_indexing_policy=collection_indexing_policy,
            )
        else:
            vector_store = AstraDBVectorStore(
                embedding=embedding,
                collection_name=collection_name,
                token=token,
                api_endpoint=api_endpoint,
                namespace=namespace,
                metric=metric,
                batch_size=batch_size,
                bulk_insert_batch_concurrency=bulk_insert_batch_concurrency,
                bulk_insert_overwrite_concurrency=bulk_insert_overwrite_concurrency,
                bulk_delete_concurrency=bulk_delete_concurrency,
                setup_mode=setup_mode_value,
                pre_delete_collection=pre_delete_collection,
                metadata_indexing_include=metadata_indexing_include,
                metadata_indexing_exclude=metadata_indexing_exclude,
                collection_indexing_policy=collection_indexing_policy,
            )

        return vector_store

Method: build_config:

  • This method defines the configuration options for the component.
  • Each configuration option includes a display_name and info, which provides details about the option.
  • Some options are marked as advanced, indicating they are optional and more complex.

Method: build:

  • This method is used to create an instance of the Astra DB Vector Store.
  • It takes several parameters, including embedding, token, api_endpoint, collection_name, and various optional parameters.
  • It converts the setup_mode string to an enum value.
  • If inputs are provided, they are converted to a format suitable for storing in the vector store.
  • Depending on whether inputs are provided, a new vector store from documents can be created, or an empty vector store can be initialized with the given configurations.
  • Finally, it returns the created vector store instance.
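The string-to-enum step is the part of build that most often trips people up, so here is that pattern in isolation. The `SetupMode` enum below is a stand-in I defined for this sketch (the real one is imported from `langchain_astradb`), and `parse_setup_mode` is a hypothetical helper name.

```python
from enum import Enum


class SetupMode(Enum):
    """Stand-in for the enum imported from langchain_astradb."""
    SYNC = 1
    ASYNC = 2
    OFF = 3


def parse_setup_mode(value: str) -> SetupMode:
    """Look up the enum member by name, ignoring the case of the input."""
    try:
        return SetupMode[value.upper()]
    except KeyError:
        raise ValueError(f"Invalid setup mode: {value}") from None


print(parse_setup_mode("Sync"))   # SetupMode.SYNC
print(parse_setup_mode("off"))    # SetupMode.OFF
```

Raising a `ValueError` with the offending value makes a typo in the widget’s drop-down configuration much easier to spot than a bare `KeyError`.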

And, here is the screenshot of the run –

And, this is the last step, which runs the integrated chatbot as shown below –

As one can see, the highlighted left side shows the reference text & chunks, & the right side shows the actual response.


So, we’ve done it. And, you know the fun fact. I did this entire workflow within 35 minutes alone. 😛

I’ll bring some more exciting topics in the coming days from the Python verse.

To learn more about LangFlow, please click here.

To learn about Astra DB, you need to click the following link.

To learn about my blog & photography, you can click the following url.

Till then, Happy Avenging!  🙂

Building a real-time streamlit app by consuming events from Ably channels

I’ll bring an exciting streamlit app that will reflect the real-time dashboard by consuming all the events from the Ably channel.

One more time, I’ll be utilizing my IoT emulator that will feed the real-time events based on the user inputs to the Ably channel, which will be subscribed to by the Streamlit-based app.

However, I would like to share the run before we dig deep into this.


Demo

Isn’t it exciting how we can use our custom-built IoT emulator to capture real-time events in an Ably Queue & then transform those raw events into more meaningful KPIs? Let’s deep dive then.

Let’s explore the broad-level architecture/flow –

As you can see, the green box is a demo IoT application that generates events & pushes them into the Ably Queue. At the same time, the streamlit-based Dashboard app consumes the events & transforms them into more meaningful metrics.
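Before looking at the real scripts, the publish/subscribe idea behind this architecture can be sketched with a tiny in-memory hub. This is not the Ably SDK: `InMemoryChannelHub` and the channel name are hypothetical, and the hub only illustrates how the emulator (publisher) and the dashboard (subscriber) stay decoupled.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, List


class InMemoryChannelHub:
    """Toy stand-in for a pub/sub service such as Ably."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callable[[Any], None]]] = defaultdict(list)

    def subscribe(self, channel: str, callback: Callable[[Any], None]) -> None:
        """Register a callback to be invoked for every message on the channel."""
        self._subscribers[channel].append(callback)

    def publish(self, channel: str, message: Any) -> None:
        """Deliver the message to every subscriber of the channel."""
        for callback in self._subscribers[channel]:
            callback(message)


hub = InMemoryChannelHub()
received: List[Dict[str, float]] = []

# Dashboard side: collect every event that arrives on the channel.
hub.subscribe("iot-events", received.append)

# Emulator side: push a raw sensor event.
hub.publish("iot-events", {"Temperature": 22.4, "Humidity": 41.0, "Pressure": 1009.8})

print(received)
```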

Let us understand the sample packages that are required for this task.

pip install ably==2.0.3
pip install numpy==1.26.3
pip install pandas==2.2.0
pip install plotly==5.19.0
pip install requests==2.31.0
pip install streamlit==1.30.0
pip install streamlit-autorefresh==1.0.1
pip install streamlit-echarts==0.4.0

Since this is an extension to our previous post, we’re not going to discuss other scripts, which we’ve already discussed over there. Instead, we will talk about the enhanced scripts & the new scripts that are required for this use case.

1. app.py (This script will consume real-time streaming data coming out of a hosted API source using another popular third-party service named Ably. Ably follows the pub/sub streaming concept, which might be extremely useful for any start-up. The events are then translated into many meaningful KPIs in a streamlit-based dashboard app.)

Note that we’re not going to discuss the entire script here, only the relevant parts. However, you can get the complete scripts in the GitHub repository.

import plotly.graph_objects as go


def createHumidityGauge(humidity_value):
    fig = go.Figure(go.Indicator(
        mode = "gauge+number",
        value = humidity_value,
        domain = {'x': [0, 1], 'y': [0, 1]},
        title = {'text': "Humidity", 'font': {'size': 24}},
        gauge = {
            'axis': {'range': [None, 100], 'tickwidth': 1, 'tickcolor': "darkblue"},
            'bar': {'color': "darkblue"},
            'bgcolor': "white",
            'borderwidth': 2,
            'bordercolor': "gray",
            'steps': [
                {'range': [0, 50], 'color': 'cyan'},
                {'range': [50, 100], 'color': 'royalblue'}],
            'threshold': {
                'line': {'color': "red", 'width': 4},
                'thickness': 0.75,
                'value': humidity_value}
        }
    ))

    fig.update_layout(height=220, paper_bgcolor = "white", font = {'color': "darkblue", 'family': "Arial"}, margin=dict(t=0, l=5, r=5, b=0))

    return fig

The above function creates a customized humidity gauge that visually represents a given humidity value, making it easy to read and understand at a glance.

This code defines a function createHumidityGauge that creates a visual gauge (like a meter) to display a humidity value. Here’s a simple breakdown of what it does:

  1. Function Definition: It starts by defining a function named createHumidityGauge that takes one parameter, humidity_value, which is the humidity level you want to display on the gauge.
  2. Creating the Gauge: Inside the function, it creates a figure using Plotly (a plotting library) with a specific type of chart called an Indicator. This Indicator is set to display in “gauge+number” mode, meaning it shows both a gauge visual and the numeric value of the humidity.
  3. Setting Gauge Properties:
    • The value is set to the humidity_value parameter, so the gauge shows this humidity level.
    • The domain sets the position of the gauge on the plot, which is set to fill the available space ([0, 1] for both x and y axes).
    • The title is set to “Humidity” with a font size of 24, labeling the gauge.
    • The gauge section defines the appearance and behavior of the gauge, including:
      • An axis that goes from 0 to 100 (assuming humidity is measured as a percentage from 0% to 100%).
      • The color and style of the gauge’s bar and background.
      • Colored steps indicating different ranges of humidity (cyan for 0-50% and royal blue for 50-100%).
      • A threshold line that appears at the value of the humidity, marked in red to stand out.
  4. Finalizing the Gauge Appearance: The function then updates the layout of the figure to set its height, background color, font style, and margins to make sure the gauge looks nice and is visible.
  5. Returning the Figure: Finally, the function returns the fig object, which is the fully configured gauge, ready to be displayed.

Other similar functions will repeat the same steps.
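Since the temperature and pressure gauges repeat the same steps, one way to avoid copy-pasting is to build the gauge settings from a few parameters. The helper below is a hypothetical sketch of mine, not part of the original script; it only returns a plain dictionary, and the axis maximum and colour bands in the example are made-up values.

```python
from typing import Any, Dict, List, Tuple


def build_gauge_spec(
    title: str,
    value: float,
    axis_max: float,
    steps: List[Tuple[float, float, str]],
) -> Dict[str, Any]:
    """Return gauge settings shaped like the arguments used in createHumidityGauge."""
    return {
        "mode": "gauge+number",
        "value": value,
        "domain": {"x": [0, 1], "y": [0, 1]},
        "title": {"text": title, "font": {"size": 24}},
        "gauge": {
            "axis": {"range": [None, axis_max]},
            # One coloured band per (low, high, colour) tuple.
            "steps": [{"range": [low, high], "color": color} for low, high, color in steps],
            # The red threshold line sits at the current reading.
            "threshold": {"line": {"color": "red", "width": 4}, "thickness": 0.75, "value": value},
        },
    }


spec = build_gauge_spec("Pressure", 1012.5, 1100, [(0, 550, "cyan"), (550, 1100, "royalblue")])
print(spec["title"]["text"], spec["gauge"]["axis"]["range"])
```

With Plotly installed, such a dictionary should be usable as `go.Figure(go.Indicator(**spec))`, followed by the same `update_layout` call as in the humidity gauge.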

import plotly.express as px


def createTemperatureLineChart(data):
    # Assuming 'data' is a DataFrame with a 'Timestamp' index and a 'Temperature' column
    fig = px.line(data, x=data.index, y='Temperature', title='Temperature Vs Time')
    fig.update_layout(height=270)  # Specify the desired height here
    return fig

The above function takes a set of temperature data indexed by timestamp and creates a line chart that visually represents how the temperature changes over time.

This code defines a function “createTemperatureLineChart” that creates a line chart to display temperature data over time. Here’s a simple summary of what it does:

  1. Function Definition: It starts with defining a function named createTemperatureLineChart that takes one parameter, data, which is expected to be a DataFrame (a type of data structure used in pandas, a Python data analysis library). This data frame should have a ‘Timestamp’ as its index (meaning each row represents a different point in time) and a ‘Temperature’ column containing temperature values.
  2. Creating the Line Chart: The function uses Plotly Express (a plotting library) to create a line chart with the following characteristics:
    • The x-axis represents time, taken from the DataFrame’s index (‘Timestamp’).
    • The y-axis represents temperature, taken from the ‘Temperature’ column in the DataFrame.
    • The chart is titled ‘Temperature Vs Time’, clearly indicating what the chart represents.
  3. Customizing the Chart: It then updates the layout of the chart to set a specific height (270 pixels) for the chart, making it easier to view.
  4. Returning the Chart: Finally, the function returns the fig object, which is the fully prepared line chart, ready to be displayed.

Similar functions will repeat for other KPIs.

    st.sidebar.header("KPIs")
    selected_kpis = st.sidebar.multiselect(
        "Select KPIs", options=["Temperature", "Humidity", "Pressure"], default=["Temperature"]
    )

The above code will create a sidebar with a multi-select drop-down list showing the KPIs (“Temperature”, “Humidity”, “Pressure”), with “Temperature” selected by default.

    # Split the layout into columns for KPIs and graphs
    gauge_col, kpi_col, graph_col = st.columns(3)

    # Auto-refresh setup
    st_autorefresh(interval=7000, key='data_refresh')

    # Fetching real-time data
    data = getData(var1, DInd)

    st.markdown(
        """
        <style>
        .stEcharts { margin-bottom: -50px; }  /* Class might differ, inspect the HTML to find the correct class name */
        </style>
        """,
        unsafe_allow_html=True
    )

    # Display gauges at the top of the page
    gauges = st.container()

    with gauges:
        col1, col2, col3 = st.columns(3)
        with col1:
            humidity_value = round(data['Humidity'].iloc[-1], 2)
            humidity_gauge_fig = createHumidityGauge(humidity_value)
            st.plotly_chart(humidity_gauge_fig, use_container_width=True)

        with col2:
            temp_value = round(data['Temperature'].iloc[-1], 2)
            temp_gauge_fig = createTempGauge(temp_value)
            st.plotly_chart(temp_gauge_fig, use_container_width=True)

        with col3:
            pressure_value = round(data['Pressure'].iloc[-1], 2)
            pressure_gauge_fig = createPressureGauge(pressure_value)
            st.plotly_chart(pressure_gauge_fig, use_container_width=True)


    # Next row for actual readings and charts side-by-side
    readings_charts = st.container()


    # Display KPIs and their trends
    with readings_charts:
        readings_col, graph_col = st.columns([1, 2])

        with readings_col:
            st.subheader("Latest Readings")
            if "Temperature" in selected_kpis:
                st.metric("Temperature", f"{temp_value:.2f}%")

            if "Humidity" in selected_kpis:
                st.metric("Humidity", f"{humidity_value:.2f}%")

            if "Pressure" in selected_kpis:
                st.metric("Pressure", f"{pressure_value:.2f}%")


        # Graph placeholders for each KPI
        with graph_col:
            if "Temperature" in selected_kpis:
                temperature_fig = createTemperatureLineChart(data.set_index("Timestamp"))

                # Display the Plotly chart in Streamlit with specified dimensions
                st.plotly_chart(temperature_fig, use_container_width=True)

            if "Humidity" in selected_kpis:
                humidity_fig = createHumidityLineChart(data.set_index("Timestamp"))

                # Display the Plotly chart in Streamlit with specified dimensions
                st.plotly_chart(humidity_fig, use_container_width=True)

            if "Pressure" in selected_kpis:
                pressure_fig = createPressureLineChart(data.set_index("Timestamp"))

                # Display the Plotly chart in Streamlit with specified dimensions
                st.plotly_chart(pressure_fig, use_container_width=True)

  1. The code begins by splitting the Streamlit web page layout into three columns to separately display Key Performance Indicators (KPIs), gauges, and graphs.
  2. It sets up an auto-refresh feature with a 7-second interval, ensuring the data displayed is regularly updated without manual refreshes.
  3. Real-time data is fetched using a function called getData, which takes unspecified parameters var1 and DInd.
  4. A CSS style is injected into the Streamlit page to adjust the margin of Echarts elements, which may be used to improve the visual layout of the page.
  5. A container for gauges is created at the top of the page, with three columns inside it dedicated to displaying humidity, temperature, and pressure gauges.
  6. Each gauge (humidity, temperature, and pressure) is created by rounding the last value from the fetched data to two decimal places and then visualized using respective functions that create Plotly gauge charts.
  7. Below the gauges, another container is set up for displaying the latest readings and their corresponding graphs in a side-by-side layout, using two columns.
  8. The left column under “Latest Readings” displays the latest values for selected KPIs (temperature, humidity, pressure) as metrics.
  9. In the right column, for each selected KPI, a line chart is created using data with timestamps as indices and displayed using Plotly charts, allowing for a visual trend analysis.
  10. This structured approach enables a dynamic and interactive dashboard within Streamlit, offering real-time insights into temperature, humidity, and pressure with both numeric metrics and graphical trends, optimized for regular data refreshes and user interactivity.
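The “latest reading per selected KPI” logic from the steps above can be sketched without Streamlit or pandas. Here I assume the events arrive as a list of dictionaries in time order; `latest_readings` is a hypothetical helper, not a function from the original script.

```python
from typing import Dict, Iterable, List


def latest_readings(
    events: List[Dict[str, float]],
    selected_kpis: Iterable[str],
) -> Dict[str, float]:
    """Return the most recent value of each selected KPI, rounded to 2 decimals."""
    if not events:
        return {}
    last = events[-1]
    return {kpi: round(last[kpi], 2) for kpi in selected_kpis if kpi in last}


events = [
    {"Temperature": 21.456, "Humidity": 40.0, "Pressure": 1008.2},
    {"Temperature": 22.337, "Humidity": 41.128, "Pressure": 1009.9},
]
print(latest_readings(events, ["Temperature", "Humidity"]))
```

Each value in the returned dictionary is what the dashboard would pass to `st.metric` for the corresponding KPI.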

Let us understand some of the important screenshots of this application –


So, we’ve done it.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 🙂