Real-time video summary assistance App – Part 1

Today, we’ll discuss another topic as part of a new two-part series, in which we will understand the importance of the MCP protocol for communication between agents.

This will be an in-depth, highly technical post, illustrated with easy-to-understand visuals.

But, before that, let us understand the demo first.

Isn’t it exciting?


Let us first understand the MCP protocol in simple language.

MCP (Multi-Agent Communication Protocol) is a custom message exchange system that facilitates structured and scalable communication among multiple AI agents operating within an application. These agents collaborate asynchronously or in real-time to complete complex tasks by sharing results, context, and commands through a common messaging layer.

How MCP Protocol Helps:

  • Agent-Oriented Architecture: Each agent handles a focused task, improving modularity and scalability.
  • Event-Driven Message Passing: Agents communicate based on triggers, not polling, leading to faster and more efficient responses.
  • Structured Communication Format: All messages follow a standard format (e.g., JSON) with metadata for sender, recipient, type, and payload.
  • State Preservation: Agents maintain context across messages using memory (e.g., ConversationBufferMemory) to ensure coherence.

How It Works (Step-by-Step):

  • 📥 User uploads or streams a video.
  • 🧑‍💻 MCP Protocol triggers the Transcription Agent to start converting audio into text.
  • 🌐 Translation Agent receives this text (if a different language is needed).
  • 🧾 Summarization Agent receives the translated or original transcript and generates a concise summary.
  • 📚 Research Agent checks for references or terminology used in the video.
  • 📄 Documentation Agent compiles the output into a structured report.
  • 🔁 All communication between agents flows through MCP, ensuring consistent message delivery and coordination.

Now, let us understand the solution that we intend to implement:

This app provides live summarization and contextual insights from videos such as webinars, interviews, or YouTube recordings using multiple cooperating AI agents. These agents may include:

  • Transcription Agent: Converts spoken words to text.
  • Translation Agent: Translates text to different languages (if needed).
  • Summarization Agent: Generates concise summaries.
  • Research Agent: Finds background or supplementary data related to the discussion.
  • Documentation Agent: Converts outputs into structured reports or learning materials.

We need to understand one more thing before deep-diving into the code. Parts of a conversation may be mixed, e.g., part Hindi & part English. In that case, the application will break the sentences into chunks & then convert all of them into the same language. Hence, the following rules are applied while translating the sentences –


Now, we will go through the basic framework of the system & try to understand how the principles that we discussed above map to the specific technologies used in this particular solution –

  1. Documentation Agent built with the LangChain framework
  2. Research Agent built with the AutoGen framework
  3. MCP Broker for seamless communication between agents

Let us understand from the given picture the flow of the process that our app is trying to implement –


Great! So, now, we’ll focus on some of the key Python scripts & go through their key features.

But, before that, here is the group of scripts, each of which belongs to a specific task.

  • clsMCPMessage.py
  • clsMCPBroker.py
  • clsYouTubeVideoProcessor.py
  • clsLanguageDetector.py
  • clsTranslationAgent.py
  • clsTranslationService.py
  • clsDocumentationAgent.py
  • clsResearchAgent.py

Now, we’ll review some of these scripts in this post & continue with the rest in the next post.

import queue
import time
import uuid
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, Field


class clsMCPMessage(BaseModel):
    """Message format for MCP protocol"""
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: float = Field(default_factory=time.time)
    sender: str
    receiver: str
    message_type: str  # "request", "response", "notification"
    content: Dict[str, Any]
    reply_to: Optional[str] = None
    conversation_id: str
    metadata: Dict[str, Any] = {}
    
class clsMCPBroker:
    """Message broker for MCP protocol communication between agents"""
    
    def __init__(self):
        self.message_queues: Dict[str, queue.Queue] = {}
        self.subscribers: Dict[str, List[str]] = {}
        self.conversation_history: Dict[str, List[clsMCPMessage]] = {}
    
    def register_agent(self, agent_id: str) -> None:
        """Register an agent with the broker"""
        if agent_id not in self.message_queues:
            self.message_queues[agent_id] = queue.Queue()
            self.subscribers[agent_id] = []
    
    def subscribe(self, subscriber_id: str, publisher_id: str) -> None:
        """Subscribe an agent to messages from another agent"""
        if publisher_id in self.subscribers:
            if subscriber_id not in self.subscribers[publisher_id]:
                self.subscribers[publisher_id].append(subscriber_id)
    
    def publish(self, message: clsMCPMessage) -> None:
        """Publish a message to its intended receiver"""
        # Store in conversation history
        if message.conversation_id not in self.conversation_history:
            self.conversation_history[message.conversation_id] = []
        self.conversation_history[message.conversation_id].append(message)
        
        # Deliver to direct receiver
        if message.receiver in self.message_queues:
            self.message_queues[message.receiver].put(message)
        
        # Deliver to subscribers of the sender
        for subscriber in self.subscribers.get(message.sender, []):
            if subscriber != message.receiver:  # Avoid duplicates
                self.message_queues[subscriber].put(message)
    
    def get_message(self, agent_id: str, timeout: Optional[float] = None) -> Optional[clsMCPMessage]:
        """Get a message for the specified agent"""
        try:
            return self.message_queues[agent_id].get(timeout=timeout)
        except (queue.Empty, KeyError):
            return None
    
    def get_conversation_history(self, conversation_id: str) -> List[clsMCPMessage]:
        """Get the history of a conversation"""
        return self.conversation_history.get(conversation_id, [])

Imagine a system where different virtual agents (like robots or apps) need to talk to each other. To do that, they send messages back and forth—kind of like emails or text messages. This code is responsible for:

  • Making sure those messages are properly written (like filling out all parts of a form).
  • Making sure messages are delivered to the right people.
  • Keeping a record of conversations so you can go back and review what was said.

This part (clsMCPMessage) is like a template or a form that every message needs to follow. Each message has:

  • ID: A unique number so every message is different (like a serial number).
  • Time Sent: When the message was created.
  • Sender & Receiver: Who sent the message and who is supposed to receive it.
  • Type of Message: Is it a request, a response, or just a notification?
  • Content: The actual information or question the message is about.
  • Reply To: If this message is answering another one, this tells which one.
  • Conversation ID: So we know which group of messages belongs to the same conversation.
  • Extra Info (Metadata): Any other small details that might help explain the message.
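
To make this template concrete, here is a minimal, hypothetical message built with the clsMCPMessage model shown earlier (the agent names, transcript text & conversation id below are assumptions for illustration only):

# Hypothetical example: the Transcription Agent hands a transcript to the Summarization Agent.
msg = clsMCPMessage(
    sender="transcription_agent",
    receiver="summarization_agent",
    message_type="request",
    content={"transcript": "Welcome to today's webinar on renewable energy..."},
    conversation_id="conv-001",
)

print(msg.json())  # id & timestamp are filled in automatically by the default factories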

This (clsMCPBroker) is the system (or “post office”) that makes sure messages get to where they’re supposed to go. Here’s what it does:

1. Registering an Agent

  • Think of this like signing up a new user in the system.
  • Each agent gets their own personal mailbox (called a “message queue”) so others can send them messages.

2. Subscribing to Another Agent

  • If Agent A wants to receive copies of messages from Agent B, they can “subscribe” to B.
  • This is like signing up for B’s newsletter—whenever B sends something, A gets a copy.

3. Sending a Message

  • When someone sends a message:
    • It is saved into a conversation history (like keeping emails in your inbox).
    • It is delivered to the main person it was meant for.
    • And, if anyone subscribed to the sender, they get a copy too—unless they’re already the main receiver (to avoid sending duplicates).

4. Receiving Messages

  • Each agent can check their personal mailbox to see if they got any new messages.
  • If there are no messages, they’ll either wait for some time or move on.

5. Viewing Past Conversations

  • You can look up all messages that were part of a specific conversation.
  • This is helpful for remembering what was said earlier.
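
Putting the “post office” analogy into code, here is a minimal usage sketch built only from the two classes shown above (the agent names, content & conversation id are assumptions):

broker = clsMCPBroker()

# 1. Register agents (create their mailboxes)
broker.register_agent("transcription_agent")
broker.register_agent("summarization_agent")

# 2. The Summarization Agent subscribes to whatever the Transcription Agent sends
broker.subscribe("summarization_agent", "transcription_agent")

# 3. The Transcription Agent publishes a message
msg = clsMCPMessage(
    sender="transcription_agent",
    receiver="summarization_agent",
    message_type="notification",
    content={"transcript": "Hello, world."},
    conversation_id="conv-001",
)
broker.publish(msg)

# 4. The Summarization Agent checks its mailbox
received = broker.get_message("summarization_agent", timeout=1)
print(received.content if received else "No message")

# 5. Review the conversation later
print(len(broker.get_conversation_history("conv-001")))  # -> 1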

In systems where many different smart tools or services need to work together and communicate, this kind of communication system makes sure everything is:

  • Organized
  • Delivered correctly
  • Easy to trace back when needed

So, we’ll finish here for this post & cover the remaining scripts in the next one.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging!  🙂

Building the optimized Indic Language bot by using the Python-based Sarvam AI LLMs – Part 2

In our previous post, we discovered Sarvam AI’s basic capabilities & got a glimpse of the code. Today, we’ll finish the rest of the walkthrough & look at some metrics comparing it against other popular LLMs.

Before that, you can refer to the previous post for a recap, which is available here.

Also, we’re providing the demo here –


Now, let us jump into the rest of the code –

clsSarvamAI.py (This script will capture the audio input in Indic languages & then provide an LLM response in the form of audio in Indic languages. In this post, we’ll discuss part of the code. In the next part, we’ll be discussing the next important methods. Note that we’re only going to discuss a few important functions here.)

def createWavFile(self, audio, output_filename="output.wav", target_sample_rate=16000):
      try:
          # Get the raw audio data as bytes
          audio_data = audio.get_raw_data()

          # Get the original sample rate
          original_sample_rate = audio.sample_rate

          # Open the output file in write mode
          with wave.open(output_filename, 'wb') as wf:
              # Set parameters: nchannels, sampwidth, framerate, nframes, comptype, compname
              wf.setnchannels(1)  # Assuming mono audio
              wf.setsampwidth(2)  # 16-bit audio (int16)
              wf.setframerate(original_sample_rate)

              # Write audio data in chunks
              chunk_size = 1024 * 10  # Chunk size (adjust based on memory constraints)
              for i in range(0, len(audio_data), chunk_size):
                  wf.writeframes(audio_data[i:i+chunk_size])

          # Log the current timestamp
          var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
          print('Audio Time: ', str(var))

          return 0

      except Exception as e:
          print('Error: <Wav File Creation>: ', str(e))
          return 1

Purpose:

This method saves recorded audio data into a WAV file format.

What it Does:

  • Takes raw audio data and converts it into bytes.
  • Gets the original sample rate of the audio.
  • Opens a new WAV file in write mode.
  • Sets the parameters for the audio file (like the number of channels, sample width, and frame rate).
  • Writes the audio data into the file in small chunks to manage memory usage.
  • Logs the current time to keep track of when the audio was saved.
  • Returns 0 on success or 1 if there was an error.

The “createWavFile” method takes the recorded audio and saves it as a WAV file on your computer. It converts the audio into bytes and writes them into small file parts. If something goes wrong, it prints an error message.
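
As a quick, hedged usage sketch (the no-argument constructor & file name below are assumptions; in the actual script the constructor reads its settings from the configuration), a clip captured with the speech_recognition library can be saved like this:

import speech_recognition as sr

recognizer = sr.Recognizer()
with sr.Microphone() as source:
    recognizer.adjust_for_ambient_noise(source, duration=1)
    audio = recognizer.listen(source, timeout=5, phrase_time_limit=5)

bot = clsSarvamAI()                                    # assumed constructor
status = bot.createWavFile(audio, "sample_output.wav")
print("Saved!" if status == 0 else "Failed!")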


def chunkBengaliResponse(self, text, max_length=500):
      try:
          chunks = []
          current_chunk = ""

          # Use regex to split on sentence-ending punctuation
          sentences = re.split(r'(।|\?|!)', text)

          for i in range(0, len(sentences), 2):
              sentence = sentences[i] + (sentences[i+1] if i+1 < len(sentences) else '')

              if len(current_chunk) + len(sentence) <= max_length:
                  current_chunk += sentence
              else:
                  if current_chunk:
                      chunks.append(current_chunk.strip())
                  current_chunk = sentence

          if current_chunk:
              chunks.append(current_chunk.strip())

          return chunks
      except Exception as e:
          x = str(e)
          print('Error: <<Chunking Bengali Response>>: ', x)

          return ''

Purpose:

This method breaks down a large piece of text (in Bengali) into smaller, manageable chunks.

What it Does:

  • Initializes an empty list to store the chunks of text.
  • It uses a regular expression to split the text based on punctuation marks like full stops (।), question marks (?), and exclamation points (!).
  • Iterates through the split sentences to form chunks that do not exceed a specified maximum length (max_length).
  • Adds each chunk to the list until the entire text is processed.
  • Returns the list of chunks or an empty string if an error occurs.

The chunkBengaliResponse method takes a long Bengali text and splits it into smaller, easier-to-handle parts. It uses punctuation marks to determine where to split. If there’s a problem while splitting, it prints an error message.
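
For illustration, here is a hypothetical usage sketch (the sample Bengali text, the small max_length & the no-argument constructor are assumptions) that splits a short passage into chunks of at most 25 characters:

sampleText = "আজ আবহাওয়া খুব ভালো। আপনি কেমন আছেন? আমি ভালো আছি!"

bot = clsSarvamAI()                                    # assumed constructor
chunks = bot.chunkBengaliResponse(sampleText, max_length=25)

for i, chunk in enumerate(chunks, start=1):
    print(f"Chunk {i}: {chunk}")                       # prints one chunk per line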


def playWav(self, audio_data):
      try:
          # Create a wav file object from the audio data
          WavFile = wave.open(io.BytesIO(audio_data), 'rb')

          # Extract audio parameters
          channels = WavFile.getnchannels()
          sample_width = WavFile.getsampwidth()
          framerate = WavFile.getframerate()
          n_frames = WavFile.getnframes()

          # Read the audio data
          audio = WavFile.readframes(n_frames)
          WavFile.close()

          # Convert audio data to numpy array
          dtype_map = {1: np.int8, 2: np.int16, 3: np.int32, 4: np.int32}
          audio_np = np.frombuffer(audio, dtype=dtype_map[sample_width])

          # Reshape audio if stereo
          if channels == 2:
              audio_np = audio_np.reshape(-1, 2)

          # Play the audio
          sd.play(audio_np, framerate)
          sd.wait()

          return 0
      except Exception as e:
          x = str(e)
          print('Error: <<Playing the Wav>>: ', x)

          return 1

Purpose:

This method plays audio data stored in a WAV file format.

What it Does:

  • Reads the audio data from a WAV file object.
  • Extracts parameters like the number of channels, sample width, and frame rate.
  • Converts the audio data into a format that the sound device can process.
  • If the audio is stereo (two channels), it reshapes the data for playback.
  • Plays the audio through the speakers.
  • Returns 0 on success or 1 if there was an error.

The playWav method takes audio data from a WAV file and plays it through your computer’s speakers. It reads the data and converts it into a format your speakers can understand. If there’s an issue playing the audio, it prints an error message.
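
As a small, hypothetical usage sketch (the file name & no-argument constructor are assumptions), the raw bytes of a previously saved WAV file can be handed straight to playWav():

# Read a previously saved WAV file from disk & play it back through the default sound device.
with open("output.wav", "rb") as f:
    wav_bytes = f.read()

bot = clsSarvamAI()          # assumed constructor; real settings come from the configuration
bot.playWav(wav_bytes)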


  def audioPlayerWorker(self, queue):
      try:
          while True:
              var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
              print('Response Audio Time: ', str(var))
              audio_bytes = queue.get()
              if audio_bytes is None:
                  break
              self.playWav(audio_bytes)
              queue.task_done()

          return 0
      except Exception as e:
          x = str(e)
          print('Error: <<Audio Player Worker>>: ', x)

          return 1

Purpose:

This method continuously plays audio from a queue until there is no more audio to play.

What it Does:

  • It enters an infinite loop to keep checking for audio data in the queue.
  • Retrieves audio data from the queue and plays it using the “playWav” method.
  • Logs the current time each time an audio response is played.
  • It breaks the loop if it encounters a None value, indicating no more audio to play.
  • Returns 0 on success or 1 if there was an error.

The audioPlayerWorker method keeps checking a queue for new audio to play. It plays each piece of audio as it comes in and stops when there’s no more audio. If there’s an error during playback, it prints an error message.


  async def processChunk(self, chText, url_3, headers):
      try:
          sarvamAPIKey = self.sarvamAPIKey
          model_1 = self.model_1
          langCode_1 = self.langCode_1
          speakerName = self.speakerName

          print()
          print('Chunk Response: ')
          vText = chText.replace('*','').replace(':',' , ')
          print(vText)

          payload_3 = {
              "inputs": [vText],
              "target_language_code": langCode_1,
              "speaker": speakerName,
              "pitch": 0.15,
              "pace": 0.95,
              "loudness": 2.1,
              "speech_sample_rate": 16000,
              "enable_preprocessing": True,
              "model": model_1
          }
          response_3 = requests.request("POST", url_3, json=payload_3, headers=headers)
          audio_data = response_3.text
          data = json.loads(audio_data)
          byte_data = data['audios'][0]
          audio_bytes = base64.b64decode(byte_data)

          return audio_bytes
      except Exception as e:
          x = str(e)
          print('Error: <<Process Chunk>>: ', x)
          audio_bytes = base64.b64decode('')

          return audio_bytes

Purpose:

This asynchronous method processes a chunk of text to generate audio using an external API.

What it Does:

  • Cleans up the text chunk by removing unwanted characters.
  • Prepares a payload with the cleaned text and other parameters required for text-to-speech conversion.
  • Sends a POST request to an external API to generate audio from the text.
  • Decodes the audio data received from the API (in base64 format) into raw audio bytes.
  • Returns the audio bytes or an empty byte string if there is an error.

The processChunk method takes a text, sends it to an external service to be converted into speech, and returns the audio data. If something goes wrong, it prints an error message.


  async def processAudio(self, audio):
      try:
          model_2 = self.model_2
          model_3 = self.model_3
          url_1 = self.url_1
          url_2 = self.url_2
          url_3 = self.url_3
          sarvamAPIKey = self.sarvamAPIKey
          audioFile = self.audioFile
          WavFile = self.WavFile
          langCode_1 = self.langCode_1
          langCode_2 = self.langCode_2
          speakerGender = self.speakerGender

          headers = {
              "api-subscription-key": sarvamAPIKey
          }

          audio_queue = Queue()
          data = {
              "model": model_2,
              "prompt": templateVal_1
          }
          files = {
              "file": (audioFile, open(WavFile, "rb"), "audio/wav")
          }

          response_1 = requests.post(url_1, headers=headers, data=data, files=files)
          tempDert = json.loads(response_1.text)
          regionalT = tempDert['transcript']
          langCd = tempDert['language_code']
          statusCd = response_1.status_code
          payload_2 = {
              "input": regionalT,
              "source_language_code": langCode_2,
              "target_language_code": langCode_1,
              "speaker_gender": speakerGender,
              "mode": "formal",
              "model": model_3,
              "enable_preprocessing": True
          }

          response_2 = requests.request("POST", url_2, json=payload_2, headers=headers)
          regionalT_2 = response_2.text
          data_ = json.loads(regionalT_2)
          regionalText = data_['translated_text']
          chunked_response = self.chunkBengaliResponse(regionalText)

          audio_thread = Thread(target=self.audioPlayerWorker, args=(audio_queue,))
          audio_thread.start()

          for chText in chunked_response:
              audio_bytes = await self.processChunk(chText, url_3, headers)
              audio_queue.put(audio_bytes)

          audio_queue.join()
          audio_queue.put(None)
          audio_thread.join()

          var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
          print('Retrieval Time: ', str(var))

          return 0

      except Exception as e:
          x = str(e)
          print('Error: <<Processing Audio>>: ', x)

          return 1

Purpose:

This asynchronous method handles the complete audio processing workflow, including speech recognition, translation, and audio playback.

What it Does:

  • Initializes various configurations and headers required for processing.
  • Sends the recorded audio to an API to get the transcript and detected language.
  • Translates the transcript into another language using another API.
  • Splits the translated text into smaller chunks using the chunkBengaliResponse method.
  • Starts an audio playback thread to play each processed audio chunk.
  • Sends each text chunk to the processChunk method to convert to speech and adds the audio data to the queue for playback.
  • Waits for all audio chunks to be processed and played before finishing.
  • Logs the current time when the process is complete.
  • Returns 0 on success or 1 if there was an error.

The “processAudio” method takes recorded audio, recognizes what was said, translates it into another language, splits the translated text into parts, converts each part into speech, and plays it back. It uses different services to do this; if there’s a problem at any step, it prints an error message.

And, here are the performance stats (captured from the Sarvam AI website) –


So, finally, we’ve done it. You can view the complete code in this GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse.

Till then, Happy Avenging! 🙂

Building the optimized Indic Language bot by using the Python-based Sarvam AI LLMs – Part 1

In the rapidly evolving landscape of artificial intelligence, Sarvam AI has emerged as a pioneering force in developing language technologies for Indian languages. This article series aims to provide an in-depth look at Sarvam AI’s Indic APIs, exploring their features, performance, and potential impact on the Indian tech ecosystem.

This LLM aims to bridge the language divide in India’s digital landscape by providing powerful, accessible AI tools for Indic languages.

India has 22 official languages and hundreds of dialects, presenting a unique challenge for technology adoption and digital inclusion, even though government work happens in both the official regional languages and English.

Developers can fine-tune the models for specific domains or use cases, improving accuracy for specialized applications.

As of 2024, Sarvam AI’s Indic APIs support the following languages:

  • Hindi
  • Bengali
  • Tamil
  • Telugu
  • Marathi
  • Gujarati
  • Kannada
  • Malayalam
  • Punjabi
  • Odia

Before delving into the details, I strongly recommend taking a look at the demo.

Isn’t this exciting? Let us understand the flow of events in the following diagram –

The application interacts with Sarvam AI’s API. After interpreting the initial audio inputs from the computer, it uses Sarvam AI’s API to get the answer based on the selected Indic language, Bengali.

pip install SpeechRecognition==3.10.4
pip install pydub==0.25.1
pip install sounddevice==0.5.0
pip install numpy==1.26.4
pip install soundfile==0.12.1

clsSarvamAI.py (This script will capture the audio input in Indic languages & then provide an LLM response in the form of audio in Indic languages. In this post, we’ll discuss part of the code. In the next part, we’ll be discussing the next important methods. Note that we’re only going to discuss a few important functions here.)

def initializeMicrophone(self):
      try:
          for index, name in enumerate(sr.Microphone.list_microphone_names()):
              print(f"Microphone with name \"{name}\" found (device_index={index})")
          return sr.Microphone()
      except Exception as e:
          x = str(e)
          print('Error: <<Initiating Microphone>>: ', x)

          return ''

  def realTimeTranslation(self):
      try:
          WavFile = self.WavFile
          recognizer = sr.Recognizer()
          try:
              microphone = self.initializeMicrophone()
          except Exception as e:
              print(f"Error initializing microphone: {e}")
              return

          with microphone as source:
              print("Adjusting for ambient noise. Please wait...")
              recognizer.adjust_for_ambient_noise(source, duration=5)
              print("Microphone initialized. Start speaking...")

              try:
                  while True:
                      try:
                          print("Listening...")
                          audio = recognizer.listen(source, timeout=5, phrase_time_limit=5)
                          print("Audio captured. Recognizing...")

                          #var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
                          #print('Before Audio Time: ', str(var))

                          self.createWavFile(audio, WavFile)

                          try:
                              text = recognizer.recognize_google(audio, language="bn-BD")  # Bengali language code
                              sentences = text.split('।')  # Bengali full stop (danda)

                              print('Sentences: ')
                              print(sentences)
                              print('*'*120)

                              if not text:
                                  print("No speech detected. Please try again.")
                                  continue

                              if str(text).lower() == 'টাটা':
                                  raise BreakOuterLoop("Based on User Choice!")

                              asyncio.run(self.processAudio(audio))

                          except sr.UnknownValueError:
                              print("Google Speech Recognition could not understand audio")
                          except sr.RequestError as e:
                              print(f"Could not request results from Google Speech Recognition service; {e}")

                      except sr.WaitTimeoutError:
                          print("No speech detected within the timeout period. Listening again...")
                      except BreakOuterLoop:
                          raise
                      except Exception as e:
                          print(f"An unexpected error occurred: {e}")

                      time.sleep(1)  # Short pause before next iteration

              except BreakOuterLoop as e:
                  print(f"Exited : {e}")

          # Removing the temporary audio file that was generated at the beginning
          os.remove(WavFile)

          return 0
      except Exception as e:
          x = str(e)
          print('Error: <<Real-time Translation>>: ', x)

          return 1

Purpose:

This method is responsible for setting up and initializing the microphone for audio input.

What it Does:

  • It attempts to list all available microphones connected to the system.
  • It prints the microphone’s name and corresponding device index (a unique identifier) for each microphone.
  • If successful, it returns a microphone object (sr.Microphone()), which can be used later to capture audio.
  • If this process encounters an error (e.g., no microphones being found or an internal error), it catches the exception, prints an error message, and returns an empty string ('').

The “initializeMicrophone” method finds all microphones connected to the computer and prints their names. If it finds a microphone, it prepares to use it for recording. If something goes wrong, it tells you what went wrong and stops the process.

Purpose:

This method uses the microphone to handle real-time speech translation from a user. It captures spoken audio, converts it into text, and processes it further.

What it Does:

  • Initializes a recognizer object (sr.Recognizer()) for speech recognition.
  • Calls initializeMicrophone to set up the microphone. If initialization fails, an error message is printed, and the process is stopped.
  • Once the microphone is set up successfully, it adjusts for ambient noise to enhance accuracy.
  • Enters a loop to continuously listen for audio input from the user:
    • It waits for the user to speak and captures the audio.
    • Converts the captured audio to text using Google’s Speech Recognition service, specifying Bengali as the language.
    • If text is successfully captured and recognized:
      • Splits the text into sentences using the Bengali full-stop character.
      • Prints the sentences.
      • It checks if the text is a specific word (“টাটা”), and if so, it raises an exception to stop the loop (indicating that the user wants to exit).
      • Otherwise, it processes the audio asynchronously with processAudio.
    • If no speech is detected or an error occurs, it prints the relevant message and continues listening.
  • If the user decides to exit or if an error occurs, it breaks out of the loop, deletes any temporary audio files created, and returns a status code (0 for success, 1 for failure).


The “realTimeTranslation” method continuously listens to the microphone for the user to speak. It captures what is said and tries to understand it using Google’s service, specifically for the Bengali language. It then splits what was said into sentences and prints them out. If the user says “টাটা” (which means “goodbye” in Bengali), it stops listening and exits. If it cannot understand the user or if there is a problem, it will let the user know and try again. It will print an error and stop the process if something goes wrong.
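
One small detail worth calling out: the loop is exited by raising BreakOuterLoop, a custom exception that is assumed to be defined elsewhere in the module. A minimal sketch of such a definition could be:

class BreakOuterLoop(Exception):
    """Custom exception used purely to break out of the nested listening loop
    when the user says the exit word."""
    pass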


Enjoy this part & stay tuned for the next one.

Enabling & Exploring Stable Diffusion – Part 1

This new solution will evaluate the power of Stable Diffusion by creating solutions as we progress & refining our prompt from scratch using Stable Diffusion & Python. This post opens new opportunities for IT companies & business start-ups looking to deliver solutions with better performance compared to the paid version of Stability AI’s API. This project is for advanced Python users, Stable Diffusion & data science newbies & AI evangelists.

In a series of posts, I’ll explain and focus on the Stable Diffusion API and a custom solution using the Python-based SDK of Stable Diffusion.

But, before that, let us view the video that it generates from the prompt by using the third-party API:

Prompt to Video

And, let us understand the prompt that we supplied to create the above video –

Isn’t it exciting?

However, I want to stress this point: the video generated by the Stable Diffusion (Stability AI) API was only able to partially apply the animation effect. The animation applies to the clouds, but it doesn’t apply to the waves. But, I must admit, the quality of the video is quite good.


Let us understand the code and how we run the solution, and then we can try to understand its performance along with the other solutions later in the subsequent series.

As you know, we’re exploring the code base of the third-party API, which will actually execute a series of API calls that create a video out of the prompt.

Let us understand some of the important snippets –

class clsStabilityAIAPI:
    def __init__(self, STABLE_DIFF_API_KEY, OUT_DIR_PATH, FILE_NM, VID_FILE_NM):
        self.STABLE_DIFF_API_KEY = STABLE_DIFF_API_KEY
        self.OUT_DIR_PATH = OUT_DIR_PATH
        self.FILE_NM = FILE_NM
        self.VID_FILE_NM = VID_FILE_NM

    def delFile(self, fileName):
        try:
            # Deleting the intermediate image
            os.remove(fileName)

            return 0 
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

    def generateText2Image(self, inputDescription):
        try:
            STABLE_DIFF_API_KEY = self.STABLE_DIFF_API_KEY
            fullFileName = self.OUT_DIR_PATH + self.FILE_NM
            
            if STABLE_DIFF_API_KEY is None:
                raise Exception("Missing Stability API key.")
            
            response = requests.post(f"{api_host}/v1/generation/{engine_id}/text-to-image",
                                    headers={
                                        "Content-Type": "application/json",
                                        "Accept": "application/json",
                                        "Authorization": f"Bearer {STABLE_DIFF_API_KEY}"
                                        },
                                        json={
                                            "text_prompts": [{"text": inputDescription}],
                                            "cfg_scale": 7,
                                            "height": 1024,
                                            "width": 576,
                                            "samples": 1,
                                            "steps": 30,
                                            },)
            
            if response.status_code != 200:
                raise Exception("Non-200 response: " + str(response.text))
            
            data = response.json()

            for i, image in enumerate(data["artifacts"]):
                with open(fullFileName, "wb") as f:
                    f.write(base64.b64decode(image["base64"]))      
            
            return fullFileName

        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 'N/A'

    def image2VideoPassOne(self, imgNameWithPath):
        try:
            STABLE_DIFF_API_KEY = self.STABLE_DIFF_API_KEY

            response = requests.post(f"https://api.stability.ai/v2beta/image-to-video",
                                    headers={"authorization": f"Bearer {STABLE_DIFF_API_KEY}"},
                                    files={"image": open(imgNameWithPath, "rb")},
                                    data={"seed": 0,"cfg_scale": 1.8,"motion_bucket_id": 127},
                                    )
            
            print('First Pass Response:')
            print(str(response.text))
            
            genID = response.json().get('id')

            return genID 
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 'N/A'

    def image2VideoPassTwo(self, genId):
        try:
            generation_id = genId
            STABLE_DIFF_API_KEY = self.STABLE_DIFF_API_KEY
            fullVideoFileName = self.OUT_DIR_PATH + self.VID_FILE_NM

            response = requests.request("GET", f"https://api.stability.ai/v2beta/image-to-video/result/{generation_id}",
                                        headers={
                                            'accept': "video/*",  # Use 'application/json' to receive base64 encoded JSON
                                            'authorization': f"Bearer {STABLE_DIFF_API_KEY}"
                                            },) 
            
            print('Retrieve Status Code: ', str(response.status_code))
            
            if response.status_code == 202:
                print("Generation in-progress, try again in 10 seconds.")

                return 5
            elif response.status_code == 200:
                print("Generation complete!")
                with open(fullVideoFileName, 'wb') as file:
                    file.write(response.content)

                print("Successfully Retrieved the video file!")

                return 0
            else:
                raise Exception(str(response.json()))
            
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

Now, let us understand the code –

This function (__init__) is called when an object of the class is created. It initializes four properties:

  • STABLE_DIFF_API_KEY: the API key for Stability AI services.
  • OUT_DIR_PATH: the folder path to save files.
  • FILE_NM: the name of the generated image file.
  • VID_FILE_NM: the name of the generated video file.

This function (delFile) deletes a file specified by fileName.

  • If successful, it returns 0.
  • If an error occurs, it logs the error and returns 1.

This function (generateText2Image) generates an image based on a text description:

  • Sends a request to the Stability AI text-to-image endpoint using the API key.
  • Saves the resulting image to a file.
  • Returns the file’s path on success or 'N/A' if an error occurs.

This function (image2VideoPassOne) uploads an image to create a video in its first phase:

  • Sends the image to Stability AI’s image-to-video endpoint.
  • Logs the response and extracts the id (generation ID) for the next phase.
  • Returns the id if successful or 'N/A' on failure.

This function (image2VideoPassTwo) retrieves the video created in the second phase using the genId:

  • Checks the video generation status from the Stability AI endpoint.
  • If complete, saves the video file and returns 0.
  • If still processing, returns 5.
  • Logs and returns 1 for any errors.
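
Putting the pieces together, here is a hedged sketch of how the class might be wired end-to-end (the API key, paths, prompt & file names below are assumptions; in the actual project they come from the configuration):

r1 = clsStabilityAIAPI(
    STABLE_DIFF_API_KEY="YOUR_STABILITY_API_KEY",
    OUT_DIR_PATH="output/",
    FILE_NM="scene.png",
    VID_FILE_NM="scene.mp4",
)

prompt = "A calm beach at sunset with slowly moving clouds"
imgPath = r1.generateText2Image(prompt)     # text  -> image
gID = r1.image2VideoPassOne(imgPath)        # image -> video job (returns a generation id)
# The main script then keeps polling image2VideoPassTwo(gID) until it returns 0,
# as shown in the retry loop below.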

As you can see, the code is pretty simple to understand. In the following lines of the main calling script (generateText2VideoAPI.py), we’ve also taken the necessary actions to handle any unforeseen network issues, or the case where the video is not yet ready after our job submission –

waitTime = 10
time.sleep(waitTime)

# Failed case retry
retries = 1
success = False
z = -1  # Default status before the first attempt

try:
    while not success:
        try:
            z = r1.image2VideoPassTwo(gID)
        except Exception as e:
            success = False

        if z == 0:
            success = True
        else:
            wait = retries * 2 * 15
            str_R1 = "Retries failed! Waiting " + str(wait) + " seconds and retrying!"

            print(str_R1)

            time.sleep(wait)
            retries += 1

        # Checking maximum retries (maxRetryNo is read from the configuration)
        if retries >= maxRetryNo:
            success = True
            raise Exception("Maximum retries exceeded!")
except:
    print()

And, let us see what the run looks like –

Let us understand the CPU utilization –

As you can see, CPU utilization is minimal since most tasks are at the API end.


So, we’ve done it. 🙂

Please find the next series on this topic below:

Enabling & Exploring Stable Diffusion – Part 2

Enabling & Exploring Stable Diffusion – Part 3

Please let me know your feedback after reviewing all the posts! 🙂

RAG implementation of LLMs by using Python, Haystack & React (Part – 2)

Today, we’ll share the second installment of the RAG implementation. If you are new here, please visit the previous post for full context.

In this post, we’ll be discussing the Haystack framework more. Again, before discussing the main context, I want to present the demo here.

Demo

Let us look at the flow diagram, as it captures the sequence of events that unfold as part of the process & shows where we’ll pay our primary attention today.

As you can see, today we’ll discuss the red dotted line, which covers contextualizing the source data into the Vector DB.

Let us understand the flow of events here –

  1. The main Python application will consume the nested JSON by invoking the museum API in multiple threads.
  2. The application will clean the nested data & extract the relevant attributes after flattening the JSON.
  3. It will create the unstructured text-based context, which is later fed to the Vector DB framework.

pip install farm-haystack==1.19.0
pip install Flask==2.2.5
pip install Flask-Cors==4.0.0
pip install Flask-JWT-Extended==4.5.2
pip install Flask-Session==0.5.0
pip install openai==0.27.8
pip install pandas==2.0.3
pip install tensorflow==2.11.1

We’re using the Metropolitan Museum API to feed the data to our Vector DB. For more information, please visit the following link. It is free to use & moreover, we’re using it only for educational scenarios.


We’ll discuss the tokenization part highlighted in a red dotted line from the above picture.

We’ll discuss the scripts in the diagram as part of the flow mentioned above.

  • clsExtractJSON.py (This is the main class that will extract the content from the museum API using parallel calls.)
def genData(self):
    try:
        base_url = self.base_url
        header_token = self.header_token
        basePath = self.basePath
        outputPath = self.outputPath
        mergedFile = self.mergedFile
        subdir = self.subdir
        Ind = self.Ind
        var_1 = datetime.now().strftime("%H.%M.%S")


        devVal = list()
        objVal = list()

        # Main Details
        headers = {'Cookie':header_token}
        payload={}

        url = base_url + '/departments'

        date_ranges = self.generateFirstDayOfLastTenYears()

        # Getting all the departments
        try:
            print('Department URL:')
            print(str(url))

            response = requests.request("GET", url, headers=headers, data=payload)
            parsed_data = json.loads(response.text)

            print('Department JSON:')
            print(str(parsed_data))

            # Extract the "departmentId" values into a Python list
            for dept_det in parsed_data['departments']:
                for info in dept_det:
                    if info == 'departmentId':
                        devVal.append(dept_det[info])

        except Exception as e:
            x = str(e)
            print('Error: ', x)
            devVal = list()

        # List to hold thread objects
        threads = []

        # Calling the Data using threads
        for dep in devVal:
            t = threading.Thread(target=self.getDataThread, args=(dep, base_url, headers, payload, date_ranges, objVal, subdir, Ind,))
            threads.append(t)
            t.start()

        # Wait for all threads to complete
        for t in threads:
            t.join()

        res = self.mergeCsvFilesInDirectory(basePath, outputPath, mergedFile)

        if res == 0:
            print('Successful!')
        else:
            print('Failure!')

        return 0

    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

The above code translates into the following steps –

  1. The above method first calls the generateFirstDayOfLastTenYears() method to prepare the date list used to populate records for every department, after fetching all the unique departments from another API.
  2. Then, it calls the getDataThread() method in multiple threads to invoke all the relevant API calls simultaneously, reducing the overall wait time & creating individual smaller files.
  3. Finally, the application invokes the mergeCsvFilesInDirectory() method to merge all the chunk files into one extensive historical data file.
def generateFirstDayOfLastTenYears(self):
    yearRange = self.yearRange
    date_format = "%Y-%m-%d"
    current_year = datetime.now().year

    date_ranges = []
    for year in range(current_year - yearRange, current_year + 1):
        first_day_of_year_full = datetime(year, 1, 1)
        first_day_of_year = first_day_of_year_full.strftime(date_format)
        date_ranges.append(first_day_of_year)

    return date_ranges

The first method will generate the first day of each year for the last ten years, including the current year.
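
For illustration, assuming yearRange is configured as 10 & the current year is 2023 (when this code was written), the call would return eleven dates, one for the 1st of January of each year:

cJSON = clsExtractJSON()
print(cJSON.generateFirstDayOfLastTenYears())
# Illustrative output (actual values depend on the configured yearRange & the current date):
# ['2013-01-01', '2014-01-01', '2015-01-01', ..., '2022-01-01', '2023-01-01']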

def getDataThread(self, dep, base_url, headers, payload, date_ranges, objVal, subdir, Ind):
    try:
        cnt = 0
        cnt_x = 1
        var_1 = datetime.now().strftime("%H.%M.%S")

        for x_start_date in date_ranges:
            try:
                urlM = base_url + '/objects?metadataDate=' + str(x_start_date) + '&departmentIds=' + str(dep)

                print('Nested URL:')
                print(str(urlM))

                response_obj = requests.request("GET", urlM, headers=headers, data=payload)
                objectDets = json.loads(response_obj.text)

                for obj_det in objectDets['objectIDs']:
                    objVal.append(obj_det)

                for objId in objVal:
                    urlS = base_url + '/objects/' + str(objId)

                    print('Final URL:')
                    print(str(urlS))

                    response_det = requests.request("GET", urlS, headers=headers, data=payload)
                    objDetJSON = response_det.text

                    retDB = self.createData(objDetJSON)
                    retDB['departmentId'] = str(dep)

                    if cnt == 0:
                        df_M = retDB
                    else:
                        d_frames = [df_M, retDB]
                        df_M = pd.concat(d_frames)

                    if cnt == 1000:
                        cnt = 0
                        clog.logr('df_M_' + var_1 + '_' + str(cnt_x) + '_' + str(dep) +'.csv', Ind, df_M, subdir)
                        cnt_x += 1
                        df_M = pd.DataFrame()

                    cnt += 1

            except Exception as e:
                x = str(e)
                print('Error X:', x)
        return 0

    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

The above method will invoke the individual API call to fetch the relevant artifact information.

def mergeCsvFilesInDirectory(self, directory_path, output_path, output_file):
    try:
        csv_files = [file for file in os.listdir(directory_path) if file.endswith('.csv')]
        data_frames = []

        for file in csv_files:
            encodings_to_try = ['utf-8', 'utf-8-sig', 'latin-1', 'cp1252']
            for encoding in encodings_to_try:
                try:
                    FullFileName = directory_path + file
                    print('File Name: ', FullFileName)
                    df = pd.read_csv(FullFileName, encoding=encoding)
                    data_frames.append(df)
                    break  # Stop trying other encodings if the reading is successful
                except UnicodeDecodeError:
                    continue

        if not data_frames:
            raise Exception("Unable to read CSV files. Check encoding or file format.")

        merged_df = pd.concat(data_frames, ignore_index=True)

        merged_full_name = os.path.join(output_path, output_file)
        merged_df.to_csv(merged_full_name, index=False)

        for file in csv_files:
            os.remove(os.path.join(directory_path, file))

        return 0

    except Exception as e:
        x = str(e)
        print('Error: ', x)
        return 1

The above method will merge all the small files into a single, more extensive historical data file that covers the last ten years (the first day of each of those years, to be precise).

For the complete code, please visit the GitHub repository.

  • 1_ReadMuseumJSON.py (This is the main script that will invoke the class that extracts the content from the museum API using parallel calls.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### clsExtractJSON class to extract the content     ####
#### from the museum API using parallel calls.       ####
####                                                 ####
#########################################################
import datetime
from clsConfigClient import clsConfigClient as cf

import clsExtractJSON as cej

########################################################
################    Global Area   ######################
########################################################

cJSON = cej.clsExtractJSON()

basePath = cf.conf['DATA_PATH']
outputPath = cf.conf['OUTPUT_PATH']
mergedFile = cf.conf['MERGED_FILE']

########################################################
################  End Of Global Area   #################
########################################################

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

def main():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        r1 = cJSON.genData()

        if r1 == 0:
            print()
            print('Successfully Scraped!')
        else:
            print()
            print('Failed to Scrape!')

        print('*'*120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == '__main__':
    main()

The above script calls the main class after instantiating the class.

  • clsCreateList.py (This is the main class that will extract the relevant attributes from the historical files & then create the right input text to create the documents for contextualizing into the Vector DB framework.)
def createRec(self):
    try:
        basePath = self.basePath
        fileName = self.fileName
        Ind = self.Ind
        subdir = self.subdir
        base_url = self.base_url
        outputPath = self.outputPath
        mergedFile = self.mergedFile
        cleanedFile = self.cleanedFile

        FullFileName = outputPath + mergedFile

        df = pd.read_csv(FullFileName)
        df2 = df[listCol]  # listCol: required column names, assumed to be defined at the module level
        dfFin = df2.drop_duplicates().reset_index(drop=True)

        dfFin['artist_URL'] = dfFin['artistWikidata_URL'].combine_first(dfFin['artistULAN_URL'])
        dfFin['object_URL'] = dfFin['objectURL'].combine_first(dfFin['objectWikidata_URL'])
        dfFin['Wiki_URL'] = dfFin['Wikidata_URL'].combine_first(dfFin['AAT_URL']).combine_first(dfFin['URL']).combine_first(dfFin['object_URL'])

        # Dropping the old Dtype Columns
        dfFin.drop(['artistWikidata_URL'], axis=1, inplace=True)
        dfFin.drop(['artistULAN_URL'], axis=1, inplace=True)
        dfFin.drop(['objectURL'], axis=1, inplace=True)
        dfFin.drop(['objectWikidata_URL'], axis=1, inplace=True)
        dfFin.drop(['AAT_URL'], axis=1, inplace=True)
        dfFin.drop(['Wikidata_URL'], axis=1, inplace=True)
        dfFin.drop(['URL'], axis=1, inplace=True)

        # Save the filtered DataFrame to a new CSV file
        #clog.logr(cleanedFile, Ind, dfFin, subdir)
        res = self.addHash(dfFin)

        if res == 0:
            print('Added Hash!')
        else:
            print('Failed to add hash!')

        # Generate the text for each row in the dataframe
        for _, row in dfFin.iterrows():
            x = self.genPrompt(row)
            self.addDocument(x, cleanedFile)

        return documents  # 'documents' is assumed to be a module-level list populated via addDocument()

    except Exception as e:
        x = str(e)
        print('Record Error: ', x)

        return documents

The above code reads the data from the extensive historical file created in the earlier steps, cleans it by removing any duplicate records & finally creates three consolidated URL columns: artist, object & wiki.

Also, this application replaces each hyperlink with a specific hash value before the text is fed into the vector DB, since the vector DB does not handle raw URLs well. Hence, we store the URLs with their associated hash values in a separate file & later fetch them via a lookup when processing the OpenAI response.

Then, this application will generate prompts dynamically & finally create the documents for the later vector DB consumption steps by invoking the addDocument() method.
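
The addHash(), genPrompt() & addDocument() methods themselves live in the GitHub repository. Purely as a hypothetical sketch of the idea (the column name “title”, the hash length & the prompt wording below are assumptions, not the actual implementation), the URL-to-hash replacement & prompt generation could look roughly like this:

import hashlib

def urlToHash(url):
    # Replace a raw URL with a short, deterministic hash; the URL-to-hash
    # mapping is kept in a separate lookup file for later retrieval.
    return hashlib.sha256(str(url).encode('utf-8')).hexdigest()[:12]

def buildPrompt(row):
    # Turn one cleaned record into a plain-text document for the vector DB,
    # embedding the hash instead of the raw hyperlink.
    return (f"Title: {row['title']}. "
            f"Artist Ref: {urlToHash(row['artist_URL'])}. "
            f"Object Ref: {urlToHash(row['object_URL'])}. "
            f"Wiki Ref: {urlToHash(row['Wiki_URL'])}.")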

For more details, please visit the GitHub link.

  • 1_1_testCreateRec.py (This is the main script that will call the above class.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### clsCreateList class to test the creation of     ####
#### documents for the vector DB.                    ####
####                                                 ####
#########################################################

from clsConfigClient import clsConfigClient as cf
import clsL as log
import clsCreateList as ccl

from datetime import datetime, timedelta

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

###############################################
###           Global Section                ###
###############################################

#Initiating Logging Instances
clog = log.clsL()
cl = ccl.clsCreateList()

var = datetime.now().strftime(".%H.%M.%S")

documents = []

###############################################
###    End of Global Section                ###
###############################################
def main():
    try:
        var = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*'*120)
        print('Start Time: ' + str(var))
        print('*'*120)

        print('*'*240)
        print('Creating Index store:: ')
        print('*'*240)

        documents = cl.createRec()

        print('Inserted Sample Records: ')
        print(str(documents))
        print('\n')

        r1 = len(documents)

        if r1 > 0:
            print()
            print('Successfully Indexed sample records!')
        else:
            print()
            print('Failed to index sample records!')

        print('*'*120)
        var1 = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == '__main__':
    main()

The above script instantiates the main class & invokes its createRec() method to prepare the data for tokenization into the vector DB.

This test script is used to test the clsCreateList class. However, the class will also be used inside another class.

  • clsFeedVectorDB.py (This is the main class that will feed the documents into the vector DB.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Sep-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### haystack framework to contextualize the docs    ####
#### inside the vector DB.                           ####
####                                                 ####
#########################################################

from haystack.document_stores.faiss import FAISSDocumentStore
from haystack.nodes import DensePassageRetriever
import openai
import pandas as pd
import os
import clsCreateList as ccl

from clsConfigClient import clsConfigClient as cf
import clsL as log

from datetime import datetime, timedelta

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

###############################################
###           Global Section                ###
###############################################

Ind = cf.conf['DEBUG_IND']
openAIKey = cf.conf['OPEN_AI_KEY']

os.environ["TOKENIZERS_PARALLELISM"] = "false"

#Initiating Logging Instances
clog = log.clsL()
cl = ccl.clsCreateList()

var = datetime.now().strftime(".%H.%M.%S")

# Encode your data to create embeddings
documents = []

var_1 = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*'*120)
print('Start Time: ' + str(var_1))
print('*'*120)

print('*'*240)
print('Creating Index store:: ')
print('*'*240)

documents = cl.createRec()

print('Inserted Sample Records: ')
print(documents[:5])
print('\n')
print('Type:')
print(type(documents))

r1 = len(documents)

if r1 > 0:
    print()
    print('Successfully Indexed records!')
else:
    print()
    print('Failed to index records!')

print('*'*120)
var_2 = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('End Time: ' + str(var_2))

# Passing OpenAI API Key
openai.api_key = openAIKey

###############################################
###    End of Global Section                ###
###############################################

class clsFeedVectorDB:
    def __init__(self):
        self.basePath = cf.conf['DATA_PATH']
        self.modelFileName = cf.conf['CACHE_FILE']
        self.vectorDBPath = cf.conf['VECTORDB_PATH']
        self.vectorDBFileName = cf.conf['VECTORDB_FILE_NM']
        self.queryModel = cf.conf['QUERY_MODEL']
        self.passageModel = cf.conf['PASSAGE_MODEL']

    def retrieveDocuments(self, question, retriever, top_k=3):
        return retriever.retrieve(question, top_k=top_k)

    def generateAnswerWithGPT3(self, retrievedDocs, question):
        documents_text = " ".join([doc.content for doc in retrievedDocs])
        prompt = f"Given the following documents: {documents_text}, answer the question: {question}"

        response = openai.Completion.create(
            model="text-davinci-003",
            prompt=prompt,
            max_tokens=150
        )
        return response.choices[0].text.strip()

    def ragAnswerWithHaystackAndGPT3(self, question, retriever):
        retrievedDocs = self.retrieveDocuments(question, retriever)
        return self.generateAnswerWithGPT3(retrievedDocs, question)

    def genData(self, strVal):
        try:
            basePath = self.basePath
            modelFileName = self.modelFileName
            vectorDBPath = self.vectorDBPath
            vectorDBFileName = self.vectorDBFileName
            queryModel = self.queryModel
            passageModel = self.passageModel

            print('*'*120)
            print('Index Your Data for Retrieval:')
            print('*'*120)

            FullFileName = basePath + modelFileName
            FullVectorDBname = vectorDBPath + vectorDBFileName

            sqlite_path = "sqlite:///" + FullVectorDBname + '.db'
            print('Vector DB Path: ', str(sqlite_path))

            indexFile = "vectorDB/" + str(vectorDBFileName) + '.faiss'
            indexConfig = "vectorDB/" + str(vectorDBFileName) + ".json"

            print('File: ', str(indexFile))
            print('Config: ', str(indexConfig))

            # Initialize DocumentStore
            document_store = FAISSDocumentStore(sql_url=sqlite_path)

            libName = "vectorDB/" + str(vectorDBFileName) + '.faiss'

            document_store.write_documents(documents)

            # Initialize Retriever
            retriever = DensePassageRetriever(document_store=document_store,
                                              query_embedding_model=queryModel,
                                              passage_embedding_model=passageModel,
                                              use_gpu=False)

            document_store.update_embeddings(retriever=retriever)

            document_store.save(index_path=libName, config_path="vectorDB/" + str(vectorDBFileName) + ".json")

            print('*'*120)
            print('Testing with RAG & OpenAI...')
            print('*'*120)

            answer = self.ragAnswerWithHaystackAndGPT3(strVal, retriever)

            print('*'*120)
            print('Testing Answer:: ')
            print(answer)
            print('*'*120)

            return 0

        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

In the above script, the following essential steps take place –

  1. First, the application calls the clsCreateList class to store all the documents inside a dictionary.
  2. Then it stores the data inside the vector DB & creates & stores the model, which will later be reused (if you remember, we’ve used this as a model in our previous post). A small reloading sketch follows this list.
  3. Finally, it tests with some sample use cases by providing the proper context to OpenAI & confirms the response.
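
To give a sense of how that stored model gets reused later, here is a minimal reloading sketch. The file names & DPR model identifiers below are placeholders; the real values come from clsConfigClient.

# Minimal sketch: reloading the saved FAISS index for reuse.
# NOTE: the paths & model names are assumed placeholders; the actual
# values are read from clsConfigClient in the real application.
from haystack.document_stores.faiss import FAISSDocumentStore
from haystack.nodes import DensePassageRetriever

indexFile = "vectorDB/demoIndex.faiss"    # hypothetical index file
indexConfig = "vectorDB/demoIndex.json"   # hypothetical config file

# Load the previously saved document store instead of rebuilding it
document_store = FAISSDocumentStore.load(index_path=indexFile, config_path=indexConfig)

# Re-attach a retriever so the stored embeddings can be queried
retriever = DensePassageRetriever(document_store=document_store,
                                  query_embedding_model="facebook/dpr-question_encoder-single-nq-base",
                                  passage_embedding_model="facebook/dpr-ctx_encoder-single-nq-base",
                                  use_gpu=False)

print(retriever.retrieve("What does the sample data describe?", top_k=3))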

Here is a short clip of how the RAG models contextualize with the source data.

RAG-Model Contextualization

So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

RAG implementation of LLMs by using Python, Haystack & React (Part – 1)

Today, I will share a new post in a two-part series about creating end-to-end LLM applications that feed on source data with a RAG implementation. I’ll also use the OpenAI Python-based SDK and Haystack embeddings in this case.

In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.

Before I explain the process to invoke this new library, why not view the demo first & then discuss it?

Demo

Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.

As you can see, to enable this large & complex solution, we must first establish the capabilities to build applications powered by LLMs, Transformer models, vector search, and more. You can use state-of-the-art NLP models to perform question-answering, answer generation, semantic document search, or build tools capable of complex decision-making and query resolution. Hence, steps 1 & 2 showcase the data embedding & the creation of that informed repository. We’ll be discussing that in our second part.

Once you have the informed repository, the system can interact with the end-users. As part of the query (shown in step 3), the prompt & the question are shared with the process engine, which then works to reduce the volume & fetch relevant context from our informed repository, returning the tuned context as part of the response (shown in steps 4, 5 & 6).

Then, this tuned context is shared with OpenAI to produce a better response, summary & concluding remarks that are user-friendly & easier for end-users to understand (shown in steps 8 & 9). A condensed sketch of this query path appears below.
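
The following is a minimal sketch of that query path (steps 3 to 9), assuming a Haystack retriever & an OpenAI key are already configured; the function name & model name here are illustrative, not the exact ones used later in the classes.

# A minimal sketch of steps 3 - 9 (illustrative names & model only).
import openai

def answerWithRag(question, retriever, modelName="gpt-3.5-turbo"):
    # Steps 4 - 6: shrink the volume by pulling only the relevant context
    docs = retriever.retrieve(question, top_k=3)
    context = " ".join([doc.content for doc in docs])

    # Steps 8 - 9: hand the tuned context to OpenAI for a friendly answer
    messages = [
        {"role": "system", "content": "Answer using only the supplied context."},
        {"role": "user", "content": context + "\n\nQuestion: " + question}
    ]
    response = openai.ChatCompletion.create(model=modelName, messages=messages, max_tokens=150)

    return response.choices[0].message['content'].strip()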

The following are the important packages that are essential to this project –

pip install farm-haystack==1.19.0
pip install Flask==2.2.5
pip install Flask-Cors==4.0.0
pip install Flask-JWT-Extended==4.5.2
pip install Flask-Session==0.5.0
pip install openai==0.27.8
pip install pandas==2.0.3
pip install tensorflow==2.11.1

We’ve used React for the front-end, Python-Flask for the back-end APIs, and OpenAI to create this experience.

Today, we’ll be going in reverse mode. We first discuss the main script & then explain all the other class scripts.

  • flaskServer.py (This is the main calling Python script to invoke the RAG-Server.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### Flask-based RAG server that exposes the         ####
#### login & chat APIs for the React front-end.      ####
####                                                 ####
#########################################################

from flask import Flask, jsonify, request, session
from flask_cors import CORS
from werkzeug.security import check_password_hash, generate_password_hash
from flask_jwt_extended import JWTManager, jwt_required, create_access_token
import pandas as pd
from clsConfigClient import clsConfigClient as cf
import clsL as log
import clsContentScrapper as csc
import clsRAGOpenAI as crao
import csv
from datetime import timedelta
import os
import re
import json

########################################################
################    Global Area   ######################
########################################################
#Initiating Logging Instances
clog = log.clsL()

admin_key = cf.conf['ADMIN_KEY']
secret_key = cf.conf['SECRET_KEY']
session_path = cf.conf['SESSION_PATH']
sessionFile = cf.conf['SESSION_CACHE_FILE']

app = Flask(__name__)
CORS(app)  # This will enable CORS for all routes
app.config['JWT_SECRET_KEY'] = admin_key  # Change this!
app.secret_key = secret_key

jwt = JWTManager(app)

users = cf.conf['USER_NM']
passwd = cf.conf['USER_PWD']

cCScrapper = csc.clsContentScrapper()
cr = crao.clsRAGOpenAI()

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

# Define the aggregation functions
def join_unique(series):
    unique_vals = series.drop_duplicates().astype(str)
    return ', '.join(filter(lambda x: x != 'nan', unique_vals))

# Building the preaggregate cache
def groupImageWiki():
    try:
        base_path = cf.conf['OUTPUT_PATH']
        inputFile = cf.conf['CLEANED_FILE']
        outputFile = cf.conf['CLEANED_FILE_SHORT']
        subdir = cf.conf['SUBDIR_OUT']
        Ind = cf.conf['DEBUG_IND']

        inputCleanedFileLookUp = base_path + inputFile

        #Opening the file in dataframe
        df = pd.read_csv(inputCleanedFileLookUp)
        hash_values = df['Total_Hash'].unique()

        dFin = df[['primaryImage','Wiki_URL','Total_Hash']]

        # Ensure columns are strings and not NaN
        # Convert columns to string and replace 'nan' with an empty string
        dFin['primaryImage'] = dFin['primaryImage'].astype(str).replace('nan', '')
        dFin['Wiki_URL'] = dFin['Wiki_URL'].astype(str).replace('nan', '')

        dFin = dFin.drop_duplicates()

        # Group by 'Total_Hash' and aggregate
        dfAgg = dFin.groupby('Total_Hash').agg({'primaryImage': join_unique,'Wiki_URL': join_unique}).reset_index()

        return dfAgg

    except Exception as e:
        x = str(e)
        print('Error: ', x)

        df = pd.DataFrame()

        return df

resDf = groupImageWiki()

########################################################
################  End  Global Area  ####################
########################################################

def extractRemoveUrls(hash_value):
    image_urls = ''
    wiki_urls = ''
    # Parse the inner message JSON string
    try:

        resDf['Total_Hash'] = resDf['Total_Hash'].astype(int)
        filtered_df = resDf[resDf['Total_Hash'] == int(hash_value)]

        if not filtered_df.empty:
            image_urls = filtered_df['primaryImage'].values[0]
            wiki_urls = filtered_df['Wiki_URL'].values[0]

        return image_urls, wiki_urls

    except Exception as e:
        x = str(e)
        print('extractRemoveUrls Error: ', x)
        return image_urls, wiki_urls

def isIncomplete(line):
    """Check if a line appears to be incomplete."""

    # Check if the line ends with certain patterns indicating it might be incomplete.
    incomplete_patterns = [': [Link](', ': Approximately ', ': ']
    return any(line.endswith(pattern) for pattern in incomplete_patterns)

def filterData(data):
    """Return only the complete lines from the data."""

    lines = data.split('\n')
    complete_lines = [line for line in lines if not isIncomplete(line)]

    return '\n'.join(complete_lines)

def updateCounter(sessionFile):
    try:
        counter = 0

        # Check if the CSV file exists
        if os.path.exists(sessionFile):
            with open(sessionFile, 'r') as f:
                reader = csv.reader(f)
                for row in reader:
                    # Assuming the counter is the first value in the CSV
                    counter = int(row[0])

        # Increment counter
        counter += 1

        # Write counter back to CSV
        with open(sessionFile, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([counter])

        return counter
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

def getPreviousResult():
    try:
        fullFileName = session_path + sessionFile
        newCounterValue = updateCounter(fullFileName)

        return newCounterValue
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return 1

@app.route('/login', methods=['POST'])
def login():
    username = request.json.get('username', None)
    password = request.json.get('password', None)

    print('User Name: ', str(username))
    print('Password: ', str(password))

    #if username not in users or not check_password_hash(users.get(username), password):
    if ((username not in users) or (password not in passwd)):
        return jsonify({'login': False}), 401

    access_token = create_access_token(identity=username)
    return jsonify(access_token=access_token)

@app.route('/chat', methods=['POST'])
def get_chat():
    try:
        #session["key"] = "1D98KI"
        #session_id = session.sid
        #print('Session Id: ', str(session_id))

        cnt = getPreviousResult()
        print('Running Session Count: ', str(cnt))

        username = request.json.get('username', None)
        message = request.json.get('message', None)

        print('User: ', str(username))
        print('Content: ', str(message))

        if cnt == 1:
            retList = cCScrapper.extractCatalog()
        else:
            hashValue, cleanedData = cr.getData(str(message))
            print('Main Hash Value:', str(hashValue))

            imageUrls, wikiUrls = extractRemoveUrls(hashValue)
            print('Image URLs: ', str(imageUrls))
            print('Wiki URLs: ', str(wikiUrls))
            print('Clean Text:')
            print(str(cleanedData))
            retList = '{"records":[{"Id":"' + str(cleanedData) + '", "Image":"' + str(imageUrls) + '", "Wiki": "' + str(wikiUrls) + '"}]}'

        response = {
            'message': retList
        }

        print('JSON: ', str(response))
        return jsonify(response)

    except Exception as e:
        x = str(e)

        response = {
            'message': 'Error: ' + x
        }
        return jsonify(response)

@app.route('/api/data', methods=['GET'])
@jwt_required()
def get_data():
    response = {
        'message': 'Hello from Flask!'
    }
    return jsonify(response)

if __name__ == '__main__':
    app.run(debug=True)

Let us understand some of the important sections of the above script –

Function – login():

The login function retrieves a ‘username’ and ‘password’ from a JSON request and prints them. It checks if the provided credentials are missing from users or password lists, returning a failure JSON response if so. It creates and returns an access token in a JSON response if valid.
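
For reference, a client can exercise this endpoint as shown below; this is a minimal sketch, and the credentials are placeholders, not the actual configured values.

# Minimal sketch of a /login call (placeholder credentials)
import requests

loginResp = requests.post('http://localhost:5000/login',
                          json={'username': 'demoUser', 'password': 'demoPass'})

if loginResp.status_code == 200:
    accessToken = loginResp.json()['access_token']
    print('Token: ', accessToken)
else:
    print('Login failed with status: ', loginResp.status_code)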

Function – get_chat():

The get_chat function retrieves the running session count and the user input from a JSON request. Based on the session count, it either extracts the catalog data or processes the user’s message through the RAG framework, which finally receives the refined response from OpenAI, extracting the hash value, image URLs, and wiki URLs. If an error arises, the function captures and returns the error as a JSON message.
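
Similarly, the chat endpoint can be tested on its own. The sketch below assumes the Flask server is running locally; the question text is only an example.

# Minimal sketch of a /chat call (illustrative question only)
import json
import requests

chatResp = requests.post('http://localhost:5000/chat',
                         json={'username': 'demoUser', 'message': 'Tell me about the collection.'})

payload = chatResp.json()                   # {'message': '...'}
innerJson = json.loads(payload['message'])  # the nested JSON string from the server

print(innerJson.get('records', innerJson.get('departments')))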

Function – updateCounter():

The updateCounter function checks if a given CSV file exists and retrieves its counter value. It then increments the counter and writes it back to the CSV. If any errors occur, an error message is printed, and the function returns a value of 1.

Function – extractRemoveUrls():

The extractRemoveUrls function attempts to filter a data frame, resDf, based on a provided hash value to extract image and wiki URLs. If the data frame contains matching entries, it retrieves the corresponding URLs. Any errors encountered are printed, but the function always returns the image and wiki URLs, even if they are empty.

  • clsContentScrapper.py (This is the main class that brings the default options for the users if they agree with the initial prompt by the bot.)
#####################################################
#### Written By: SATYAKI DE                      ####
#### Written On: 27-May-2023                     ####
#### Modified On 28-May-2023                     ####
####                                             ####
#### Objective: This is the main calling         ####
#### python class that will invoke the           ####
#### LangChain of package to extract             ####
#### the transcript from the YouTube videos &    ####
#### then answer the questions based on the      ####
#### topics selected by the users.               ####
####                                             ####
#####################################################

from langchain.document_loaders import YoutubeLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS
from langchain.chat_models import ChatOpenAI
from langchain.chains import LLMChain

from langchain.prompts.chat import (
    ChatPromptTemplate,
    SystemMessagePromptTemplate,
    HumanMessagePromptTemplate,
)

from googleapiclient.discovery import build

import clsTemplate as ct
from clsConfigClient import clsConfigClient as cf

import os

from flask import jsonify
import requests

###############################################
###           Global Section                ###
###############################################
open_ai_Key = cf.conf['OPEN_AI_KEY']
os.environ["OPENAI_API_KEY"] = open_ai_Key
embeddings = OpenAIEmbeddings(openai_api_key=open_ai_Key)

YouTube_Key = cf.conf['YOUTUBE_KEY']
youtube = build('youtube', 'v3', developerKey=YouTube_Key)

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

###############################################
###    End of Global Section                ###
###############################################

class clsContentScrapper:
    def __init__(self):
        self.model_name = cf.conf['MODEL_NAME']
        self.temp_val = cf.conf['TEMP_VAL']
        self.max_cnt = int(cf.conf['MAX_CNT'])
        self.url = cf.conf['BASE_URL']
        self.header_token = cf.conf['HEADER_TOKEN']

    def extractCatalog(self):
        try:
            base_url = self.url
            header_token = self.header_token

            url = base_url + '/departments'

            print('Full URL: ', str(url))

            payload={}
            headers = {'Cookie': header_token}

            response = requests.request("GET", url, headers=headers, data=payload)

            x = response.text

            return x
        except Exception as e:
            discussedTopic = []
            x = str(e)
            print('Error: ', x)

            return x

Let us understand the core part that we require from this class.

Function – extractCatalog():

The extractCatalog function uses specific headers to make a GET request to a constructed URL. The URL is derived by appending ‘/departments’ to a base_url, and a header token is used in the request headers. If successful, it returns the text of the response; if there’s an exception, it prints the error and returns the error message.

  • clsRAGOpenAI.py (This is the main class that brings the RAG-enabled context that is fed to OpenAI for fine-tuned response with less cost.)
#########################################################
#### Written By: SATYAKI DE                          ####
#### Written On: 27-Jun-2023                         ####
#### Modified On 28-Jun-2023                         ####
####                                                 ####
#### Objective: This is the main calling             ####
#### python script that will invoke the              ####
#### RAG engine that retrieves the relevant          ####
#### context & feeds it to OpenAI for the response.  ####
####                                                 ####
#########################################################

from haystack.document_stores.faiss import FAISSDocumentStore
from haystack.nodes import DensePassageRetriever
import openai

from clsConfigClient import clsConfigClient as cf
import clsL as log

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

import os
import re
###############################################
###           Global Section                ###
###############################################
Ind = cf.conf['DEBUG_IND']
queryModel = cf.conf['QUERY_MODEL']
passageModel = cf.conf['PASSAGE_MODEL']

#Initiating Logging Instances
clog = log.clsL()

os.environ["TOKENIZERS_PARALLELISM"] = "false"

vectorDBFileName = cf.conf['VECTORDB_FILE_NM']

indexFile = "vectorDB/" + str(vectorDBFileName) + '.faiss'
indexConfig = "vectorDB/" + str(vectorDBFileName) + ".json"

print('File: ', str(indexFile))
print('Config: ', str(indexConfig))

# Also, provide `config_path` parameter if you set it when calling the `save()` method:
new_document_store = FAISSDocumentStore.load(index_path=indexFile, config_path=indexConfig)

# Initialize Retriever
retriever = DensePassageRetriever(document_store=new_document_store,
                                  query_embedding_model=queryModel,
                                  passage_embedding_model=passageModel,
                                  use_gpu=False)


###############################################
###    End of Global Section                ###
###############################################

class clsRAGOpenAI:
    def __init__(self):
        self.basePath = cf.conf['DATA_PATH']
        self.fileName = cf.conf['FILE_NAME']
        self.Ind = cf.conf['DEBUG_IND']
        self.subdir = str(cf.conf['OUT_DIR'])
        self.base_url = cf.conf['BASE_URL']
        self.outputPath = cf.conf['OUTPUT_PATH']
        self.vectorDBPath = cf.conf['VECTORDB_PATH']
        self.openAIKey = cf.conf['OPEN_AI_KEY']
        self.temp = cf.conf['TEMP_VAL']
        self.modelName = cf.conf['MODEL_NAME']
        self.maxToken = cf.conf['MAX_TOKEN']

    def extractHash(self, text):
        try:
            # Regular expression pattern to match 'Ref: {' followed by a number and then '}'
            pattern = r"Ref: \{'(\d+)'\}"
            match = re.search(pattern, text)

            if match:
                return match.group(1)
            else:
                return None
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return None

    def removeSentencesWithNaN(self, text):
        try:
            # Split text into sentences using regular expression
            sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text)
            # Filter out sentences containing 'nan'
            filteredSentences = [sentence for sentence in sentences if 'nan' not in sentence]
            # Rejoin the sentences
            return ' '.join(filteredSentences)
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return ''

    def retrieveDocumentsReader(self, question, top_k=9):
        return retriever.retrieve(question, top_k=top_k)

    def generateAnswerWithGPT3(self, retrieved_docs, question):
        try:
            openai.api_key = self.openAIKey
            temp = self.temp
            modelName = self.modelName
            maxToken = self.maxToken

            documentsText = " ".join([doc.content for doc in retrieved_docs])

            filteredDocs = self.removeSentencesWithNaN(documentsText)
            hashValue = self.extractHash(filteredDocs)

            print('RAG Docs:: ')
            print(filteredDocs)
            #prompt = f"Given the following documents: {documentsText}, answer the question accurately based on the above data with the supplied http urls: {question}"

            # Set up a chat-style prompt with your data
            messages = [
                {"role": "system", "content": "You are a helpful assistant, answer the question accurately based on the above data with the supplied http urls. Only relevant content needs to publish. Please do not provide the facts or the texts that results crossing the max_token limits."},
                {"role": "user", "content": filteredDocs}
            ]

            # Chat style invoking the latest model
            response = openai.ChatCompletion.create(
                model=modelName,
                messages=messages,
                temperature = temp,
                max_tokens=maxToken
            )
            return hashValue, response.choices[0].message['content'].strip().replace('\n','\\n')
        except Exception as e:
            x = str(e)
            print('failed to get from OpenAI: ', x)
            return None, 'Not Available!'

    def ragAnswerWithHaystackAndGPT3(self, question):
        retrievedDocs = self.retrieveDocumentsReader(question)
        return self.generateAnswerWithGPT3(retrievedDocs, question)

    def getData(self, strVal):
        try:
            print('*'*120)
            print('Index Your Data for Retrieval:')
            print('*'*120)

            print('Response from New Docs: ')
            print()

            hashValue, answer = self.ragAnswerWithHaystackAndGPT3(strVal)

            print('GPT3 Answer::')
            print(answer)
            print('Hash Value:')
            print(str(hashValue))

            print('*'*240)
            print('End Of Use RAG to Generate Answers:')
            print('*'*240)

            return hashValue, answer
        except Exception as e:
            x = str(e)
            print('Error: ', x)
            answer = x
            hashValue = 1

            return hashValue, answer

Let us understand some of the important blocks –

Function – ragAnswerWithHaystackAndGPT3():

The ragAnswerWithHaystackAndGPT3 function retrieves relevant documents for a given question using the retrieveDocumentsReader method. It then generates an answer for the query using GPT-3 with the retrieved documents via the generateAnswerWithGPT3 method. The final response is returned.

Function – generateAnswerWithGPT3():

The generateAnswerWithGPT3 function, given a list of retrieved documents and a question, communicates with OpenAI’s GPT-3 to generate an answer. It first processes the documents, filtering and extracting a hash value. Using a chat-style format, it prompts GPT-3 with the processed documents and captures its response. If an error occurs, an error message is printed, and “Not Available!” is returned.

Function – retrieveDocumentsReader():

The retrieveDocumentsReader function takes in a question and an optional parameter, top_k (defaulted to 9). It calls the retriever.retrieve method with the given parameters. The retrieval generates at most nine responses from the RAG engine, which are then fed to OpenAI.
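
Putting the three pieces together, the class can also be exercised on its own, as in the minimal sketch below; the question text is only an example.

# Minimal stand-alone usage of clsRAGOpenAI (illustrative question only)
import clsRAGOpenAI as crao

cr = crao.clsRAGOpenAI()

# getData() internally calls ragAnswerWithHaystackAndGPT3(), which in turn
# uses retrieveDocumentsReader() & generateAnswerWithGPT3()
hashValue, answer = cr.getData('Describe one artwork from the collection.')

print('Hash: ', hashValue)
print('Answer: ', answer)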

  • App.js (This is the main react script, that will create the interface & parse the data apart from the authentication)
// App.js
import React, { useState } from 'react';
import axios from 'axios';
import './App.css';

const App = () => {
  const [isLoggedIn, setIsLoggedIn] = useState(false);
  const [username, setUsername] = useState('');
  const [password, setPassword] = useState('');
  const [message, setMessage] = useState('');
  const [chatLog, setChatLog] = useState([{ sender: 'MuBot', message: 'Welcome to MuBot! Please explore the world of History from our brilliant collections! Do you want to proceed to see the catalog?'}]);

  const handleLogin = async (e) => {
    e.preventDefault();
    try {
      const response = await axios.post('http://localhost:5000/login', { username, password });
      if (response.status === 200) {
        setIsLoggedIn(true);
      }
    } catch (error) {
      console.error('Login error:', error);
    }
  };

  const sendMessage = async (username) => {
    if (message.trim() === '') return;

    // Create a new chat entry
    const newChatEntry = {
      sender: 'user',
      message: message.trim(),
    };

    // Clear the input field
    setMessage('');

    try {
      // Make API request to Python-based API
      const response = await axios.post('http://localhost:5000/chat', { message: newChatEntry.message }); // Replace with your API endpoint URL
      const responseData = response.data;

      // Print the response to the console for debugging
      console.log('API Response:', responseData);

      // Parse the nested JSON from the 'message' attribute
      const jsonData = JSON.parse(responseData.message);

      // Check if the data contains 'departments'
      if (jsonData.departments) {

        // Extract the 'departments' attribute from the parsed data
        const departments = jsonData.departments;

        // Extract the department names and create a single string with line breaks
        const botResponseText = departments.reduce((acc, department) => {return acc + department.departmentId + ' ' + department.displayName + '\n';}, '');

        // Update the chat log with the bot's response
        setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message: message }, { sender: 'bot', message: botResponseText },]);
      }
      else if (jsonData.records)
      {
        // Data structure 2: Artwork information
        const records = jsonData.records;

        // Prepare chat entries
        const chatEntries = [];

        // Iterate through records and extract text, image, and wiki information
        records.forEach((record) => {
          const textInfo = Object.entries(record).map(([key, value]) => {
            if (key !== 'Image' && key !== 'Wiki') {
              return `${key}: ${value}`;
            }
            return null;
          }).filter((info) => info !== null).join('\n');

          const imageLink = record.Image;
          //const wikiLinks = JSON.parse(record.Wiki.replace(/'/g, '"'));
          //const wikiLinks = record.Wiki;
          const wikiLinks = record.Wiki.split(',').map(link => link.trim());

          console.log('Wiki:', wikiLinks);

          // Check if there is a valid image link
          const hasValidImage = imageLink && imageLink !== '[]';

          const imageElement = hasValidImage ? (
            <img src={imageLink} alt="Artwork" style={{ maxWidth: '100%' }} />
          ) : null;

          // Create JSX elements for rendering the wiki links (if available)
          const wikiElements = wikiLinks.map((link, index) => (
            <div key={index}>
              <a href={link} target="_blank" rel="noopener noreferrer">
                Wiki Link {index + 1}
              </a>
            </div>
          ));

          if (textInfo) {
            chatEntries.push({ sender: 'bot', message: textInfo });
          }

          if (imageElement) {
            chatEntries.push({ sender: 'bot', message: imageElement });
          }

          if (wikiElements.length > 0) {
            chatEntries.push({ sender: 'bot', message: wikiElements });
          }
        });

        // Update the chat log with the bot's response
        setChatLog((prevChatLog) => [...prevChatLog, { sender: 'user', message }, ...chatEntries, ]);
      }

    } catch (error) {
      console.error('Error sending message:', error);
    }
  };

  if (!isLoggedIn) {
    return (
      <div className="login-container">
        <h2>Welcome to the MuBot</h2>
        <form onSubmit={handleLogin} className="login-form">
          <input
            type="text"
            placeholder="Enter your name"
            value={username}
            onChange={(e) => setUsername(e.target.value)}
            required
          />
          <input
            type="password"
            placeholder="Enter your password"
            value={password}
            onChange={(e) => setPassword(e.target.value)}
            required
          />
          <button type="submit">Login</button>
        </form>
      </div>
    );
  }

  return (
    <div className="chat-container">
      <div className="chat-header">
        <h2>Hello, {username}</h2>
        <h3>Chat with MuBot</h3>
      </div>
      <div className="chat-log">
        {chatLog.map((chatEntry, index) => (
          <div
            key={index}
            className={`chat-entry ${chatEntry.sender === 'user' ? 'user' : 'bot'}`}
          >
            <span className="user-name">{chatEntry.sender === 'user' ? username : 'MuBot'}</span>
            <p className="chat-message">{chatEntry.message}</p>
          </div>
        ))}
      </div>
      <div className="chat-input">
        <input
          type="text"
          placeholder="Type your message..."
          value={message}
          onChange={(e) => setMessage(e.target.value)}
          onKeyPress={(e) => {
            if (e.key === 'Enter') {
              sendMessage();
            }
          }}
        />
        <button onClick={sendMessage}>Send</button>
      </div>
    </div>
  );
};

export default App;

Please find some of the important logic –

Function – handleLogin():

The handleLogin asynchronous function responds to an event by preventing its default action. It attempts to post a login request with a username and password to a local server endpoint. If the response is successful with a status of 200, it updates a state variable to indicate a successful login; otherwise, it logs any encountered errors.

Function – sendMessage():

The sendMessage asynchronous function is designed to handle the user’s chat interaction:

  1. If the message is empty (after trimming spaces), the function exits without further action.
  2. A chat entry object is created with the sender set as ‘user’ and the trimmed message.
  3. The input field’s message is cleared, and an API request is made to a local server endpoint with the chat message.
  4. If the API responds with a ‘departments’ attribute in its JSON, a bot response is crafted by iterating over department details.
  5. If the API responds with ‘records’ indicating artwork information, the bot crafts responses for each record, extracting text, images, and wiki links, and generating JSX elements for rendering them.
  6. After processing the API response, the chat log state is updated with the user’s original message and the bot’s responses.
  7. Errors, if encountered, are logged to the console.

This function enables interactive chat with bot responses that vary based on the nature of the data received from the API.
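
To make the two branches concrete, these are the two payload shapes that the client expects inside the message attribute; the sample values below are purely illustrative.

# Illustrative samples of the two JSON shapes handled by sendMessage()
departmentsShape = {
    "departments": [
        {"departmentId": 1, "displayName": "American Decorative Arts"},
        {"departmentId": 2, "displayName": "Ancient Near Eastern Art"}
    ]
}

recordsShape = {
    "records": [
        {
            "Id": "A short, cleaned-up description of the artwork...",
            "Image": "https://example.com/artwork.jpg",
            "Wiki": "https://en.wikipedia.org/wiki/Artwork"
        }
    ]
}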


Let us explore the directory structure, starting from the parent folder down to some of the important child folders –


So, finally, we’ve done it.

I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Realtime reading from a Streaming using Computer Vision

This week, we’re going to extend one of our earlier posts & try to read the entire text from a stream using computer vision. If you want to view the previous post, please click the following link.

But, before we proceed, why don’t we view the demo first?

Demo

Architecture:

Let us understand the architecture flow –

Architecture flow

The above diagram shows that the application, which uses OpenCV, analyzes individual frames from the source & extracts the complete text within the video, displaying it on top of the target screen besides printing the same in the console.

Python Packages:

pip install imutils==0.5.4
pip install matplotlib==3.5.2
pip install numpy==1.21.6
pip install opencv-contrib-python==4.6.0.66
pip install opencv-contrib-python-headless==4.6.0.66
pip install opencv-python==4.6.0.66
pip install opencv-python-headless==4.6.0.66
pip install pandas==1.3.5
pip install Pillow==9.1.1
pip install pytesseract==0.3.9
pip install python-dateutil==2.8.2

CODE:

Let us now understand the code. For this use case, we will only discuss three Python scripts, although the solution needs a few more; we have already discussed those in some of the earlier posts, so we will skip them here.

  • clsReadingTextFromStream.py (This is the main Python class that will extract the text from the WebCAM stream in real-time.)


##################################################
#### Written By: SATYAKI DE                   ####
#### Written On: 22-Jul-2022                  ####
#### Modified On 25-Jul-2022                  ####
####                                          ####
#### Objective: This is the main class of     ####
#### python script that will invoke the       ####
#### extraction of texts from a WebCAM.       ####
####                                          ####
##################################################

# Importing necessary packages
from clsConfig import clsConfig as cf
from imutils.object_detection import non_max_suppression
import numpy as np
import pytesseract
import imutils
import time
import cv2

###############################################
###           Global Section                ###
###############################################

# Two output layer names for the text detector model
lNames = cf.conf['LAYER_DET']

# Tesseract OCR text param values
strVal = "-l " + str(cf.conf['LANG']) + " --oem " + str(cf.conf['OEM_VAL']) + " --psm " + str(cf.conf['PSM_VAL']) + ""
config = (strVal)

###############################################
###    End of Global Section                ###
###############################################

class clsReadingTextFromStream:
    def __init__(self):
        self.sep = str(cf.conf['SEP'])
        self.Curr_Path = str(cf.conf['INIT_PATH'])
        self.CacheL = int(cf.conf['CACHE_LIM'])
        self.modelPath = str(cf.conf['MODEL_PATH']) + str(cf.conf['MODEL_FILE_NAME'])
        self.minConf = float(cf.conf['MIN_CONFIDENCE'])
        self.wt = int(cf.conf['WIDTH'])
        self.ht = int(cf.conf['HEIGHT'])
        self.pad = float(cf.conf['PADDING'])
        self.title = str(cf.conf['TITLE'])
        self.Otitle = str(cf.conf['ORIG_TITLE'])
        self.drawTag = cf.conf['DRAW_TAG']
        self.aRange = int(cf.conf['ASCII_RANGE'])
        self.sParam = cf.conf['SUBTRACT_PARAM']

    def findBoundBox(self, boxes, res, rW, rH, orig, origW, origH, pad):
        try:
            # Loop over the bounding boxes
            for (spX, spY, epX, epY) in boxes:
                # Scale the bounding box coordinates based on the respective
                # ratios
                spX = int(spX * rW)
                spY = int(spY * rH)
                epX = int(epX * rW)
                epY = int(epY * rH)

                # To obtain a better OCR of the text we can potentially
                # apply a bit of padding surrounding the bounding box.
                # And, computing the deltas in both the x and y directions
                dX = int((epX - spX) * pad)
                dY = int((epY - spY) * pad)

                # Apply padding to each side of the bounding box, respectively
                spX = max(0, spX - dX)
                spY = max(0, spY - dY)
                epX = min(origW, epX + (dX * 2))
                epY = min(origH, epY + (dY * 2))

                # Extract the actual padded ROI
                roi = orig[spY:epY, spX:epX]

                # Choose the proper OCR Config
                text = pytesseract.image_to_string(roi, config=config)

                # Add the bounding box coordinates and OCR'd text to the list
                # of results
                res.append(((spX, spY, epX, epY), text))

            # Sort the results bounding box coordinates from top to bottom
            res = sorted(res, key=lambda r: r[0][1])

            return res
        except Exception as e:
            x = str(e)
            print(x)

            return res

    def predictText(self, imgScore, imgGeo):
        try:
            minConf = self.minConf

            # Initializing the bounding box rectangles & confidence score by
            # extracting the rows & columns from the imgScore volume.
            (numRows, numCols) = imgScore.shape[2:4]
            rects = []
            confScore = []

            for y in range(0, numRows):
                # Extract the imgScore probabilities to derive potential
                # bounding box coordinates that surround text
                imgScoreData = imgScore[0, 0, y]
                xVal0 = imgGeo[0, 0, y]
                xVal1 = imgGeo[0, 1, y]
                xVal2 = imgGeo[0, 2, y]
                xVal3 = imgGeo[0, 3, y]
                anglesData = imgGeo[0, 4, y]

                for x in range(0, numCols):
                    # If our score does not have sufficient probability,
                    # ignore it
                    if imgScoreData[x] < minConf:
                        continue

                    # Compute the offset factor as our resulting feature
                    # maps will be 4x smaller than the input frame
                    (offX, offY) = (x * 4.0, y * 4.0)

                    # Extract the rotation angle for the prediction and
                    # then compute the sin and cosine
                    angle = anglesData[x]
                    cos = np.cos(angle)
                    sin = np.sin(angle)

                    # Derive the width and height of the bounding box from
                    # imgGeo
                    h = xVal0[x] + xVal2[x]
                    w = xVal1[x] + xVal3[x]

                    # Compute both the starting and ending (x, y)-coordinates
                    # for the text prediction bounding box
                    epX = int(offX + (cos * xVal1[x]) + (sin * xVal2[x]))
                    epY = int(offY - (sin * xVal1[x]) + (cos * xVal2[x]))
                    spX = int(epX - w)
                    spY = int(epY - h)

                    # Adding bounding box coordinates and probability score
                    # to the respective lists
                    rects.append((spX, spY, epX, epY))
                    confScore.append(imgScoreData[x])

            # return a tuple of the bounding boxes and associated confScore
            return (rects, confScore)
        except Exception as e:
            x = str(e)
            print(x)

            rects = []
            confScore = []

            return (rects, confScore)

    def processStream(self, debugInd, var):
        try:
            sep = self.sep
            Curr_Path = self.Curr_Path
            CacheL = self.CacheL
            modelPath = self.modelPath
            minConf = self.minConf
            wt = self.wt
            ht = self.ht
            pad = self.pad
            title = self.title
            Otitle = self.Otitle
            drawTag = self.drawTag
            aRange = self.aRange
            sParam = self.sParam

            val = 0

            # Initialize the video stream and allow the camera sensor to warm up
            print("[INFO] Starting video stream...")
            cap = cv2.VideoCapture(0)

            # Loading the pre-trained text detector
            print("[INFO] Loading Text Detector...")
            net = cv2.dnn.readNet(modelPath)

            # Loop over the frames from the video stream
            while True:
                try:
                    # Grab the frame from our video stream and resize it
                    success, frame = cap.read()

                    orig = frame.copy()
                    (origH, origW) = frame.shape[:2]

                    # Setting new width and height and then determine the ratio in change
                    # for both the width and height
                    (newW, newH) = (wt, ht)
                    rW = origW / float(newW)
                    rH = origH / float(newH)

                    # Resize the frame and grab the new frame dimensions
                    frame = cv2.resize(frame, (newW, newH))
                    (H, W) = frame.shape[:2]

                    # Construct a blob from the frame and then perform a forward pass of
                    # the model to obtain the two output layer sets
                    blob = cv2.dnn.blobFromImage(frame, 1.0, (W, H), sParam, swapRB=True, crop=False)
                    net.setInput(blob)
                    (confScore, imgGeo) = net.forward(lNames)

                    # Decode the predictions, then apply non-maxima suppression to
                    # suppress weak, overlapping bounding boxes
                    (rects, confidences) = self.predictText(confScore, imgGeo)
                    boxes = non_max_suppression(np.array(rects), probs=confidences)

                    # Initialize the list of results
                    res = []

                    # Getting BoundingBox boundaries
                    res = self.findBoundBox(boxes, res, rW, rH, orig, origW, origH, pad)

                    for ((spX, spY, epX, epY), text) in res:
                        # Display the text OCR by using Tesseract APIs
                        print("Reading Text::")
                        print("=" * 60)
                        print(text)
                        print("=" * 60)

                        # Removing the non-ASCII text so it can draw the text on the frame
                        # using OpenCV, then draw the text and a bounding box surrounding
                        # the text region of the input frame
                        text = "".join([c if ord(c) < aRange else "" for c in text]).strip()
                        output = orig.copy()

                        cv2.rectangle(output, (spX, spY), (epX, epY), drawTag, 2)
                        cv2.putText(output, text, (spX, spY - 20), cv2.FONT_HERSHEY_SIMPLEX, 1.2, drawTag, 3)

                        # Show the output frame
                        cv2.imshow(title, output)
                        #cv2.imshow(Otitle, frame)

                    # If the `q` key was pressed, break from the loop
                    if cv2.waitKey(1) == ord('q'):
                        break

                    val = 0
                except Exception as e:
                    x = str(e)
                    print(x)

                    val = 1

            # Performing cleanup at the end
            cap.release()
            cv2.destroyAllWindows()

            return val
        except Exception as e:
            x = str(e)
            print('Error:', x)

            return 1

Please find the key snippet from the above script –

# Two output layer names for the text detector model

lNames = cf.conf['LAYER_DET']

# Tesseract OCR text param values

strVal = "-l " + str(cf.conf['LANG']) + " --oem " + str(cf.conf['OEM_VAL']) + " --psm " + str(cf.conf['PSM_VAL']) + ""
config = (strVal)

The first line contains the two output layers’ names for the text detector model. Among them, the first one indicates the outcome probabilities & the second one is used to derive the bounding box coordinates of the predicted text.

The second line contains various options for the Tesseract APIs. You need to understand these options in detail to make them work. These are the essential options for our use case –

  • Language – The intended language, for example, English, Spanish, Hindi, Bengali, etc.
  • OEM flag – The OCR engine mode; here, the application uses the LSTM neural net model for OCR.
  • PSM value – In this case, the selected value is 7, indicating that the application treats the ROI as a single line of text.

For more details, please refer to the config file.
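
For context, the relevant clsConfig entries could look roughly like the sketch below; these values are representative assumptions for illustration, not the exact entries from my config file.

# Representative clsConfig entries (assumed values for illustration only)
conf = {
    # Output layers of the EAST text detector: score map & geometry map
    'LAYER_DET': ["feature_fusion/Conv_7/Sigmoid", "feature_fusion/concat_3"],
    'LANG': 'eng',     # Tesseract language
    'OEM_VAL': 1,      # OCR engine mode (LSTM engine; valid --oem values are 0-3)
    'PSM_VAL': 7,      # Page segmentation mode: treat the ROI as a single text line
}

# which yields a Tesseract config string like: "-l eng --oem 1 --psm 7"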

print("[INFO] Loading Text Detector...")
net = cv2.dnn.readNet(modelPath)

The above lines bring in the pre-trained text detector model & load it into memory for evaluation.

# Setting new width and height and then determine the ratio in change
# for both the width and height
(newW, newH) = (wt, ht)
rW = origW / float(newW)
rH = origH / float(newH)

# Resize the frame and grab the new frame dimensions
frame = cv2.resize(frame, (newW, newH))
(H, W) = frame.shape[:2]

# Construct a blob from the frame and then perform a forward pass of
# the model to obtain the two output layer sets
blob = cv2.dnn.blobFromImage(frame, 1.0, (W, H), sParam, swapRB=True, crop=False)
net.setInput(blob)
(confScore, imgGeo) = net.forward(lNames)

# Decode the predictions, then apply non-maxima suppression to
# suppress weak, overlapping bounding boxes
(rects, confidences) = self.predictText(confScore, imgGeo)
boxes = non_max_suppression(np.array(rects), probs=confidences)

The above lines prepare each frame to get the bounding boxes: the application resizes the height & width, then performs a forward pass of the model to obtain the two output layer sets, and finally applies non-maxima suppression to remove the weak, overlapping bounding boxes from the predictions. In short, this identifies the potential text regions & puts a bounding box around each of them.

# Initialize the list of results
res = []

# Getting BoundingBox boundaries
res = self.findBoundBox(boxes, res, rW, rH, orig, origW, origH, pad)

The above function will create the bounding box surrounding the predicted text regions. Also, we will capture the expected text inside the result variable.

for (spX, spY, epX, epY) in boxes:
  # Scale the bounding box coordinates based on the respective
  # ratios
  spX = int(spX * rW)
  spY = int(spY * rH)
  epX = int(epX * rW)
  epY = int(epY * rH)

  # To obtain a better OCR of the text we can potentially
  # apply a bit of padding surrounding the bounding box.
  # And, computing the deltas in both the x and y directions
  dX = int((epX - spX) * pad)
  dY = int((epY - spY) * pad)

  # Apply padding to each side of the bounding box, respectively
  spX = max(0, spX - dX)
  spY = max(0, spY - dY)
  epX = min(origW, epX + (dX * 2))
  epY = min(origH, epY + (dY * 2))

  # Extract the actual padded ROI
  roi = orig[spY:epY, spX:epX]

Now, the application scales the bounding boxes based on the previously computed ratios for the actual text recognition. In this process, the application also pads the bounding boxes & then extracts the padded region of interest.

# Choose the proper OCR Config
text = pytesseract.image_to_string(roi, config=config)

# Add the bounding box coordinates and OCR'd text to the list
# of results
res.append(((spX, spY, epX, epY), text))

Using OCR options, the application extracts the text within the video frame & adds that to the res list.

# Sort the results bounding box coordinates from top to bottom
res = sorted(res, key=lambda r:r[0][1])

It then sends a sorted output to the primary calling functions.

for ((spX, spY, epX, epY), text) in res:
  # Display the text OCR by using Tesseract APIs
  print("Reading Text::")
  print("=" *60)
  print(text)
  print("=" *60)

  # Removing the non-ASCII text so it can draw the text on the frame
  # using OpenCV, then draw the text and a bounding box surrounding
  # the text region of the input frame
  text = "".join([c if ord(c) < aRange else "" for c in text]).strip()
  output = orig.copy()

  cv2.rectangle(output, (spX, spY), (epX, epY), drawTag, 2)
  cv2.putText(output, text, (spX, spY - 20), cv2.FONT_HERSHEY_SIMPLEX, 1.2, drawTag, 3)

  # Show the output frame
  cv2.imshow(title, output)

Finally, it fetches the potential text region along with the text & then prints it on top of the source video. It also removes non-ASCII characters at this stage to avoid drawing any cryptic text.

  • readingVideo.py (Main calling script.)


#####################################################
#### Written By: SATYAKI DE                      ####
#### Written On: 22-Jul-2022                     ####
#### Modified On 25-Jul-2022                     ####
####                                             ####
#### Objective: This is the main calling         ####
#### python script that will invoke the          ####
#### clsReadingTextFromStream class to initiate  ####
#### the reading capability in real-time         ####
#### & display text via Web-CAM.                 ####
#####################################################

# We keep the setup code in a different class as shown below.
import clsReadingTextFromStream as rtfs
from clsConfig import clsConfig as cf

import datetime
import logging

###############################################
###           Global Section                ###
###############################################

# Instantiating all the main class
x1 = rtfs.clsReadingTextFromStream()

###############################################
###    End of Global Section                ###
###############################################

def main():
    try:
        # Other useful variables
        debugInd = 'Y'
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        var1 = datetime.datetime.now()

        print('Start Time: ', str(var))
        # End of useful variables

        # Initiating Log Class
        general_log_path = str(cf.conf['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'readingTextFromVideo.log', level=logging.INFO)

        print('Started reading text from videos!')

        # Execute all the pass
        r1 = x1.processStream(debugInd, var)

        if (r1 == 0):
            print('Successfully read text from the Live Stream!')
        else:
            print('Failed to read text from the Live Stream!')

        var2 = datetime.datetime.now()

        c = var2 - var1
        minutes = c.total_seconds() / 60
        print('Total difference in minutes: ', str(minutes))

        print('End Time: ', str(var2))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()


Please find the key snippet –

# Instantiating all the main class

x1 = rtfs.clsReadingTextFromStream()

# Execute all the pass
r1 = x1.processStream(debugInd, var)

if (r1 == 0):
    print('Successfully read text from the Live Stream!')
else:
    print('Failed to read text from the Live Stream!')

The above lines instantiate the main calling class & then invoke the function to extract the desired text from the live streaming video, reporting whether the run was successful.

FOLDER STRUCTURE:

Here is the folder structure that contains all the files & directories in macOS –

You will get the complete codebase in the following GitHub link.

Unfortunately, I cannot upload the model due to its size. I will share it on a need basis.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 🙂

Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement, especially in the prediction quality.

Neural prophet – The enhanced version of Facebook’s forecasting API

Hi Team,

Today, I’ll be explaining an enhancement of one of my previous posts. You may remember that I’ve already shared the fascinating Prophet API, which Facebook developed; one can quickly get more accurate predictions with significantly fewer data points. (If you want to know more about that post, please click on the following link.)

However, there is another enhancement on top of that API, which is even more accurate. One needs to know, though, when to consider using it. So, today, we’ll be talking about the Neural Prophet API.

But, before we start digging deep, why don’t we view the demo first?

Demo

Let’s visit a diagram. That way, you can understand where you can use it. Also, I’ll be sharing some of the links from the original site for better information mining.

Source: Neural Prophet (Official Site)

As one can see, this API tries to bridge the gap between the different user groups & enables efficient time-series computation.

WHERE TO USE:

Let’s visit another diagram from the same source.

Source: Neural Prophet (Official Site)

So, I hope these two diagrams give you a clear picture & set your expectations closer to ground reality.


ARCHITECTURE:

Let us explore the architecture –

Architecture Diagram

As one can see, the application processes IoT data & builds a historical data volume, from which the model gradually predicts outcomes with increasing confidence.

For more information on this API, please visit the following link.


CODE:

Let’s explore the essential scripts here.

  1. clsConfig.py (Configuration file for the entire application.)


################################################
#### Written By: SATYAKI DE                 ####
#### Written On: 15-May-2020                ####
#### Modified On: 28-Dec-2021               ####
####                                        ####
#### Objective: This script is a config     ####
#### file, contains all the keys for        ####
#### Machine-Learning & streaming dashboard.####
####                                        ####
################################################

import os
import platform as pl
import pandas as p

class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'FILE_NAME': Curr_Path + sep + 'Data' + sep + 'thermostatIoT.csv',
        'SRC_PATH': Curr_Path + sep + 'data' + sep,
        'APP_DESC_1': 'Old Video Enhancement!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SUBDIR': 'data',
        'SEP': sep,
        'testRatio': 0.2,
        'valRatio': 0.2,
        'epochsVal': 8,
        'sleepTime': 3,
        'sleepTime1': 6,
        'factorVal': 0.2,
        'learningRateVal': 0.001,
        'event1': {
            'event': 'SummerEnd',
            'ds': p.to_datetime([
                '2010-04-01', '2011-04-01', '2012-04-01',
                '2013-04-01', '2014-04-01', '2015-04-01',
                '2016-04-01', '2017-04-01', '2018-04-01',
                '2019-04-01', '2020-04-01', '2021-04-01',
            ]),
        },
        'event2': {
            'event': 'LongWeekend',
            'ds': p.to_datetime([
                '2010-12-01', '2011-12-01', '2012-12-01',
                '2013-12-01', '2014-12-01', '2015-12-01',
                '2016-12-01', '2017-12-01', '2018-12-01',
                '2019-12-01', '2020-12-01', '2021-12-01',
            ]),
        }
    }


The only key snippet would be passing nested JSON elements containing pandas datetime series, as in the following lines –

'event1': {
    'event': 'SummerEnd',
    'ds': p.to_datetime([
        '2010-04-01', '2011-04-01', '2012-04-01',
        '2013-04-01', '2014-04-01', '2015-04-01',
        '2016-04-01', '2017-04-01', '2018-04-01',
        '2019-04-01', '2020-04-01', '2021-04-01',
    ]),},
'event2': {
    'event': 'LongWeekend',
    'ds': p.to_datetime([
        '2010-12-01', '2011-12-01', '2012-12-01',
        '2013-12-01', '2014-12-01', '2015-12-01',
        '2016-12-01', '2017-12-01', '2018-12-01',
        '2019-12-01', '2020-12-01', '2021-12-01',
    ]),}

As one can see, our application is equipped with the events to predict our use case better.
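
Condensed from the class below, the sketch that follows shows roughly how those event entries are consumed; the synthetic dataframe is only an assumption to keep the example self-contained, since the real input comes from the IoT device history.

# Minimal sketch: feeding the configured events into NeuralProphet.
# The dataframe below is synthetic; the real one carries the IoT history
# with the standard 'ds' & 'y' columns.
import pandas as p
from neuralprophet import NeuralProphet
from clsConfig import clsConfig as cf

df = p.DataFrame({
    'ds': p.date_range('2021-01-01', periods=60, freq='D'),
    'y': range(60),
})

SummerEnd = p.DataFrame(cf.conf['event1'])
LongWeekend = p.DataFrame(cf.conf['event2'])
dfEvents = p.concat((SummerEnd, LongWeekend))

m = NeuralProphet(loss_func="MSE")
m = m.add_events(["SummerEnd", "LongWeekend"])      # declare the expected events

historyDf = m.create_df_with_events(df, dfEvents)   # merge events with the history
metrics = m.fit(historyDf, freq="D")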

2. clsPredictIonIoT.py (Main class file, which will invoke neural-prophet forecast for the entire application.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 19-Feb-2022 ####
#### Modified On 21-Feb-2022 ####
#### ####
#### Objective: This python script will ####
#### perform the neural-prophet forecast ####
#### based on the historical input received ####
#### from IoT device. ####
################################################
# We keep the setup code in a different class as shown below.
from clsConfig import clsConfig as cf
import psutil
import os
import pandas as p
import json
import datetime
from neuralprophet import NeuralProphet, set_log_level
from neuralprophet import set_random_seed
from neuralprophet.benchmark import Dataset, NeuralProphetModel, SimpleExperiment, CrossValidationExperiment
import time
import clsL as cl
import matplotlib.pyplot as plt
###############################################
### Global Section ###
###############################################
# Initiating Log class
l = cl.clsL()
set_random_seed(10)
set_log_level("ERROR", "INFO")
###############################################
### End of Global Section ###
###############################################
class clsPredictIonIoT:
    def __init__(self):
        self.sleepTime = int(cf.conf['sleepTime'])
        self.event1 = cf.conf['event1']
        self.event2 = cf.conf['event2']

    def forecastSeries(self, inputDf):
        try:
            sleepTime = self.sleepTime
            event1 = self.event1
            event2 = self.event2

            df = inputDf

            print('IoTData: ')
            print(df)

            ## user specified events
            # history events
            SummerEnd = p.DataFrame(event1)
            LongWeekend = p.DataFrame(event2)

            dfEvents = p.concat((SummerEnd, LongWeekend))

            # NeuralProphet Object
            # Adding events
            m = NeuralProphet(loss_func="MSE")

            # set the model to expect these events
            m = m.add_events(["SummerEnd", "LongWeekend"])

            # create the data df with events
            historyDf = m.create_df_with_events(df, dfEvents)

            # fit the model
            metrics = m.fit(historyDf, freq="D")

            # forecast with events known ahead
            futureDf = m.make_future_dataframe(df=historyDf, events_df=dfEvents, periods=365, n_historic_predictions=len(df))
            forecastDf = m.predict(df=futureDf)

            events = forecastDf[(forecastDf['event_SummerEnd'].abs() + forecastDf['event_LongWeekend'].abs()) > 0]
            events.tail()

            ## plotting forecasts
            fig = m.plot(forecastDf)

            ## plotting components
            figComp = m.plot_components(forecastDf)

            ## plotting parameters
            figParam = m.plot_parameters()

            #################################
            #### Train & Test Evaluation ####
            #################################
            m = NeuralProphet(seasonality_mode="multiplicative", learning_rate=0.1)

            dfTrain, dfTest = m.split_df(df=df, freq="MS", valid_p=0.2)

            metricsTrain = m.fit(df=dfTrain, freq="MS")
            metricsTest = m.test(df=dfTest)

            print('metricsTest:: ')
            print(metricsTest)

            # Predict Into Future
            metricsTrain2 = m.fit(df=df, freq="MS")
            futureDf = m.make_future_dataframe(df, periods=24, n_historic_predictions=48)
            forecastDf = m.predict(futureDf)
            fig = m.plot(forecastDf)

            # Visualize training
            m = NeuralProphet(seasonality_mode="multiplicative", learning_rate=0.1)
            dfTrain, dfTest = m.split_df(df=df, freq="MS", valid_p=0.2)
            metrics = m.fit(df=dfTrain, freq="MS", validation_df=dfTest, plot_live_loss=True)

            print('Tail of Metrics: ')
            print(metrics.tail(1))

            ######################################
            #### Time-series Cross-Validation ####
            ######################################
            METRICS = ['SmoothL1Loss', 'MAE', 'RMSE']
            params = {"seasonality_mode": "multiplicative", "learning_rate": 0.1}

            folds = NeuralProphet(**params).crossvalidation_split_df(df, freq="MS", k=5, fold_pct=0.20, fold_overlap_pct=0.5)

            metricsTrain = p.DataFrame(columns=METRICS)
            metricsTest = p.DataFrame(columns=METRICS)

            for dfTrain, dfTest in folds:
                m = NeuralProphet(**params)
                train = m.fit(df=dfTrain, freq="MS")
                test = m.test(df=dfTest)
                metricsTrain = metricsTrain.append(train[METRICS].iloc[-1])
                metricsTest = metricsTest.append(test[METRICS].iloc[-1])

            print('Stats: ')
            dfStats = metricsTest.describe().loc[["mean", "std", "min", "max"]]
            print(dfStats)

            ####################################
            #### Using Benchmark Framework ####
            ####################################
            print('Starting extracting result set for Benchmark:')

            ts = Dataset(df=df, name="thermoStatsCPUUsage", freq="MS")
            params = {"seasonality_mode": "multiplicative"}
            exp = SimpleExperiment(
                model_class=NeuralProphetModel,
                params=params,
                data=ts,
                metrics=["MASE", "RMSE"],
                test_percentage=25,
            )
            resultTrain, resultTest = exp.run()

            print('Test result for Benchmark:: ')
            print(resultTest)
            print('Finished extracting result test for Benchmark!')

            ####################################
            #### Cross Validate Experiment ####
            ####################################
            print('Starting extracting result set for Cross-Validation:')

            ts = Dataset(df=df, name="thermoStatsCPUUsage", freq="MS")
            params = {"seasonality_mode": "multiplicative"}
            exp_cv = CrossValidationExperiment(
                model_class=NeuralProphetModel,
                params=params,
                data=ts,
                metrics=["MASE", "RMSE"],
                test_percentage=10,
                num_folds=3,
                fold_overlap_pct=0,
            )
            resultTrain, resultTest = exp_cv.run()

            print('resultTest for Cross Validation:: ')
            print(resultTest)
            print('Finished extracting result test for Cross-Validation!')

            ######################################################
            #### 3-Phase Train, Test & Validation Experiment ####
            ######################################################
            print('Starting 3-phase Train, Test & Validation Experiment!')

            m = NeuralProphet(seasonality_mode="multiplicative", learning_rate=0.1)

            # create a test holdout set:
            dfTrainVal, dfTest = m.split_df(df=df, freq="MS", valid_p=0.2)

            # create a validation holdout set:
            dfTrain, dfVal = m.split_df(df=dfTrainVal, freq="MS", valid_p=0.2)

            # fit a model on training data and evaluate on validation set.
            metricsTrain1 = m.fit(df=dfTrain, freq="MS")
            metrics_val = m.test(df=dfVal)

            # refit model on training and validation data and evaluate on test set.
            metricsTrain2 = m.fit(df=dfTrainVal, freq="MS")
            metricsTest = m.test(df=dfTest)

            metricsTrain1["split"] = "train1"
            metricsTrain2["split"] = "train2"
            metrics_val["split"] = "validate"
            metricsTest["split"] = "test"

            metrics_stat = metricsTrain1.tail(1).append([metricsTrain2.tail(1), metrics_val, metricsTest]).drop(columns=['RegLoss'])

            print('Metrics Stat:: ')
            print(metrics_stat)

            # Train, Cross-Validate and Cross-Test evaluation
            METRICS = ['SmoothL1Loss', 'MAE', 'RMSE']
            params = {"seasonality_mode": "multiplicative", "learning_rate": 0.1}

            crossVal, crossTest = NeuralProphet(**params).double_crossvalidation_split_df(df, freq="MS", k=5, valid_pct=0.10, test_pct=0.10)

            metricsTrain1 = p.DataFrame(columns=METRICS)
            metrics_val = p.DataFrame(columns=METRICS)

            for dfTrain1, dfVal in crossVal:
                m = NeuralProphet(**params)
                train1 = m.fit(df=dfTrain1, freq="MS")
                val = m.test(df=dfVal)
                metricsTrain1 = metricsTrain1.append(train1[METRICS].iloc[-1])
                metrics_val = metrics_val.append(val[METRICS].iloc[-1])

            metricsTrain2 = p.DataFrame(columns=METRICS)
            metricsTest = p.DataFrame(columns=METRICS)

            for dfTrain2, dfTest in crossTest:
                m = NeuralProphet(**params)
                train2 = m.fit(df=dfTrain2, freq="MS")
                test = m.test(df=dfTest)
                metricsTrain2 = metricsTrain2.append(train2[METRICS].iloc[-1])
                metricsTest = metricsTest.append(test[METRICS].iloc[-1])

            mtrain2 = metricsTrain2.describe().loc[["mean", "std"]]
            print('Train 2 Stats:: ')
            print(mtrain2)

            mval = metrics_val.describe().loc[["mean", "std"]]
            print('Validation Stats:: ')
            print(mval)

            mtest = metricsTest.describe().loc[["mean", "std"]]
            print('Test Stats:: ')
            print(mtest)

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

Some of the key snippets that I will discuss here are as follows –

## user specified events
# history events
SummerEnd = p.DataFrame(event1)
LongWeekend = p.DataFrame(event2)

dfEvents = p.concat((SummerEnd, LongWeekend))

# NeuralProphet Object
# Adding events
m = NeuralProphet(loss_func="MSE")

# set the model to expect these events
m = m.add_events(["SummerEnd", "LongWeekend"])

# create the data df with events
historyDf = m.create_df_with_events(df, dfEvents)

Creating & adding events into your model will allow it to predict based on the milestones.

# fit the model
metrics = m.fit(historyDf, freq="D")

# forecast with events known ahead
futureDf = m.make_future_dataframe(df=historyDf, events_df=dfEvents, periods=365, n_historic_predictions=len(df))
forecastDf = m.predict(df=futureDf)

events = forecastDf[(forecastDf['event_SummerEnd'].abs() + forecastDf['event_LongWeekend'].abs()) > 0]
events.tail()

## plotting forecasts
fig = m.plot(forecastDf)

## plotting components
figComp = m.plot_components(forecastDf)

## plotting parameters
figParam = m.plot_parameters()

Based on the daily/monthly collected data, our algorithm plots the data points & predicts a future trend, which will look like this –

Future Data Points

From the above diagram, we can conclude that CPU usage has been trending upward since the beginning. However, around certain events we can see a momentary drop in demand due to the season & holidays. During those times, people are either not using the devices or are not at home.

Apart from that, I've demonstrated the use of the benchmark framework, split the data into Train, Test & Validation sets, & captured the RMSE values. I would request you to go through that & post any questions you may have.

You can inspect the train & validation datasets & visualize them in the standard manner, which will look something like –

Demo

3. readingIoT.py (Main invoking script.)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 21-Feb-2022 ####
#### Modified On 21-Feb-2022 ####
#### ####
#### Objective: This python script will ####
#### invoke the main class to use the ####
#### stored historical IoT data stored & ####
#### then transform, cleanse, predict & ####
#### analyze the data points into more ####
#### meaningful decision-making insights. ####
###############################################
# We keep the setup code in a different class as shown below.
from clsConfig import clsConfig as cf
import datetime
import logging
import pandas as p
import clsPredictIonIoT as cpt
###############################################
### Global Section ###
###############################################
sep = str(cf.conf['SEP'])
Curr_Path = str(cf.conf['INIT_PATH'])
fileName = str(cf.conf['FILE_NAME'])
###############################################
### End of Global Section ###
###############################################
def main():
    try:
        # Other useful variables
        debugInd = 'Y'
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        var1 = datetime.datetime.now()

        # Initiating Prediction class
        x1 = cpt.clsPredictIonIoT()

        print('Start Time: ', str(var))
        # End of useful variables

        # Initiating Log Class
        general_log_path = str(cf.conf['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'IoT_NeuralProphet.log', level=logging.INFO)

        # Reading the source IoT data
        iotData = p.read_csv(fileName)
        df = iotData.rename(columns={'MonthlyDate': 'ds', 'AvgIoTCPUUsage': 'y'})[['ds', 'y']]

        r1 = x1.forecastSeries(df)

        if (r1 == 0):
            print('Successfully IoT forecast predicted!')
        else:
            print('Failed to predict IoT forecast!')

        var2 = datetime.datetime.now()
        c = var2 - var1
        minutes = c.total_seconds() / 60

        print('Total Run Time in minutes: ', str(minutes))
        print('End Time: ', str(var2))
    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()


Here are some of the key snippets –

# Reading the source IoT data
iotData = p.read_csv(fileName)
df = iotData.rename(columns={'MonthlyDate': 'ds', 'AvgIoTCPUUsage': 'y'})[['ds', 'y']]

r1 = x1.forecastSeries(df)

if (r1 == 0):
    print('Successfully IoT forecast predicted!')
else:
    print('Failed to predict IoT forecast!')

var2 = datetime.datetime.now()

In the above lines, the main calling application invokes the neural-forecasting class & passes the pandas dataframe containing the IoT device's historical data to train its model.
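For clarity, here is an illustrative sketch (with made-up numbers) of the two-column ds/y layout that the dataframe takes after the rename, which is the shape NeuralProphet works with:

import pandas as p

# Hypothetical rows mimicking thermostatIoT.csv after the rename step.
iotData = p.DataFrame({
    'MonthlyDate': ['2021-01-01', '2021-02-01', '2021-03-01'],
    'AvgIoTCPUUsage': [41.2, 43.8, 45.1],
})
df = iotData.rename(columns={'MonthlyDate': 'ds', 'AvgIoTCPUUsage': 'y'})[['ds', 'y']]
print(df)
#            ds     y
# 0  2021-01-01  41.2
# 1  2021-02-01  43.8
# 2  2021-03-01  45.1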

For your information, here is the outcome of the run, when you invoke the main calling script –

Demo – Continue

FOLDER STRUCTURE:

Please find the folder structure as shown –

Directory Structure

So, we’ve done it.

You will get the complete codebase in the following Github link.

I’ll bring some more exciting topics from the Python verse in the coming days. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging! 😀

Note: All the data & scenarios posted here are representational, available over the internet, & intended for educational purposes only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement, especially in the prediction quality.

Real-Time Matplotlib view from a streaming data built using Python & Kivy-based iOS App

Today, I’ll be sharing one of the most exciting posts I’ve ever written. This post is rare, as you cannot easily find a relevant working solution for it over the net.

So, what are we talking about here? We’re going to build a Python-based iOS App using the Kivy framework. You can find plenty of videos & documents on that alone. However, nowhere will you find the capability that I’m about to disclose. We’ll consume live IoT streaming data from a dummy application & then plot it in a Matplotlib dashboard inside the mobile App. And that’s where this post differs substantially from the rest of the available white papers.


But, before we dig into more details, let us see a quick demo of our iOS App.

Demo:

Demo

Isn’t it exciting? Great! Now, let’s dig into the details.


Let’s understand the architecture as to how we want to proceed with the solution here.

Architecture:

Broad-level design

The above diagram shows the Kivy-based iOS application that will consume streaming data from the Ably queue. The dummy IoT application will push the real-time events to the same Ably queue.

So, now we understand the architecture. Fantastic!

Let’s deep dive into the code that we specifically built for this use case.


Code:

  1. IoTDataGen.py (Generates mock IoT events & publishes the streaming data to Ably channels, from which the App derives its measured KPIs.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 12-Nov-2021 ####
#### ####
#### Objective: Publishing Streaming data ####
#### to Ably channels & captured IoT ####
#### events from the simulator & publish ####
#### them in Dashboard through measured ####
#### KPIs. ####
#### ####
##############################################
import random
import time
import json
import clsPublishStream as cps
import datetime
from clsConfig import clsConfig as cf
import logging
# Invoking the IoT Device Generator.
def main():
    ###############################################
    ### Global Section ###
    ###############################################
    # Initiating Ably class to push events
    x1 = cps.clsPublishStream()
    ###############################################
    ### End of Global Section ###
    ###############################################

    # Initiating Log Class
    general_log_path = str(cf.conf['LOG_PATH'])
    msgSize = int(cf.conf['limRec'])

    # Enabling Logging Info
    logging.basicConfig(filename=general_log_path + 'IoTDevice.log', level=logging.INFO)

    # Other useful variables
    cnt = 1
    idx = 0
    debugInd = 'Y'
    x_value = 0
    total_1 = 100
    total_2 = 100
    var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    # End of useful variables

    while True:
        srcJson = {
            "x_value": x_value,
            "total_1": total_1,
            "total_2": total_2
        }

        x_value += 1
        total_1 = total_1 + random.randint(-6, 8)
        total_2 = total_2 + random.randint(-5, 6)

        tmpJson = str(srcJson)

        if cnt == 1:
            srcJsonMast = '{' + '"' + str(idx) + '":' + tmpJson
        elif cnt == msgSize:
            srcJsonMast = srcJsonMast + '}'
            print('JSON: ')
            print(str(srcJsonMast))

            # Pushing the accumulated batch of IoT events
            retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)

            if retVal_1 == 0:
                print('Successfully IoT event pushed!')
            else:
                print('Failed to push IoT events!')

            srcJsonMast = ''
            tmpJson = ''
            cnt = 0
            idx = -1
            srcJson = {}
            retVal_1 = 0
        else:
            srcJsonMast = srcJsonMast + ',' + '"' + str(idx) + '":' + tmpJson

        cnt += 1
        idx += 1
        time.sleep(1)

if __name__ == "__main__":
    main()


Let’s explore the key snippets from the above script.

# Initiating Ably class to push events
x1 = cps.clsPublishStream()

The IoT data generator calls the publishing class (clsPublishStream) to push the JSON events to the Ably Queue.

if cnt == 1:
    srcJsonMast = '{' + '"' + str(idx) + '":'+ tmpJson
elif cnt == msgSize:
    srcJsonMast = srcJsonMast + '}'
    print('JSON: ')
    print(str(srcJsonMast))

    # Pushing the accumulated batch of IoT events
    retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)

    if retVal_1 == 0:
        print('Successfully IoT event pushed!')
    else:
        print('Failed to push IoT events!')

    srcJsonMast = ''
    tmpJson = ''
    cnt = 0
    idx = -1
    srcJson = {}
    retVal_1 = 0
else:
    srcJsonMast = srcJsonMast + ',' + '"' + str(idx) + '":'+ tmpJson

In the above snippet, we’re forming the payload dynamically & then calling “pushEvents” to push all the randomly generated mock IoT events to the Ably queue.
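To visualize what actually travels over the wire, here is an illustrative sketch (assuming a hypothetical msgSize of 3) of the batched payload handed to pushEvents. Because the loop stringifies Python dicts, the inner keys end up single-quoted, which is why the consumer later decodes the message with eval rather than a strict JSON parser.

# Illustrative only: the batched payload built after msgSize (= 3) iterations.
srcJsonMast = (
    '{"0":' + str({"x_value": 0, "total_1": 100, "total_2": 100}) +
    ',"1":' + str({"x_value": 1, "total_1": 102, "total_2": 99}) +
    ',"2":' + str({"x_value": 2, "total_1": 106, "total_2": 101}) +
    '}'
)
print(srcJsonMast)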

2. custom.kv (Kivy design file that contains the graphical interface of our iOS App, including the button functionality.)


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 12-Nov-2021 ####
#### ####
#### Objective: This Kivy design file contains all the ####
#### graphical interface of our I-OS App. This including ####
#### the functionalities of buttons. ####
#### ####
#### Note: If you think this file is not properly read by ####
#### the program, then remove this entire comment block & ####
#### then run the application. It should work. ####
###############################################################
MainInterface:
<MainInterface>:
ScreenManager:
id: sm
size: root.width, root.height
Screen:
name: "background_1"
Image:
source: "Background/Background_1.png"
allow_stretch: True
keep_ratio: True
size_hint_y: None
size_hint_x: None
width: self.parent.width
height: self.parent.width/self.image_ratio
FloatLayout:
orientation: 'vertical'
Label:
text: "This is an application, which will consume the live streaming data inside a Kivy-based IOS-App by using Matplotlib to capture the KPIs."
text_size: self.width + 350, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':2.9,'center_y':6.5}
Image:
id: homesc
pos_hint: {'right':6, 'top':5.4}
size_hint: None, None
size: 560, 485
source: "Background/FP.jpeg"
Screen:
name: "background_2"
Image:
source: "Background/Background_2.png"
allow_stretch: True
keep_ratio: True
size_hint_y: None
size_hint_x: None
width: self.parent.width
height: self.parent.width/self.image_ratio
FloatLayout:
Label:
text: "Please find the realtime IoT-device Live Statistics:"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':3.0,'center_y':7.0}
Label:
text: "DC to Servo Min Ratio:"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':3.0,'center_y':6.2}
Label:
id: dynMin
text: "100"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.2,'center_y':6.2}
Label:
text: "DC Motor:"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':5.4}
Label:
text: "(MAX)"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':5.0}
Label:
id: dynDC
text: "100"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':4.6}
Label:
text: " ——- Vs ——- "
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':4.0}
Label:
text: "Servo Motor:"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':3.4}
Label:
text: "(MAX)"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':3.0}
Label:
id: dynServo
text: "100"
text_size: self.width + 430, None
height: self.texture_size[1]
halign: "left"
valign: "top"
pos_hint: {'center_x':6.8,'center_y':2.6}
FloatLayout:
id: box
size: 400, 550
pos: 200, 300
Screen:
name: "background_3"
Image:
source: "Background/Background_3.png"
allow_stretch: True
keep_ratio: True
size_hint_y: None
size_hint_x: None
width: self.parent.width
height: self.parent.width/self.image_ratio
FloatLayout:
orientation: 'vertical'
Label:
text: "Please find the live like status."
text_size: self.width + 350, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':2.6,'center_y':7.2}
Label:
id: dynVal
text: "100"
text_size: self.width + 350, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':4.1,'center_y':6.4}
Image:
id: lk_img_1
pos_hint: {'center_x':3.2, 'center_y':6.4}
size_hint: None, None
size: 460, 285
source: "Background/Likes_Btn_R.png"
Label:
text: "Want to know more about the Developer? Here is the detail ->"
text_size: self.width + 450, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':3.1,'center_y':5.5}
Label:
text: "I love to find out new technologies that is emerging as a driving force & shape our future!"
text_size: self.width + 290, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':2.3,'center_y':3.8}
Label:
text: "For more information view the website to know more on Python-Kivy along with Matplotlib Live Streaming."
text_size: self.width + 450, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':3.1,'center_y':1.9}
Image:
id: avatar
pos_hint: {'right':6.8, 'top':5.4}
size_hint: None, None
size: 460, 285
source: "Background/Me.jpeg"
Label:
text: "https://www.satyakide.com&quot;
text_size: self.width + 350, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':3.4,'center_y':0.9}
Image:
source: "Background/Top_Bar.png"
size: 620, 175
pos: 0, root.height - 535
Button:
#: set val 'Start'
size: 112.5, 75
pos: root.width/2-190, root.height-120
background_color: 1,1,1,0
on_press: root.pressed(self, val, sm)
on_release: root.released(self, val)
Image:
id: s_img
text: val
source: "Background/Start_Btn.png"
center_x: self.parent.center_x - 260
center_y: self.parent.center_y - 415
Button:
#: set val2 'Stats'
size: 112.5, 75
pos: root.width/2-55, root.height-120
background_color: 1,1,1,0
on_press: root.pressed(self, val2, sm)
on_release: root.released(self, val2)
Image:
id: st_img
text: val2
source: "Background/Stats_Btn.png"
center_x: self.parent.center_x - 250
center_y: self.parent.center_y - 415
Button:
#: set val3 'Likes'
size: 112.5, 75
pos: root.width/2+75, root.height-120
background_color: 1,1,1,0
on_press: root.pressed(self, val3, sm)
on_release: root.released(self, val3)
Image:
id: lk_img
text: val3
source: "Background/Likes_Btn.png"
center_x: self.parent.center_x - 240
center_y: self.parent.center_y - 415


To understand this, one needs to learn how to prepare a Kivy design layout using the KV language. You can build the same layout in native Python code as well. However, I wanted to explore this language, & it is also the preferred way of doing front-end GUI design in Kivy.

As with any graphical interface, one needs to understand the layouts & the widgets that you plan to use or build. For that, please go through the official documentation on Kivy Layouts, especially if you are doing this for the first time.

To pinpoint the discussion, I would like to present the relevant documentation segment from the official site in the given picture –

Official Kivy reference

Since we’ve used custom buttons & top bars, the most convenient GUI layout for our use case is the FloatLayout. With that layout, we can position our widgets anywhere we need. At the same time, one can nest layouts by combining different types of arrangements under one another. A minimal Python-side sketch of this idea follows.
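As a point of comparison, here is a minimal Python-side sketch (not part of this App) of placing a widget freely inside a FloatLayout using pos_hint, which is what the KV rules below do declaratively:

from kivy.app import App
from kivy.uix.floatlayout import FloatLayout
from kivy.uix.label import Label

class DemoApp(App):
    def build(self):
        # Free placement: pos_hint positions the label relative to the layout.
        layout = FloatLayout()
        layout.add_widget(Label(text="Hello, Kivy!",
                                pos_hint={'center_x': 0.5, 'center_y': 0.8}))
        return layout

if __name__ == "__main__":
    DemoApp().run()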

Some of the key lines from the above scripting files will be –

Screen:
  name: "background_1"
  Image:
      source: "Background/Background_1.png"
      allow_stretch: True
      keep_ratio: True
      size_hint_y: None
      size_hint_x: None
      width: self.parent.width
      height: self.parent.width/self.image_ratio
      FloatLayout:
          orientation: 'vertical'
          Label:
              text: "This is an application, which will consume the live streaming data inside a Kivy-based IOS-App by using Matplotlib to capture the KPIs."
              text_size: self.width + 350, None
              height: self.texture_size[1]
              halign: "left"
              valign: "bottom"
              pos_hint: {'center_x':2.9,'center_y':6.5}
          Image:
              id: homesc
              pos_hint: {'right':6, 'top':5.4}
              size_hint: None, None
              size: 560, 485
              source: "Background/FP.jpeg"

Let us understand what we discussed here & try to map that with the image.

Part of GUI defined in KV file

From the above image, you can see how we placed the label & image at our custom positions to create a lean & clean interface.

Image:
      source: "Background/Top_Bar.png"
      size: 620, 175
      pos: 0, root.height - 535

  Button:
      #: set val 'Start'
      size: 112.5, 75
      pos: root.width/2-190, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val, sm)
      on_release: root.released(self, val)
      Image:
          id: s_img
          text: val
          source: "Background/Start_Btn.png"
          center_x: self.parent.center_x - 260
          center_y: self.parent.center_y - 415

  Button:
      #: set val2 'Stats'
      size: 112.5, 75
      pos: root.width/2-55, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val2, sm)
      on_release: root.released(self, val2)
      Image:
          id: st_img
          text: val2
          source: "Background/Stats_Btn.png"
          center_x: self.parent.center_x - 250
          center_y: self.parent.center_y - 415

  Button:
      #: set val3 'Likes'
      size: 112.5, 75
      pos: root.width/2+75, root.height-120
      background_color: 1,1,1,0
      on_press: root.pressed(self, val3, sm)
      on_release: root.released(self, val3)
      Image:
          id: lk_img
          text: val3
          source: "Background/Likes_Btn.png"
          center_x: self.parent.center_x - 240
          center_y: self.parent.center_y - 415

Let us understand the custom buttons mapped in our App.

So, these are custom buttons. We placed them at specific positions & sizes by specifying the appropriate coordinates & then assigned the button handlers (on_press & on_release).

However, these button handlers are defined inside the main Python script, which we’ll discuss after this segment.

3. main.py (Consumes the streaming IoT events from the Ably channels & publishes them in the Kivy-based iOS App through measured KPIs.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 12-Nov-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels & captured IoT ####
#### events from the simulator & publish ####
#### them in Kivy-I/OS App through ####
#### measured KPIs. ####
#### ####
##############################################
from kivy.app import App
from kivy.uix.widget import Widget
from kivy.lang import Builder
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.floatlayout import FloatLayout
from kivy.clock import Clock
from kivy.core.window import Window
from kivymd.app import MDApp
import datetime as dt
import datetime
from kivy.properties import StringProperty
from kivy.vector import Vector
import regex as re
import os
os.environ["KIVY_IMAGE"]="pil"
import platform as pl
import matplotlib.pyplot as plt
import pandas as p
from matplotlib.patches import Rectangle
from matplotlib import use as mpl_use
mpl_use('module://kivy.garden.matplotlib.backend_kivy')
plt.style.use('fivethirtyeight')
# Consuming data from Ably Queue
from ably import AblyRest
# Main Class to consume streaming
import clsStreamConsume as ca
# Create the instance of the Covid API Class
x1 = ca.clsStreamConsume()
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' *60)
DInd = 'Y'
Window.size = (310, 460)
Curr_Path = os.path.dirname(os.path.realpath(__file__))
os_det = pl.system()
if os_det == "Windows":
sep = '\\'
else:
sep = '/'
def getRealTimeIoT():
try:
# Let's pass this to our map section
df = x1.conStream(var1, DInd)
print('Data:')
print(str(df))
return df
except Exception as e:
x = str(e)
print(x)
df = p.DataFrame()
return df
class MainInterface(FloatLayout):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.data = getRealTimeIoT()
self.likes = 0
self.dcMotor = 0
self.servoMotor = 0
self.minRatio = 0
plt.subplots_adjust(bottom=0.19)
#self.fig, self.ax = plt.subplots(1,1, figsize=(6.5,10))
self.fig, self.ax = plt.subplots()
self.mpl_canvas = self.fig.canvas
def on_data(self, *args):
self.ax.clear()
self.data = getRealTimeIoT()
self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
self.likes = self.getMaxLike(self.data)
self.ids.dynVal.text = str(self.likes)
self.ids.lk_img_1.source = ''
self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn_R.png"
self.dcMotor = self.getMaxDCMotor(self.data)
self.ids.dynDC.text = str(self.dcMotor)
self.servoMotor = self.getMaxServoMotor(self.data)
self.ids.dynServo.text = str(self.servoMotor)
self.minRatio = self.getDc2ServoMinRatio(self.data)
self.ids.dynMin.text = str(self.minRatio)
x = self.data['x_value']
y1 = self.data['total_1']
y2 = self.data['total_2']
self.ax.plot(x, y1, label='Channel 1', linewidth=5.0)
self.ax.plot(x, y2, label='Channel 2', linewidth=5.0)
self.mpl_canvas.draw_idle()
box = self.ids.box
box.clear_widgets()
box.add_widget(self.mpl_canvas)
return self.data
def getMaxLike(self, df):
payload = df['x_value']
a1 = str(payload.agg(['max']))
max_val = int(re.search(r'\d+', a1)[0])
return max_val
def getMaxDCMotor(self, df):
payload = df['total_1']
a1 = str(payload.agg(['max']))
max_val = int(re.search(r'\d+', a1)[0])
return max_val
def getMaxServoMotor(self, df):
payload = df['total_2']
a1 = str(payload.agg(['max']))
max_val = int(re.search(r'\d+', a1)[0])
return max_val
def getMinDCMotor(self, df):
payload = df['total_1']
a1 = str(payload.agg(['min']))
min_val = int(re.search(r'\d+', a1)[0])
return min_val
def getMinServoMotor(self, df):
payload = df['total_2']
a1 = str(payload.agg(['min']))
min_val = int(re.search(r'\d+', a1)[0])
return min_val
def getDc2ServoMinRatio(self, df):
minDC = self.getMinDCMotor(df)
minServo = self.getMinServoMotor(df)
min_ratio = round(float(minDC/minServo), 5)
return min_ratio
def update(self, *args):
self.data = self.on_data(self.data)
def pressed(self, instance, inText, SM):
if str(inText).upper() == 'START':
instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Start_Btn.png"
print('In Pressed: ', str(instance.parent.ids.s_img.text).upper())
if ((SM.current == "background_2") or (SM.current == "background_3")):
SM.transition.direction = "right"
SM.current= "background_1"
Clock.unschedule(self.update)
self.remove_widget(self.mpl_canvas)
elif str(inText).upper() == 'STATS':
instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Stats_Btn.png"
print('In Pressed: ', str(instance.parent.ids.st_img.text).upper())
if (SM.current == "background_1"):
SM.transition.direction = "left"
elif (SM.current == "background_3"):
SM.transition.direction = "right"
SM.current= "background_2"
Clock.schedule_interval(self.update, 0.1)
else:
instance.parent.ids.lk_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Likes_Btn.png"
print('In Pressed: ', str(instance.parent.ids.lk_img.text).upper())
if ((SM.current == "background_1") or (SM.current == "background_2")):
SM.transition.direction = "left"
SM.current= "background_3"
Clock.schedule_interval(self.update, 0.1)
instance.parent.ids.dynVal.text = str(self.likes)
instance.parent.ids.dynDC.text = str(self.dcMotor)
instance.parent.ids.dynServo.text = str(self.servoMotor)
instance.parent.ids.dynMin.text = str(self.minRatio)
self.remove_widget(self.mpl_canvas)
def released(self, instance, inrText):
if str(inrText).upper() == 'START':
instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Start_Btn.png"
print('Released: ', str(instance.parent.ids.s_img.text).upper())
elif str(inrText).upper() == 'STATS':
instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Stats_Btn.png"
print('Released: ', str(instance.parent.ids.st_img.text).upper())
else:
instance.parent.ids.lk_img.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
print('Released: ', str(instance.parent.ids.lk_img.text).upper())
class CustomApp(MDApp):
def build(self):
return MainInterface()
if __name__ == "__main__":
custApp = CustomApp()
custApp.run()


Let us explore the main script now.

def getRealTimeIoT():
    try:
        # Let's pass this to our map section
        df = x1.conStream(var1, DInd)

        print('Data:')
        print(str(df))

        return df
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

The above function will invoke the streaming class to consume the mock IoT live events as a pandas dataframe from the Ably queue.
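For reference, here is an illustrative sketch (values are made up) of the flattened dataframe that conStream typically returns once the batched events are decoded; the column names follow the publisher’s payload:

import pandas as p

# Hypothetical rows mirroring the mock IoT payload fields.
df = p.DataFrame({
    'x_value': [0, 1, 2],
    'total_1': [100, 102, 106],
    'total_2': [100, 99, 101],
})
print(df)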

class MainInterface(FloatLayout):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.data = getRealTimeIoT()
        self.likes = 0
        self.dcMotor = 0
        self.servoMotor = 0
        self.minRatio = 0
        plt.subplots_adjust(bottom=0.19)

        #self.fig, self.ax = plt.subplots(1,1, figsize=(6.5,10))
        self.fig, self.ax = plt.subplots()
        self.mpl_canvas = self.fig.canvas

The application instantiates the main class & assigns all the critical variables, including the Matplotlib figure & canvas.

    def pressed(self, instance, inText, SM):

        if str(inText).upper() == 'START':
            instance.parent.ids.s_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Start_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.s_img.text).upper())
            if ((SM.current == "background_2") or (SM.current == "background_3")):
                SM.transition.direction = "right"
            SM.current= "background_1"
            Clock.unschedule(self.update)
            self.remove_widget(self.mpl_canvas)

We’ve taken one of the button events & shown how the application behaves once someone taps the Start button & how it brings up all the corresponding elements of that static page. It also sets the transition direction between screens & unschedules the live update when returning to the home screen.

        elif str(inText).upper() == 'STATS':

            instance.parent.ids.st_img.source = Curr_Path + sep + 'Background' + sep + "Pressed_Stats_Btn.png"
            print('In Pressed: ', str(instance.parent.ids.st_img.text).upper())
            if (SM.current == "background_1"):
                SM.transition.direction = "left"
            elif (SM.current == "background_3"):
                SM.transition.direction = "right"
            SM.current= "background_2"
            Clock.schedule_interval(self.update, 0.1)

The next screen invokes the dynamic & real-time content. So, please pay extra attention to the following line –

Clock.schedule_interval(self.update, 0.1)

This line will invoke the update function, which looks like –

    def update(self, *args):
        self.data = self.on_data(self.data)

Here is the logic of the update function, which in turn invokes another method named “on_data“.

    def on_data(self, *args):
        self.ax.clear()
        self.data = getRealTimeIoT()

        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn.png"
        self.likes = self.getMaxLike(self.data)
        self.ids.dynVal.text = str(self.likes)
        self.ids.lk_img_1.source = ''
        self.ids.lk_img_1.source = Curr_Path + sep + 'Background' + sep + "Likes_Btn_R.png"

        self.dcMotor = self.getMaxDCMotor(self.data)
        self.ids.dynDC.text = str(self.dcMotor)

        self.servoMotor = self.getMaxServoMotor(self.data)
        self.ids.dynServo.text = str(self.servoMotor)

        self.minRatio = self.getDc2ServoMinRatio(self.data)
        self.ids.dynMin.text = str(self.minRatio)

        x = self.data['x_value']
        y1 = self.data['total_1']
        y2 = self.data['total_2']

        self.ax.plot(x, y1, label='Channel 1', linewidth=5.0)
        self.ax.plot(x, y2, label='Channel 2', linewidth=5.0)

        self.mpl_canvas.draw_idle()

        box = self.ids.box
        box.clear_widgets()
        box.add_widget(self.mpl_canvas)

        return self.data

The above method shows how we capture the live calculations, draw them into the Matplotlib plots, & finally attach the Matplotlib figure canvas to the box widget at our chosen size, so the changed content is displayed every time this method is invoked.

The rest of the functions are pretty self-explanatory, so I’m not going to discuss them in detail. As a side note, the max/min helpers could be written more directly, as sketched below.
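For what it’s worth, the regex-over-agg pattern used in the helper methods above can be replaced by plain pandas reductions. This is a sketch of an equivalent alternative, not the App’s actual code:

# Sketch only: simpler equivalents of the max/min helpers.
def getMaxLike(df):
    return int(df['x_value'].max())

def getMaxDCMotor(df):
    return int(df['total_1'].max())

def getDc2ServoMinRatio(df):
    # Ratio of the two channel minima, rounded as in the original helper.
    return round(float(df['total_1'].min() / df['total_2'].min()), 5)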


Run:

Let’s run the app & see the output –

STEP – 1

Triggering the mock IoT App

STEP – 2

Triggering the iOS App

STEP – 3


So, we’ve done it.

You will get the complete codebase in the following Github link.

I’ll bring some more exciting topics from the Python verse in the coming days. Please share & subscribe to my post & let me know your feedback.

Till then, Happy Avenging!


Note: All the data & scenarios posted here are representational, available over the internet, & intended for educational purposes only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement, especially in making all the GUI components’ sizes & positions dynamic by defining them relative to self.width along with some constant values.

Projecting real-time KPIs by ingesting streaming events from emulated IoT-device

Today, I am planning to demonstrate an IoT use case implemented in Python. I was waiting for my Raspberry Pi to arrive. However, the unit I received was not working as expected, perhaps due to a hardware malfunction. Hence, I was looking for a way to continue with this installment even without the hardware.

I was looking for an alternative, such as an online Raspberry Pi emulator. Recently, Microsoft introduced an integrated Raspberry Pi simulator, which you can connect directly to Azure IoT. However, I couldn’t find any API that I could leverage from my Python application.

So, I explored all the possible options & finally came up with the idea of creating my own IoT emulator, which can integrate with any application. With help from online materials, I customized & enhanced them for my use case & ended up with this clean application that demonstrates the use case with clarity.

We’ll showcase this real-time use case, where we capture the events generated by the IoT device in a real-time dashboard, whose visual display points update as soon as the source data changes.


However, I would like to share the run before we dig deep into this.

Demo

Isn’t this exciting? We can use our custom-built IoT emulator to capture real-time events into the Ably Queue & then transform those raw events into more meaningful KPIs. Let’s deep dive then.


Architecture:

Let’s explore the architecture –

Fig – 1

As you can see, the green box is a demo IoT application that generates events & pushes them into the Ably Queue. At the same time, the dashboard consumes the events & transforms them into more meaningful metrics.


Package Installation:

Let us review the packages required for this task.

Step – 1:

Installation

Step – 2:

Installation – Continue

And here are the commands to install those packages –

pip install dash==1.0.0
pip install numpy==1.16.4
pip install pandas==0.24.2
pip install scipy==1.3.0
pip install gunicorn==19.9.0
pip install ably==1.1.1
pip install tkgpio==0.1

Code:

Since this is an extension of our previous post, we’re not going to revisit the scripts already discussed there. Instead, we will talk about the enhanced scripts & the new scripts required for this use case.

1. clsConfig.py (This native Python script contains the configuration entries.)


################################################
#### Written By: SATYAKI DE ####
#### Written On: 15-May-2020 ####
#### Modified On: 25-Sep-2021 ####
#### ####
#### Objective: This script is a config ####
#### file, contains all the keys for ####
#### Machine-Learning & streaming dashboard.####
#### ####
################################################
import os
import platform as pl
class clsConfig(object):
    Curr_Path = os.path.dirname(os.path.realpath(__file__))

    os_det = pl.system()
    if os_det == "Windows":
        sep = '\\'
    else:
        sep = '/'

    conf = {
        'APP_ID': 1,
        'ARCH_DIR': Curr_Path + sep + 'arch' + sep,
        'PROFILE_PATH': Curr_Path + sep + 'profile' + sep,
        'LOG_PATH': Curr_Path + sep + 'log' + sep,
        'REPORT_PATH': Curr_Path + sep + 'report',
        'FILE_NAME': Curr_Path + sep + 'data' + sep + 'TradeIn.csv',
        'SRC_PATH': Curr_Path + sep + 'data' + sep,
        'JSONFileNameWithPath': Curr_Path + sep + 'GUI_Config' + sep + 'CircuitConfiguration.json',
        'APP_DESC_1': 'Dash Integration with Ably!',
        'DEBUG_IND': 'N',
        'INIT_PATH': Curr_Path,
        'SUBDIR': 'data',
        'ABLY_ID': 'WWP309489.93jfkT:32kkdhdJjdued79e',
        "URL": "https://corona-api.com/countries/",
        "appType": "application/json",
        "conType": "keep-alive",
        "limRec": 50,
        "CACHE": "no-cache",
        "MAX_RETRY": 3,
        "coList": "DE, IN, US, CA, GB, ID, BR",
        "FNC": "NewConfirmed",
        "TMS": "ReportedDate",
        "FND": "NewDeaths",
        "FinData": "Cache.csv"
    }


A few of the new entries that are essential to this task are ABLY_ID, FinData & JSONFileNameWithPath.
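As a quick orientation, here is a minimal sketch (assuming clsConfig is importable from the project root) of how those new keys are read by the scripts discussed below:

from clsConfig import clsConfig as cf

ably_id = str(cf.conf['ABLY_ID'])                   # Ably key used by the publisher & consumer
cacheFile = str(cf.conf['FinData'])                 # CSV cache written/read by clsStreamConsume
circuitJson = str(cf.conf['JSONFileNameWithPath'])  # GUI layout consumed by clsBuildCircuit

print(ably_id, cacheFile, circuitJson)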

2. clsPublishStream.py (This script publishes real-time streaming data to Ably channels using the popular third-party service of the same name. Ably implements a pub/sub streaming model, which might be extremely useful for any start-up.)


###############################################################
#### ####
#### Written By: Satyaki De ####
#### Written Date: 26-Jul-2021 ####
#### Modified Date: 08-Sep-2021 ####
#### ####
#### Objective: This script will publish real-time ####
#### streaming data coming out from a hosted API ####
#### sources using another popular third-party service ####
#### named Ably. Ably mimics pubsub Streaming concept, ####
#### which might be extremely useful for any start-ups. ####
#### ####
###############################################################
from ably import AblyRest
import logging
import json
from random import seed
from random import random
import json
import math
import random
from clsConfig import clsConfig as cf
seed(1)
# Global Section
logger = logging.getLogger('ably')
logger.addHandler(logging.StreamHandler())
ably_id = str(cf.conf['ABLY_ID'])
ably = AblyRest(ably_id)
channel = ably.channels.get('sd_channel')
# End Of Global Section
class clsPublishStream:
    def __init__(self):
        self.msgSize = cf.conf['limRec']

    def pushEvents(self, srcJSON, debugInd, varVa):
        try:
            msgSize = self.msgSize

            # Capturing the inbound payload as a JSON string
            jdata_fin = json.dumps(srcJSON)

            print('IOT Events: ')
            print(str(jdata_fin))

            # Publish the message to the sd_channel channel
            channel.publish('event', jdata_fin)

            jdata_fin = ''

            return 0
        except Exception as e:
            x = str(e)
            print(x)
            logging.info(x)

            return 1

We’re not going to discuss this here, as we’ve already covered it in my previous post.

3. clsStreamConsume.py (Consuming Streaming data from Ably channels.)


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 26-Jul-2021 ####
#### Modified On 08-Sep-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels published by the ####
#### playIOTDevice.py ####
#### ####
##############################################
import json
from clsConfig import clsConfig as cf
import requests
import logging
import time
import pandas as p
import clsL as cl
from ably import AblyRest
# Initiating Log class
l = cl.clsL()
class clsStreamConsume:
    def __init__(self):
        self.ably_id = str(cf.conf['ABLY_ID'])
        self.fileName = str(cf.conf['FinData'])

    def conStream(self, varVa, debugInd):
        try:
            ably_id = self.ably_id
            fileName = self.fileName

            var = varVa
            debug_ind = debugInd

            # Fetching the data
            client = AblyRest(ably_id)
            channel = client.channels.get('sd_channel')
            message_page = channel.history()

            # Counter Value
            cnt = 0

            # Declaring Global Data-Frame
            df_conv = p.DataFrame()

            for i in message_page.items:
                print('Last Msg: {}'.format(i.data))
                json_data = json.loads(i.data)

                # Converting String to Dictionary
                dict_json = eval(json_data)

                # Converting JSON to Dataframe
                df = p.DataFrame.from_dict(dict_json, orient='index')

                if cnt == 0:
                    df_conv = df
                else:
                    d_frames = [df_conv, df]
                    df_conv = p.concat(d_frames)

                cnt += 1

            # Resetting the Index Value
            df_conv.reset_index(drop=True, inplace=True)

            # This will check whether the current load is happening
            # or not. Based on that, it will capture the old events
            # from cache.
            if df_conv.empty:
                df_conv = p.read_csv(fileName)
            else:
                l.logr(fileName, debug_ind, df_conv, 'log')

            return df_conv
        except Exception as e:
            x = str(e)
            print('Error: ', x)
            logging.info(x)

            # This will handle the error scenario as well.
            # Based on that, it will capture the old events
            # from cache.
            try:
                df_conv = p.read_csv(fileName)
                return df_conv
            except:
                df = p.DataFrame()
                return df

We’re not going to discuss this here, as we’ve already covered it in my previous post.

4. CircuitConfiguration.json (Configuration file for GUI Interface for IoT Simulator.)


{
"name":"Analog Device",
"width":700,
"height":350,
"leds":[
{
"x":105,
"y":80,
"name":"LED",
"pin":21
}
],
"motors":[
{
"x":316,
"y":80,
"name":"DC Motor",
"forward_pin":22,
"backward_pin":23
}
],
"servos":[
{
"x":537,
"y":80,
"name":"Servo Motor",
"pin":24,
"min_angle":-180,
"max_angle":180,
"initial_angle":20
}
],
"adc":{
"mcp_chip":3008,
"potenciometers":[
{
"x":40,
"y":200,
"name":"Brightness Potentiometer",
"channel":0
},
{
"x":270,
"y":200,
"name":"Speed Potentiometer",
"channel":2
},
{
"x":500,
"y":200,
"name":"Angle Potentiometer",
"channel":6
}
]
},
"toggles":[
{
"x":270,
"y":270,
"name":"Direction Toggle Switch",
"pin":15,
"off_label":"backward",
"on_label":"forward",
"is_on":false
}
],
"labels":[
{
"x":15,
"y":35,
"width":25,
"height":18,
"borderwidth":2,
"relief":"solid"
},
{
"x":56,
"y":26,
"text":"Brightness Control"
},
{
"x":245,
"y":35,
"width":25,
"height":18,
"borderwidth":2,
"relief":"solid"
},
{
"x":298,
"y":26,
"text":"Speed Control"
},
{
"x":475,
"y":35,
"width":25,
"height":18,
"borderwidth":2,
"relief":"solid"
},
{
"x":531,
"y":26,
"text":"Angle Control"
}
]
}

This JSON configuration will be used by the next Python class.

5. clsBuildCircuit.py (Calling Tk Circuit API.)


##############################################
#### Written By: SATYAKI DE ####
#### Written On: 25-Sep-2021 ####
#### Modified On 25-Sep-2021 ####
#### ####
#### Objective: Calling Tk Circuit API ####
##############################################
from tkgpio import TkCircuit
from json import load
from clsConfig import clsConfig as cf
fileName = str(cf.conf['JSONFileNameWithPath'])
print('File Name: ', str(fileName))
# initialize the circuit inside the GUI
with open(fileName, "r") as file:
    config = load(file)

class clsBuildCircuit:
    def __init__(self):
        self.config = config

    def genCir(self, main_function):
        try:
            config = self.config
            circuit = TkCircuit(config)
            circuit.run(main_function)

            return circuit
        except Exception as e:
            x = str(e)
            print(x)
            return ''

Key snippets from the above script –

config = self.config
circuit = TkCircuit(config)
circuit.run(main_function)

The above lines create an instance of the simulated IoT circuit & use the JSON file to start the GUI.
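For context, here is a minimal usage sketch that mirrors what playIOTDevice.py (discussed next) does: genCir is applied as a decorator so that TkCircuit wraps the device loop inside its simulated GUI.

import clsBuildCircuit as csb

# Create the instance of the Tk Circuit API class.
circuit = csb.clsBuildCircuit()

@circuit.genCir
def main():
    # The simulated device loop goes here (see playIOTDevice.py below).
    pass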

6. playIOTDevice.py (Main circuit GUI script that creates an IoT device to generate the events, which will then be consumed.)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 25-Sep-2021 ####
#### Modified On 25-Sep-2021 ####
#### ####
#### Objective: Main Tk Circuit GUI script ####
#### to create an IOT Device to generate ####
#### the events, which will consumed. ####
###############################################
# We keep the setup code in a different class as shown below.
import clsBuildCircuit as csb
import json
import clsPublishStream as cps
import datetime
from clsConfig import clsConfig as cf
import logging
###############################################
### Global Section ###
###############################################
# Initiating Ably class to push events
x1 = cps.clsPublishStream()
# Create the instance of the Tk Circuit API Class.
circuit = csb.clsBuildCircuit()
###############################################
### End of Global Section ###
###############################################
# Invoking the IOT Device Generator.
@circuit.genCir
def main():
    from gpiozero import PWMLED, Motor, Servo, MCP3008, Button
    from time import sleep

    # Circuit Components
    ledAlert = PWMLED(21)
    dcMotor = Motor(22, 23)
    servoMotor = Servo(24)
    ioMeter1 = MCP3008(0)
    ioMeter2 = MCP3008(2)
    ioMeter3 = MCP3008(6)
    switch = Button(15)
    # End of circuit components

    # Other useful variables
    cnt = 1
    idx = 0
    debugInd = 'Y'
    var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    # End of useful variables

    # Initiating Log Class
    general_log_path = str(cf.conf['LOG_PATH'])
    msgSize = int(cf.conf['limRec'])

    # Enabling Logging Info
    logging.basicConfig(filename=general_log_path + 'IOTDevice.log', level=logging.INFO)

    while True:
        ledAlert.value = ioMeter1.value

        if switch.is_pressed:
            dcMotor.forward(ioMeter2.value)
            xVal = 'Motor Forward'
        else:
            dcMotor.backward(ioMeter2.value)
            xVal = 'Motor Backward'

        servoMotor.value = 1 - 2 * ioMeter3.value

        srcJson = {
            "LedMeter": ledAlert.value,
            "DCMeter": ioMeter2.value,
            "ServoMeter": ioMeter3.value,
            "SwitchStatus": switch.is_pressed,
            "DCMotorPos": xVal,
            "ServoMotor": servoMotor.value
        }

        tmpJson = str(srcJson)

        if cnt == 1:
            srcJsonMast = '{' + '"' + str(idx) + '":' + tmpJson
        elif cnt == msgSize:
            srcJsonMast = srcJsonMast + '}'
            print('JSON: ')
            print(str(srcJsonMast))

            # Pushing the accumulated batch of IoT events
            retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)

            if retVal_1 == 0:
                print('Successfully IOT event pushed!')
            else:
                print('Failed to push IOT events!')

            srcJsonMast = ''
            tmpJson = ''
            cnt = 0
            idx = -1
            srcJson = {}
            retVal_1 = 0
        else:
            srcJsonMast = srcJsonMast + ',' + '"' + str(idx) + '":' + tmpJson

        cnt += 1
        idx += 1
        sleep(0.05)

Let’s explore the key snippets –

ledAlert = PWMLED(21)
dcMotor = Motor(22, 23)
servoMotor = Servo(24)

It defines the three circuit components: an LED, a DC motor & a servo motor.

Now, we can see the following critical snippet –

ledAlert.value = ioMeter1.value

if switch.is_pressed:
    dcMotor.forward(ioMeter2.value)
    xVal = 'Motor Forward'
else:
    dcMotor.backward(ioMeter2.value)
    xVal = 'Motor Backward'

servoMotor.value = 1 - 2 * ioMeter3.value

srcJson = {
"LedMeter": ledAlert.value,
"DCMeter": ioMeter2.value,
"ServoMeter": ioMeter3.value,
"SwitchStatus": switch.is_pressed,
"DCMotorPos": xVal,
"ServoMotor": servoMotor.value
}

The following lines dynamically generate the JSON payload that will be passed into the Ably queue –

tmpJson = str(srcJson)

if cnt == 1:
    srcJsonMast = '{' + '"' + str(idx) + '":'+ tmpJson
elif cnt == msgSize:
    srcJsonMast = srcJsonMast + '}'
    print('JSON: ')
    print(str(srcJsonMast))

Final line from the above script –

# Pushing the accumulated batch of IoT events
retVal_1 = x1.pushEvents(srcJsonMast, debugInd, var)

This code will now push the events into the Ably Queue.
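If you want to exercise the publisher on its own, here is a minimal usage sketch (with hypothetical values) that pushes one batched payload straight through clsPublishStream, outside the GUI loop:

import datetime
import clsPublishStream as cps

x1 = cps.clsPublishStream()
var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

# Hypothetical single-event batch shaped like the simulator's payload.
payload = '{"0":' + str({"LedMeter": 0.42, "DCMeter": 0.37, "ServoMeter": 0.52,
                         "SwitchStatus": True, "DCMotorPos": "Motor Forward",
                         "ServoMotor": -0.04}) + '}'

if x1.pushEvents(payload, 'Y', var) == 0:
    print('Successfully IOT event pushed!')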

7. app.py (Consumes the streaming IoT events from the Ably channels & publishes them in the dashboard through measured KPIs.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 02-Oct-2021 ####
#### ####
#### Objective: Consuming Streaming data ####
#### from Ably channels & captured IOT ####
#### events from the simulator & publish ####
#### them in Dashboard through measured ####
#### KPIs. ####
#### ####
##############################################
import os
import pathlib
import numpy as np
import datetime as dt
import dash
from dash import dcc
from dash import html
import datetime
import dash_daq as daq
from dash.exceptions import PreventUpdate
from dash.dependencies import Input, Output, State
from scipy.stats import rayleigh
# Consuming data from Ably Queue
from ably import AblyRest
# Main Class to consume streaming
import clsStreamConsume as ca
# Create the instance of the Covid API Class
x1 = ca.clsStreamConsume()
var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
print('*' *60)
DInd = 'Y'
GRAPH_INTERVAL = os.environ.get("GRAPH_INTERVAL", 5000)
app = dash.Dash(
__name__,
meta_tags=[{"name": "viewport", "content": "width=device-width, initial-scale=1"}],
)
app.title = "IOT Device Dashboard"
server = app.server
app_color = {"graph_bg": "#082255", "graph_line": "#007ACE"}
app.layout = html.Div(
[
# header
html.Div(
[
html.Div(
[
html.H4("IOT DEVICE STREAMING", className="app__header__title"),
html.P(
"This app continually consumes streaming data from IOT-Device and displays live charts of various metrics & KPI associated with it.",
className="app__header__title–grey",
),
],
className="app__header__desc",
),
html.Div(
[
html.A(
html.Button("SOURCE CODE", className="link-button"),
href="https://github.com/SatyakiDe2019/IOTStream&quot;,
),
html.A(
html.Button("VIEW DEMO", className="link-button"),
href="https://github.com/SatyakiDe2019/IOTStream/blob/main/demo.gif&quot;,
),
html.A(
html.Img(
src=app.get_asset_url("dash-new-logo.png"),
className="app__menu__img",
),
href="https://plotly.com/dash/&quot;,
),
],
className="app__header__logo",
),
],
className="app__header",
),
html.Div(
[
# Motor Speed
html.Div(
[
html.Div(
[html.H6("SERVO METER (IOT)", className="graph__title")]
),
dcc.Graph(
id="iot-measure",
figure=dict(
layout=dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
)
),
),
dcc.Interval(
id="iot-measure-update",
interval=int(GRAPH_INTERVAL),
n_intervals=0,
),
# Second Panel
html.Div(
[html.H6("DC-MOTOR (IOT)", className="graph__title")]
),
dcc.Graph(
id="iot-measure-1",
figure=dict(
layout=dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
)
),
),
dcc.Interval(
id="iot-measure-update-1",
interval=int(GRAPH_INTERVAL),
n_intervals=0,
)
],
className="two-thirds column motor__speed__container",
),
html.Div(
[
# histogram
html.Div(
[
html.Div(
[
html.H6(
"MOTOR POWER HISTOGRAM",
className="graph__title",
)
]
),
html.Div(
[
dcc.Slider(
id="bin-slider",
min=1,
max=60,
step=1,
value=20,
updatemode="drag",
marks={
20: {"label": "20"},
40: {"label": "40"},
60: {"label": "60"},
},
)
],
className="slider",
),
html.Div(
[
dcc.Checklist(
id="bin-auto",
options=[
{"label": "Auto", "value": "Auto"}
],
value=["Auto"],
inputClassName="auto__checkbox",
labelClassName="auto__label",
),
html.P(
"# of Bins: Auto",
id="bin-size",
className="auto__p",
),
],
className="auto__container",
),
dcc.Graph(
id="motor-histogram",
figure=dict(
layout=dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
)
),
),
],
className="graph__container first",
),
# motor direction
html.Div(
[
html.Div(
[
html.H6(
"SERVO MOTOR DIRECTION", className="graph__title"
)
]
),
dcc.Graph(
id="servo-motor-direction",
figure=dict(
layout=dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
)
),
),
],
className="graph__container second",
),
],
className="one-third column histogram__direction",
),
],
className="app__content",
),
],
className="app__container",
)
def toPositive(row, flag):
try:
if flag == 'ServoMeter':
x_val = abs(float(row['ServoMotor']))
elif flag == 'DCMotor':
x_val = abs(float(row['DCMotor'])) * 0.001
return x_val
except Exception as e:
x = str(e)
print(x)
val = 0
return val
def toPositiveInflated(row, flag):
try:
if flag == 'ServoMeter':
x_val = abs(float(row['ServoMeter'])) * 100
elif flag == 'DCMotor':
x_val = abs(float(row['DCMeter'])) * 100
return x_val
except Exception as e:
x = str(e)
print(x)
val = 0
return val
def getData(var, Ind):
try:
# Let's pass this to our map section
df = x1.conStream(var, Ind)
df['ServoMeterNew'] = df.apply(lambda row: toPositiveInflated(row, 'ServoMeter'), axis=1)
df['ServoMotorNew'] = df.apply(lambda row: toPositive(row, 'ServoMeter'), axis=1)
df['DCMotor'] = df.apply(lambda row: toPositiveInflated(row, 'DCMotor'), axis=1)
df['DCMeterNew'] = df.apply(lambda row: toPositive(row, 'DCMotor'), axis=1)
# Dropping old columns
df.drop(columns=['ServoMeter','ServoMotor','DCMeter'], axis=1, inplace=True)
#Rename New Columns to Old Columns
df.rename(columns={'ServoMeterNew':'ServoMeter'}, inplace=True)
df.rename(columns={'ServoMotorNew':'ServoMotor'}, inplace=True)
df.rename(columns={'DCMeterNew':'DCMeter'}, inplace=True)
return df
except Exception as e:
x = str(e)
print(x)
df = p.DataFrame()
return df
@app.callback(
Output("iot-measure-1", "figure"), [Input("iot-measure-update", "n_intervals")]
)
def gen_iot_speed(interval):
"""
Generate the DC Meter graph.
:params interval: update the graph based on an interval
"""
# Let's pass this to our map section
df = getData(var1, DInd)
trace = dict(
type="scatter",
y=df["DCMotor"],
line={"color": "#42C4F7"},
hoverinfo="skip",
error_y={
"type": "data",
"array": df["DCMeter"],
"thickness": 1.5,
"width": 2,
"color": "#B4E8FC",
},
mode="lines",
)
layout = dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
font={"color": "#fff"},
height=400,
xaxis={
"range": [0, 200],
"showline": True,
"zeroline": False,
"fixedrange": True,
"tickvals": [0, 50, 100, 150, 200],
"ticktext": ["200", "150", "100", "50", "0"],
"title": "Time Elapsed (sec)",
},
yaxis={
"range": [
min(0, min(df["DCMotor"])),
max(100, max(df["DCMotor"]) + max(df["DCMeter"])),
],
"showgrid": True,
"showline": True,
"fixedrange": True,
"zeroline": False,
"gridcolor": app_color["graph_line"],
"nticks": max(6, round(df["DCMotor"].iloc[-1] / 10)),
},
)
return dict(data=[trace], layout=layout)
@app.callback(
Output("iot-measure", "figure"), [Input("iot-measure-update", "n_intervals")]
)
def gen_iot_speed(interval):
"""
Generate the Motor Speed graph.
:params interval: update the graph based on an interval
"""
# Let's pass this to our map section
df = getData(var1, DInd)
trace = dict(
type="scatter",
y=df["ServoMeter"],
line={"color": "#42C4F7"},
hoverinfo="skip",
error_y={
"type": "data",
"array": df["ServoMotor"],
"thickness": 1.5,
"width": 2,
"color": "#B4E8FC",
},
mode="lines",
)
layout = dict(
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
font={"color": "#fff"},
height=400,
xaxis={
"range": [0, 200],
"showline": True,
"zeroline": False,
"fixedrange": True,
"tickvals": [0, 50, 100, 150, 200],
"ticktext": ["200", "150", "100", "50", "0"],
"title": "Time Elapsed (sec)",
},
yaxis={
"range": [
min(0, min(df["ServoMeter"])),
max(100, max(df["ServoMeter"]) + max(df["ServoMotor"])),
],
"showgrid": True,
"showline": True,
"fixedrange": True,
"zeroline": False,
"gridcolor": app_color["graph_line"],
"nticks": max(6, round(df["ServoMeter"].iloc[-1] / 10)),
},
)
return dict(data=[trace], layout=layout)
@app.callback(
Output("servo-motor-direction", "figure"), [Input("iot-measure-update", "n_intervals")]
)
def gen_motor_direction(interval):
"""
Generate the Servo direction graph.
:params interval: update the graph based on an interval
"""
df = getData(var1, DInd)
val = df["ServoMeter"].iloc[-1]
direction = [0, (df["ServoMeter"][0]*100 - 20), (df["ServoMeter"][0]*100 + 20), 0]
traces_scatterpolar = [
{"r": [0, val, val, 0], "fillcolor": "#084E8A"},
{"r": [0, val * 0.65, val * 0.65, 0], "fillcolor": "#B4E1FA"},
{"r": [0, val * 0.3, val * 0.3, 0], "fillcolor": "#EBF5FA"},
]
data = [
dict(
type="scatterpolar",
r=traces["r"],
theta=direction,
mode="lines",
fill="toself",
fillcolor=traces["fillcolor"],
line={"color": "rgba(32, 32, 32, .6)", "width": 1},
)
for traces in traces_scatterpolar
]
layout = dict(
height=350,
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
font={"color": "#fff"},
autosize=False,
polar={
"bgcolor": app_color["graph_line"],
"radialaxis": {"range": [0, 45], "angle": 45, "dtick": 10},
"angularaxis": {"showline": False, "tickcolor": "white"},
},
showlegend=False,
)
return dict(data=data, layout=layout)
@app.callback(
Output("motor-histogram", "figure"),
[Input("iot-measure-update", "n_intervals")],
[
State("iot-measure", "figure"),
State("bin-slider", "value"),
State("bin-auto", "value"),
],
)
def gen_motor_histogram(interval, iot_speed_figure, slider_value, auto_state):
"""
Generate the IoT histogram graph.
:params interval: update the graph based on an interval
:params iot_speed_figure: current Motor Speed graph
:params slider_value: current slider value
:params auto_state: current auto state
"""
motor_val = []
try:
print('Inside gen_motor_histogram:')
print('iot_speed_figure::')
print(iot_speed_figure)
# Check to see whether iot-measure has been plotted yet
if iot_speed_figure is not None:
motor_val = iot_speed_figure["data"][0]["y"]
if "Auto" in auto_state:
bin_val = np.histogram(
motor_val,
bins=range(int(round(min(motor_val))), int(round(max(motor_val)))),
)
else:
bin_val = np.histogram(motor_val, bins=slider_value)
except Exception as error:
raise PreventUpdate
avg_val = float(sum(motor_val)) / len(motor_val)
median_val = np.median(motor_val)
pdf_fitted = rayleigh.pdf(
bin_val[1], loc=(avg_val) * 0.55, scale=(bin_val[1][-1] - bin_val[1][0]) / 3
)
y_val = (pdf_fitted * max(bin_val[0]) * 20,)
y_val_max = max(y_val[0])
bin_val_max = max(bin_val[0])
trace = dict(
type="bar",
x=bin_val[1],
y=bin_val[0],
marker={"color": app_color["graph_line"]},
showlegend=False,
hoverinfo="x+y",
)
traces_scatter = [
{"line_dash": "dash", "line_color": "#2E5266", "name": "Average"},
{"line_dash": "dot", "line_color": "#BD9391", "name": "Median"},
]
scatter_data = [
dict(
type="scatter",
x=[bin_val[int(len(bin_val) / 2)]],
y=[0],
mode="lines",
line={"dash": traces["line_dash"], "color": traces["line_color"]},
marker={"opacity": 0},
visible=True,
name=traces["name"],
)
for traces in traces_scatter
]
trace3 = dict(
type="scatter",
mode="lines",
line={"color": "#42C4F7"},
y=y_val[0],
x=bin_val[1][: len(bin_val[1])],
name="Rayleigh Fit",
)
layout = dict(
height=350,
plot_bgcolor=app_color["graph_bg"],
paper_bgcolor=app_color["graph_bg"],
font={"color": "#fff"},
xaxis={
"title": "Motor Power",
"showgrid": False,
"showline": False,
"fixedrange": True,
},
yaxis={
"showgrid": False,
"showline": False,
"zeroline": False,
"title": "Number of Samples",
"fixedrange": True,
},
autosize=True,
bargap=0.01,
bargroupgap=0,
hovermode="closest",
legend={
"orientation": "h",
"yanchor": "bottom",
"xanchor": "center",
"y": 1,
"x": 0.5,
},
shapes=[
{
"xref": "x",
"yref": "y",
"y1": int(max(bin_val_max, y_val_max)) + 0.5,
"y0": 0,
"x0": avg_val,
"x1": avg_val,
"type": "line",
"line": {"dash": "dash", "color": "#2E5266", "width": 5},
},
{
"xref": "x",
"yref": "y",
"y1": int(max(bin_val_max, y_val_max)) + 0.5,
"y0": 0,
"x0": median_val,
"x1": median_val,
"type": "line",
"line": {"dash": "dot", "color": "#BD9391", "width": 5},
},
],
)
return dict(data=[trace, scatter_data[0], scatter_data[1], trace3], layout=layout)
@app.callback(
Output("bin-auto", "value"),
[Input("bin-slider", "value")],
[State("iot-measure", "figure")],
)
def deselect_auto(slider_value, iot_speed_figure):
""" Toggle the auto checkbox. """
# prevent update if the graph has not been rendered or has no data yet
if iot_speed_figure is None or "data" not in iot_speed_figure:
raise PreventUpdate
if not len(iot_speed_figure["data"]):
raise PreventUpdate
if len(iot_speed_figure["data"][0]["y"]) > 5:
return [""]
return ["Auto"]
@app.callback(
Output("bin-size", "children"),
[Input("bin-auto", "value")],
[State("bin-slider", "value")],
)
def show_num_bins(autoValue, slider_value):
""" Display the number of bins. """
if "Auto" in autoValue:
return "# of Bins: Auto"
return "# of Bins: " + str(int(slider_value))
if __name__ == "__main__":
app.run_server(debug=True)

app.py

Here are the key snippets –

html.Div(
        [
            html.Div(
                [html.H6("SERVO METER (IOT)", className="graph__title")]
            ),
            dcc.Graph(
                id="iot-measure",
                figure=dict(
                    layout=dict(
                        plot_bgcolor=app_color["graph_bg"],
                        paper_bgcolor=app_color["graph_bg"],
                    )
                ),
            ),
            dcc.Interval(
                id="iot-measure-update",
                interval=int(GRAPH_INTERVAL),
                n_intervals=0,
            ),
            # Second Panel
            html.Div(
                [html.H6("DC-MOTOR (IOT)", className="graph__title")]
            ),
            dcc.Graph(
                id="iot-measure-1",
                figure=dict(
                    layout=dict(
                        plot_bgcolor=app_color["graph_bg"],
                        paper_bgcolor=app_color["graph_bg"],
                    )
                ),
            ),
            dcc.Interval(
                id="iot-measure-update-1",
                interval=int(GRAPH_INTERVAL),
                n_intervals=0,
            )
        ],
        className="two-thirds column motor__speed__container",

The above snippet creates two panels, where the application consumes the streaming data through the app’s callback feature & refreshes the data & graphs whenever new streaming data arrives.

A similar approach was adopted for other vital aspects/components inside the dashboard.
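
To see the refresh mechanism in isolation, here is a minimal, self-contained sketch of the same dcc.Interval-driven callback pattern. The component ids & the randomly generated data are purely illustrative (they are not part of the actual app), & the imports assume a Dash 2.x style layout –

# A minimal sketch of the dcc.Interval + callback refresh pattern.
# Component ids & the random data below are illustrative only, not part of the actual app.
import random

from dash import Dash, dcc, html
from dash.dependencies import Input, Output

app = Dash(__name__)

app.layout = html.Div(
    [
        dcc.Graph(id="live-graph"),
        # Fires every 2 seconds & triggers the callback below
        dcc.Interval(id="refresh", interval=2000, n_intervals=0),
    ]
)


@app.callback(Output("live-graph", "figure"), [Input("refresh", "n_intervals")])
def refresh_graph(n_intervals):
    # In the real app, this is where the latest streaming batch would be fetched
    y = [random.random() * 100 for _ in range(50)]
    return dict(data=[dict(type="scatter", y=y, mode="lines")], layout=dict(height=300))


if __name__ == "__main__":
    app.run_server(debug=True)

Every time the interval fires, Dash re-invokes the decorated function & swaps the returned figure into the graph, which is exactly how the two IoT panels stay live.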

def getData(var, Ind):
    try:
        # Let's pass this to our map section
        df = x1.conStream(var, Ind)

        df['ServoMeterNew'] = df.apply(lambda row: toPositiveInflated(row, 'ServoMeter'), axis=1)
        df['ServoMotorNew'] = df.apply(lambda row: toPositive(row, 'ServoMeter'), axis=1)
        df['DCMotor'] = df.apply(lambda row: toPositiveInflated(row, 'DCMotor'), axis=1)
        df['DCMeterNew'] = df.apply(lambda row: toPositive(row, 'DCMotor'), axis=1)

        # Dropping old columns
        df.drop(columns=['ServoMeter','ServoMotor','DCMeter'], axis=1, inplace=True)

        #Rename New Columns to Old Columns
        df.rename(columns={'ServoMeterNew':'ServoMeter'}, inplace=True)
        df.rename(columns={'ServoMotorNew':'ServoMotor'}, inplace=True)
        df.rename(columns={'DCMeterNew':'DCMeter'}, inplace=True)

        return df
    except Exception as e:
        x = str(e)
        print(x)

        df = p.DataFrame()

        return df

The getData function extracts the streaming data that the application consumes from the Ably queue (via the conStream call), converts the readings into positive values & renames the derived columns back to their original names.
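
The conStream call itself lives in a separate streaming class that is not shown here. For intuition only, here is a rough sketch of how such a consumer could read the recent messages from an Ably channel & shape them into the DataFrame that getData expects. The key, channel & column names are assumptions, & newer ably-python releases expose an async client, whereas this sketch assumes the older synchronous REST API –

# Rough sketch of an Ably consumer feeding the dashboard (assumed names).
# Newer ably-python releases expose an async client; this assumes the older sync REST API.
import pandas as p
from ably import AblyRest


def conStream(ably_key, channel_name):
    try:
        client = AblyRest(ably_key)
        channel = client.channels.get(channel_name)

        # Pull the most recently published sensor messages from the channel history;
        # each payload is assumed to carry the raw readings as a dict
        rows = [message.data for message in channel.history().items]

        # Shape them into the DataFrame consumed by getData()
        return p.DataFrame(rows, columns=['ServoMeter', 'ServoMotor', 'DCMeter'])
    except Exception as e:
        print(str(e))
        return p.DataFrame()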

@app.callback(
    Output("iot-measure", "figure"), [Input("iot-measure-update", "n_intervals")]
)
def gen_iot_speed(interval):
    """
    Generate the Motor Speed graph.

    :params interval: update the graph based on an interval
    """

    # Let's pass this to our map section
    df = getData(var1, DInd)

    trace = dict(
        type="scatter",
        y=df["ServoMeter"],
        line={"color": "#42C4F7"},
        hoverinfo="skip",
        error_y={
            "type": "data",
            "array": df["ServoMotor"],
            "thickness": 1.5,
            "width": 2,
            "color": "#B4E8FC",
        },
        mode="lines",
    )

    layout = dict(
        plot_bgcolor=app_color["graph_bg"],
        paper_bgcolor=app_color["graph_bg"],
        font={"color": "#fff"},
        height=400,
        xaxis={
            "range": [0, 200],
            "showline": True,
            "zeroline": False,
            "fixedrange": True,
            "tickvals": [0, 50, 100, 150, 200],
            "ticktext": ["200", "150", "100", "50", "0"],
            "title": "Time Elapsed (sec)",
        },
        yaxis={
            "range": [
                min(0, min(df["ServoMeter"])),
                max(100, max(df["ServoMeter"]) + max(df["ServoMotor"])),
            ],
            "showgrid": True,
            "showline": True,
            "fixedrange": True,
            "zeroline": False,
            "gridcolor": app_color["graph_line"],
            "nticks": max(6, round(df["ServoMeter"].iloc[-1] / 10)),
        },
    )

    return dict(data=[trace], layout=layout)

The above callback captures all the relevant columns & transforms them into a line graph, where the application feeds data into both axes (x-axis & y-axis) along with the error bands.
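
The same trace could also be written with plotly.graph_objects, which some readers may find more readable than raw dictionaries. This is only a sketch of that alternative & assumes a DataFrame that already contains the ServoMeter & ServoMotor columns –

# Equivalent trace built with plotly.graph_objects instead of raw dicts (sketch only)
import plotly.graph_objects as go


def build_servo_trace(df):
    # Line trace of the servo readings, with the motor readings as an error band
    return go.Scatter(
        y=df["ServoMeter"],
        mode="lines",
        line={"color": "#42C4F7"},
        hoverinfo="skip",
        error_y={
            "type": "data",
            "array": df["ServoMotor"],
            "thickness": 1.5,
            "width": 2,
            "color": "#B4E8FC",
        },
    )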

There are many other useful snippets that create the remaining widgets inside the dashboard, such as the motor-power histogram, the servo direction chart & the bin-size controls. The histogram’s Auto-binning decision, for instance, is shown in the standalone fragment below.
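
The fragment mirrors the logic inside gen_motor_histogram; the sample values, checklist state & slider value are illustrative only –

# Standalone illustration of the histogram's Auto-binning logic (values are illustrative)
import numpy as np

motor_val = [12.4, 15.2, 18.9, 22.7, 25.1, 28.6, 31.3]

auto_state = ["Auto"]   # mirrors the dcc.Checklist value
slider_value = 20       # mirrors the dcc.Slider value

if "Auto" in auto_state:
    # One bin edge per unit between the rounded min & max of the observed values
    counts, edges = np.histogram(
        motor_val,
        bins=range(int(round(min(motor_val))), int(round(max(motor_val)))),
    )
else:
    # Otherwise, honour the number of bins chosen on the slider
    counts, edges = np.histogram(motor_val, bins=slider_value)

print(counts, edges)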


Run:

Let us run the application –

Dashboard-View

So, we’ve done it.

You will find the complete codebase in the following GitHub link.

There is an excellent resource from the Dash framework that you should explore. The following link will be handy for developers who want meaningful pre-built dashboard templates, which can be customized to your needs through Python or R. Please find the link here.


I’ll bring some more exciting topics from the Python verse in the coming days.

Till then, Happy Avenging! 😀


Note: All the data & scenarios posted here are representational, available over the internet & meant for educational purposes only.


There is always room for improvement in this kind of solution. I’ve shown the basic way to achieve it for educational purposes only.