Today, I'll bring an exciting Streamlit app that renders a real-time dashboard by consuming all the events from the Ably channel.
Once again, I'll be utilizing my IoT emulator, which will feed real-time events, based on the user inputs, to the Ably channel, which will then be subscribed to by the Streamlit-based app.
However, I would like to share a demo run before we dig deep into this.
Isn't this exciting? We can use our custom-built IoT emulator to capture real-time events into the Ably queue & then transform those raw events into more meaningful KPIs. Let's deep dive then.
Architecture:
Let’s explore the broad-level architecture/flow –
As you can see, the green box is a demo IoT application that generates events & pushes them into the Ably Queue. At the same time, the streamlit-based Dashboard app consumes the events & transforms them into more meaningful metrics.
Package Installation:
Let us understand the sample packages that are required for this task.
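The original package list isn't reproduced here; judging by the imports used later in the scripts, a likely minimal set of installs would be the following (exact names & versions are assumptions):

pip install streamlit
pip install streamlit-autorefresh
pip install plotly
pip install pandas
pip install ably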
Since this is an extension to our previous post, we’re not going to discuss other scripts, which we’ve already discussed over there. Instead, we will talk about the enhanced scripts & the new scripts that are required for this use case.
1. app.py (This script will consume real-time streaming data coming out of a hosted API source using another popular third-party service named Ably. Ably provides the pub-sub streaming concept, which might be extremely useful for any start-up. This data will then be translated into many meaningful KPIs in a Streamlit-based dashboard app.)
Note that we're not going to discuss the entire script here, only the parts that are relevant. However, you can get the complete scripts from the GitHub repository.
The createHumidityGauge function creates a customized humidity gauge that visually represents a given humidity value, making it easy to read and understand at a glance.
This code defines a function, createHumidityGauge, that creates a visual gauge (like a meter) to display a humidity value; a sketch of it appears at the end of this walkthrough. Here's a simple breakdown of what it does:
Function Definition: It starts by defining a function named createHumidityGauge that takes one parameter, humidity_value, which is the humidity level you want to display on the gauge.
Creating the Gauge: Inside the function, it creates a figure using Plotly (a plotting library) with a specific type of chart called an Indicator. This Indicator is set to display in “gauge+number” mode, meaning it shows both a gauge visual and the numeric value of the humidity.
Setting Gauge Properties:
The value is set to the humidity_value parameter, so the gauge shows this humidity level.
The domain sets the position of the gauge on the plot, which is set to fill the available space ([0, 1] for both x and y axes).
The title is set to “Humidity” with a font size of 24, labeling the gauge.
The gauge section defines the appearance and behavior of the gauge, including:
An axis that goes from 0 to 100 (assuming humidity is measured as a percentage from 0% to 100%).
The color and style of the gauge’s bar and background.
Colored steps indicating different ranges of humidity (cyan for 0-50% and royal blue for 50-100%).
A threshold line that appears at the value of the humidity, marked in red to stand out.
Finalizing the Gauge Appearance: The function then updates the layout of the figure to set its height, background color, font style, and margins to make sure the gauge looks nice and is visible.
Returning the Figure: Finally, the function returns the fig object, which is the fully configured gauge, ready to be displayed.
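Since the gist for this function isn't reproduced above, here is a minimal sketch based on the walkthrough, assuming Plotly's go.Indicator API (the colors, ranges & threshold follow the description; the exact height & margins are assumptions):

import plotly.graph_objects as go

def createHumidityGauge(humidity_value):
    # Build a gauge + number indicator for the supplied humidity reading
    fig = go.Figure(go.Indicator(
        mode="gauge+number",
        value=humidity_value,
        domain={'x': [0, 1], 'y': [0, 1]},
        title={'text': "Humidity", 'font': {'size': 24}},
        gauge={
            'axis': {'range': [0, 100]},
            'bar': {'color': "darkblue"},
            'steps': [
                {'range': [0, 50], 'color': "cyan"},
                {'range': [50, 100], 'color': "royalblue"}
            ],
            'threshold': {
                'line': {'color': "red", 'width': 4},
                'thickness': 0.75,
                'value': humidity_value
            }
        }
    ))

    # Keep the gauge compact & readable inside the Streamlit column
    fig.update_layout(height=270, margin=dict(l=20, r=20, t=50, b=20))

    return fig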
Other similar functions will repeat the same steps.
def createTemperatureLineChart(data):
    # Assuming 'data' is a DataFrame with a 'Timestamp' index and a 'Temperature' column
    fig = px.line(data, x=data.index, y='Temperature', title='Temperature Vs Time')
    fig.update_layout(height=270)  # Specify the desired height here

    return fig
The above function takes a set of temperature data indexed by timestamp and creates a line chart that visually represents how the temperature changes over time.
This code defines a function “createTemperatureLineChart” that creates a line chart to display temperature data over time. Here’s a simple summary of what it does:
Function Definition: It starts with defining a function named “createTemperatureLineChart“ that takes one parameter, data, which is expected to be a DataFrame (a type of data structure used in pandas, a Python data analysis library). This data frame should have a ‘Timestamp’ as its index (meaning each row represents a different point in time) and a ‘Temperature’ column containing temperature values.
Creating the Line Chart: The function uses Plotly Express (a plotting library) to create a line chart with the following characteristics:
The x-axis represents time, taken from the DataFrame’s index (‘Timestamp’).
The y-axis represents temperature, taken from the ‘Temperature’ column in the DataFrame.
The chart is titled ‘Temperature Vs Time’, clearly indicating what the chart represents.
Customizing the Chart: It then updates the layout of the chart to set a specific height (270 pixels) for the chart, making it easier to view.
Returning the Chart: Finally, the function returns the fig object, which is the fully prepared line chart, ready to be displayed.
The app also creates a sidebar with a drop-down selector, which shows the KPIs ("Temperature", "Humidity", "Pressure").
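That sidebar code isn't shown above; a minimal sketch of what it might look like in Streamlit (the widget choice & labels are assumptions) is:

import streamlit as st

# Sidebar with a drop-down style selector for the KPIs to display
st.sidebar.title("Dashboard Controls")
selected_kpis = st.sidebar.multiselect(
    "Select KPIs",
    options=["Temperature", "Humidity", "Pressure"],
    default=["Temperature", "Humidity", "Pressure"]
)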
# Split the layout into columns for KPIs and graphs
gauge_col, kpi_col, graph_col = st.columns(3)

# Auto-refresh setup
st_autorefresh(interval=7000, key='data_refresh')

# Fetching real-time data
data = getData(var1, DInd)

st.markdown("""
    <style>
    .stEcharts { margin-bottom: -50px; }
    /* Class might differ, inspect the HTML to find the correct class name */
    </style>
    """, unsafe_allow_html=True)

# Display gauges at the top of the page
gauges = st.container()

with gauges:
    col1, col2, col3 = st.columns(3)

    with col1:
        humidity_value = round(data['Humidity'].iloc[-1], 2)
        humidity_gauge_fig = createHumidityGauge(humidity_value)
        st.plotly_chart(humidity_gauge_fig, use_container_width=True)

    with col2:
        temp_value = round(data['Temperature'].iloc[-1], 2)
        temp_gauge_fig = createTempGauge(temp_value)
        st.plotly_chart(temp_gauge_fig, use_container_width=True)

    with col3:
        pressure_value = round(data['Pressure'].iloc[-1], 2)
        pressure_gauge_fig = createPressureGauge(pressure_value)
        st.plotly_chart(pressure_gauge_fig, use_container_width=True)

# Next row for actual readings and charts side-by-side
readings_charts = st.container()

# Display KPIs and their trends
with readings_charts:
    readings_col, graph_col = st.columns([1, 2])

    with readings_col:
        st.subheader("Latest Readings")

        if "Temperature" in selected_kpis:
            st.metric("Temperature", f"{temp_value:.2f}%")
        if "Humidity" in selected_kpis:
            st.metric("Humidity", f"{humidity_value:.2f}%")
        if "Pressure" in selected_kpis:
            st.metric("Pressure", f"{pressure_value:.2f}%")

    # Graph placeholders for each KPI
    with graph_col:
        if "Temperature" in selected_kpis:
            temperature_fig = createTemperatureLineChart(data.set_index("Timestamp"))
            # Display the Plotly chart in Streamlit with specified dimensions
            st.plotly_chart(temperature_fig, use_container_width=True)

        if "Humidity" in selected_kpis:
            humidity_fig = createHumidityLineChart(data.set_index("Timestamp"))
            # Display the Plotly chart in Streamlit with specified dimensions
            st.plotly_chart(humidity_fig, use_container_width=True)

        if "Pressure" in selected_kpis:
            pressure_fig = createPressureLineChart(data.set_index("Timestamp"))
            # Display the Plotly chart in Streamlit with specified dimensions
            st.plotly_chart(pressure_fig, use_container_width=True)
The code begins by splitting the Streamlit web page layout into three columns to separately display Key Performance Indicators (KPIs), gauges, and graphs.
It sets up an auto-refresh feature with a 7-second interval, ensuring the data displayed is regularly updated without manual refreshes.
Real-time data is fetched using a function called getData, which takes unspecified parameters var1 and DInd.
A CSS style is injected into the Streamlit page to adjust the margin of Echarts elements, which may be used to improve the visual layout of the page.
A container for gauges is created at the top of the page, with three columns inside it dedicated to displaying humidity, temperature, and pressure gauges.
Each gauge (humidity, temperature, and pressure) is created by rounding the last value from the fetched data to two decimal places and then visualized using respective functions that create Plotly gauge charts.
Below the gauges, another container is set up for displaying the latest readings and their corresponding graphs in a side-by-side layout, using two columns.
The left column under “Latest Readings” displays the latest values for selected KPIs (temperature, humidity, pressure) as metrics.
In the right column, for each selected KPI, a line chart is created using data with timestamps as indices and displayed using Plotly charts, allowing for a visual trend analysis.
This structured approach enables a dynamic and interactive dashboard within Streamlit, offering real-time insights into temperature, humidity, and pressure with both numeric metrics and graphical trends, optimized for regular data refreshes and user interactivity.
Run:
Let us understand some of the important screenshots of this application –
So, we’ve done it.
I’ll bring some more exciting topics in the coming days from the Python verse.
Till then, Happy Avenging! 🙂
Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only.
There is always room for improvement in this kind of model & the solution associated with it. I’ve shown the basic ways to achieve the same for educational purposes only.
Today, I will share a new post in a multi-part series about creating end-to-end LLM applications that feed on source data with a RAG implementation. I'll also use the OpenAI Python-based SDK and Haystack embeddings in this case.
In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well.
Before I explain the process to invoke this new library, why not view the demo first & then discuss it?
FLOW OF EVENTS:
Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.
As you can see, to enable this large & complex solution, we must first establish the capabilities to build applications powered by LLMs, Transformer models, vector search, and more. You can use state-of-the-art NLP models to perform question-answering, answer generation, semantic document search, or build tools capable of complex decision-making and query resolution. Hence, steps 1 & 2 showcase the data embedding & the creation of that informed repository. We'll be discussing that in our second part.
Once you have the informed repository, the system can interact with the end-users. As part of the query (shown in step 3), the prompt & the question are shared with the process engine, which then reduces the volume, fetches the relevant context from our informed repository, & returns the tuned context as part of the response (shown in steps 4, 5 & 6).
Then, this tuned context is shared with OpenAI for a better response, summary & concluding remarks that are user-friendly & easier for end-users to understand (shown in steps 8 & 9).
IMPORTANT PACKAGES:
The following are the important packages that are essential to this project –
Let us understand some of the important sections of the above script –
Function – login():
The login function retrieves a ‘username’ and ‘password’ from a JSON request and prints them. It checks if the provided credentials are missing from users or password lists, returning a failure JSON response if so. It creates and returns an access token in a JSON response if valid.
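The API script isn't reproduced in full here; a minimal sketch of such a login endpoint, assuming Flask with flask-jwt-extended (the credential lists, secret key & exact responses are assumptions), could look like this:

from flask import Flask, request, jsonify
from flask_jwt_extended import JWTManager, create_access_token

app = Flask(__name__)
app.config["JWT_SECRET_KEY"] = "change-me"      # Hypothetical secret key
jwt = JWTManager(app)

users = ["satyaki"]          # Hypothetical credential lists
passwords = ["secret"]

@app.route('/login', methods=['POST'])
def login():
    # Pull the credentials out of the JSON payload
    username = request.json.get('username', None)
    password = request.json.get('password', None)
    print(username, password)

    # Reject the request if the credentials are unknown
    if username not in users or password not in passwords:
        return jsonify({"status": "failure", "msg": "Bad username or password"}), 401

    # Otherwise, issue & return an access token
    access_token = create_access_token(identity=username)
    return jsonify(access_token=access_token)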
Function – get_chat():
The get_chat function retrieves the running session count and user input from a JSON request. Based on the session count, it either extracts the catalog data or processes the user's message through the RAG framework, which finally receives the refined response from OpenAI, extracting hash values, image URLs, and wiki URLs. If an error arises, the function captures and returns the error as a JSON message.
Function – updateCounter():
The updateCounter function checks if a given CSV file exists and retrieves its counter value. It then increments the counter and writes it back to the CSV. If any errors occur, an error message is printed, and the function returns a value of 1.
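For illustration, here is a minimal sketch of what such a counter function might look like (the CSV layout & column name are assumptions):

import os
import pandas as pd

def updateCounter(csvFile):
    try:
        counter = 0

        # Read the existing counter value if the CSV already exists
        if os.path.exists(csvFile):
            df = pd.read_csv(csvFile)
            counter = int(df['counter'].iloc[0])

        # Increment & persist the counter back to the CSV
        counter += 1
        pd.DataFrame({'counter': [counter]}).to_csv(csvFile, index=False)

        return counter
    except Exception as e:
        print('Error: ', str(e))
        return 1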
Function – extractRemoveUrls():
The extractRemoveUrls function attempts to filter a data frame, resDf, based on a provided hash value to extract image and wiki URLs. If the data frame contains matching entries, it retrieves the corresponding URLs. Any errors encountered are printed, but the function always returns the image and wiki URLs, even if they are empty.
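Again, only as a hedged sketch (resDf and the column names are assumptions based on the description above & the React code later in the post):

# resDf is assumed to be a pandas DataFrame loaded elsewhere with 'hash', 'Image' & 'Wiki' columns
def extractRemoveUrls(hashValue):
    imageUrl = ''
    wikiUrl = ''
    try:
        # Filter the pre-loaded dataframe by the supplied hash value
        filteredDf = resDf[resDf['hash'] == str(hashValue)]

        if not filteredDf.empty:
            imageUrl = filteredDf['Image'].iloc[0]
            wikiUrl = filteredDf['Wiki'].iloc[0]

        return imageUrl, wikiUrl
    except Exception as e:
        print('Error: ', str(e))
        return imageUrl, wikiUrl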
clsContentScrapper.py (This is the main class that brings the default options for the users if they agree with the initial prompt by the bot.)
Let us understand the core part that we require from this class.
Function – extractCatalog():
The extractCatalog function uses specific headers to make a GET request to a constructed URL. The URL is derived by appending ‘/departments’ to a base_url, and a header token is used in the request headers. If successful, it returns the text of the response; if there’s an exception, it prints the error and returns the error message.
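A minimal sketch of that call, assuming the requests library and a bearer-style header token (the parameter names & header scheme are assumptions), might look like this:

import requests

def extractCatalog(base_url, header_token):
    try:
        # Build the department-catalog URL from the configured base URL
        url = base_url + '/departments'
        headers = {'Authorization': 'Bearer ' + header_token}   # Header scheme is an assumption

        response = requests.get(url, headers=headers)
        return response.text
    except Exception as e:
        x = str(e)
        print('Error: ', x)
        return x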
clsRAGOpenAI.py (This is the main class that brings the RAG-enabled context that is fed to OpenAI for fine-tuned response with less cost.)
#####################################################
#### Written By: SATYAKI DE                      ####
#### Written On: 27-Jun-2023                     ####
#### Modified On 28-Jun-2023                     ####
####                                             ####
#### Objective: This is the main calling         ####
#### python script that will invoke the          ####
#### shortcut application created inside MAC     ####
#### enviornment including MacBook, IPad or      ####
#### IPhone.                                     ####
#####################################################

from haystack.document_stores.faiss import FAISSDocumentStore
from haystack.nodes import DensePassageRetriever
import openai

from clsConfigClient import clsConfigClient as cf
import clsL as log

# Disabling Warning
def warn(*args, **kwargs):
    pass

import warnings
warnings.warn = warn

import os
import re

###############################################
###           Global Section                ###
###############################################
Ind = cf.conf['DEBUG_IND']
queryModel = cf.conf['QUERY_MODEL']
passageModel = cf.conf['PASSAGE_MODEL']

# Initiating Logging Instances
clog = log.clsL()

os.environ["TOKENIZERS_PARALLELISM"] = "false"

vectorDBFileName = cf.conf['VECTORDB_FILE_NM']

indexFile = "vectorDB/" + str(vectorDBFileName) + '.faiss'
indexConfig = "vectorDB/" + str(vectorDBFileName) + ".json"

print('File: ', str(indexFile))
print('Config: ', str(indexConfig))

# Also, provide `config_path` parameter if you set it when calling the `save()` method:
new_document_store = FAISSDocumentStore.load(index_path=indexFile, config_path=indexConfig)

# Initialize Retriever
retriever = DensePassageRetriever(
    document_store=new_document_store,
    query_embedding_model=queryModel,
    passage_embedding_model=passageModel,
    use_gpu=False
)

###############################################
###       End of Global Section             ###
###############################################

class clsRAGOpenAI:
    def __init__(self):
        self.basePath = cf.conf['DATA_PATH']
        self.fileName = cf.conf['FILE_NAME']
        self.Ind = cf.conf['DEBUG_IND']
        self.subdir = str(cf.conf['OUT_DIR'])
        self.base_url = cf.conf['BASE_URL']
        self.outputPath = cf.conf['OUTPUT_PATH']
        self.vectorDBPath = cf.conf['VECTORDB_PATH']
        self.openAIKey = cf.conf['OPEN_AI_KEY']
        self.temp = cf.conf['TEMP_VAL']
        self.modelName = cf.conf['MODEL_NAME']
        self.maxToken = cf.conf['MAX_TOKEN']

    def extractHash(self, text):
        try:
            # Regular expression pattern to match 'Ref: {' followed by a number and then '}'
            pattern = r"Ref: \{'(\d+)'\}"
            match = re.search(pattern, text)

            if match:
                return match.group(1)
            else:
                return None
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return None

    def removeSentencesWithNaN(self, text):
        try:
            # Split text into sentences using regular expression
            sentences = re.split('(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text)
            # Filter out sentences containing 'nan'
            filteredSentences = [sentence for sentence in sentences if 'nan' not in sentence]
            # Rejoin the sentences
            return ''.join(filteredSentences)
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return ''

    def retrieveDocumentsReader(self, question, top_k=9):
        return retriever.retrieve(question, top_k=top_k)

    def generateAnswerWithGPT3(self, retrieved_docs, question):
        try:
            openai.api_key = self.openAIKey
            temp = self.temp
            modelName = self.modelName
            maxToken = self.maxToken

            documentsText = "".join([doc.content for doc in retrieved_docs])
            filteredDocs = self.removeSentencesWithNaN(documentsText)
            hashValue = self.extractHash(filteredDocs)

            print('RAG Docs:: ')
            print(filteredDocs)
            #prompt = f"Given the following documents: {documentsText}, answer the question accurately based on the above data with the supplied http urls: {question}"

            # Set up a chat-style prompt with your data
            messages = [
                {"role": "system", "content": "You are a helpful assistant, answer the question accurately based on the above data with the supplied http urls. Only relevant content needs to publish. Please do not provide the facts or the texts that results crossing the max_token limits."},
                {"role": "user", "content": filteredDocs}
            ]

            # Chat style invoking the latest model
            response = openai.ChatCompletion.create(
                model=modelName,
                messages=messages,
                temperature=temp,
                max_tokens=maxToken
            )

            return hashValue, response.choices[0].message['content'].strip().replace('\n', '\\n')
        except Exception as e:
            x = str(e)
            print('failed to get from OpenAI: ', x)

            return 'Not Available!'

    def ragAnswerWithHaystackAndGPT3(self, question):
        retrievedDocs = self.retrieveDocumentsReader(question)
        return self.generateAnswerWithGPT3(retrievedDocs, question)

    def getData(self, strVal):
        try:
            print('*' * 120)
            print('Index Your Data for Retrieval:')
            print('*' * 120)

            print('Response from New Docs: ')
            print()

            hashValue, answer = self.ragAnswerWithHaystackAndGPT3(strVal)

            print('GPT3 Answer::')
            print(answer)
            print('Hash Value:')
            print(str(hashValue))

            print('*' * 240)
            print('End Of Use RAG to Generate Answers:')
            print('*' * 240)

            return hashValue, answer
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            answer = x
            hashValue = 1

            return hashValue, answer
Let us understand some of the important blocks –
Function – ragAnswerWithHaystackAndGPT3():
The ragAnswerWithHaystackAndGPT3 function retrieves relevant documents for a given question using the retrieveDocumentsReader method. It then generates an answer for the query using GPT-3 with the retrieved documents via the generateAnswerWithGPT3 method. The final response is returned.
Function – generateAnswerWithGPT3():
The generateAnswerWithGPT3 function, given a list of retrieved documents and a question, communicates with OpenAI’s GPT-3 to generate an answer. It first processes the documents, filtering and extracting a hash value. Using a chat-style format, it prompts GPT-3 with the processed documents and captures its response. If an error occurs, an error message is printed, and “Not Available!” is returned.
Function – retrieveDocumentsReader():
The retrieveDocumentsReader function takes in a question and an optional parameter, top_k (defaulted to 9). It calls the retriever.retrieve method with the given parameters. The retrieval will generate at most nine responses from the RAG engine, which will be fed to OpenAI.
React:
App.js (This is the main React script that will create the interface & parse the data, apart from handling the authentication.)
// App.js
import React, { useState } from 'react';
import axios from 'axios';
import './App.css';

const App = () => {
  const [isLoggedIn, setIsLoggedIn] = useState(false);
  const [username, setUsername] = useState('');
  const [password, setPassword] = useState('');
  const [message, setMessage] = useState('');
  const [chatLog, setChatLog] = useState([{ sender: 'MuBot', message: 'Welcome to MuBot! Please explore the world of History from our brilliant collections! Do you want to proceed to see the catalog?' }]);

  const handleLogin = async (e) => {
    e.preventDefault();
    try {
      const response = await axios.post('http://localhost:5000/login', { username, password });
      if (response.status === 200) {
        setIsLoggedIn(true);
      }
    } catch (error) {
      console.error('Login error:', error);
    }
  };

  const sendMessage = async (username) => {
    if (message.trim() === '') return;

    // Create a new chat entry
    const newChatEntry = {
      sender: 'user',
      message: message.trim(),
    };

    // Clear the input field
    setMessage('');

    try {
      // Make API request to Python-based API
      const response = await axios.post('http://localhost:5000/chat', { message: newChatEntry.message }); // Replace with your API endpoint URL
      const responseData = response.data;

      // Print the response to the console for debugging
      console.log('API Response:', responseData);

      // Parse the nested JSON from the 'message' attribute
      const jsonData = JSON.parse(responseData.message);

      // Check if the data contains 'departments'
      if (jsonData.departments) {
        // Extract the 'departments' attribute from the parsed data
        const departments = jsonData.departments;

        // Extract the department names and create a single string with line breaks
        const botResponseText = departments.reduce((acc, department) => {
          return acc + department.departmentId + '' + department.displayName + '\n';
        }, '');

        // Update the chat log with the bot's response
        setChatLog((prevChatLog) => [
          ...prevChatLog,
          { sender: 'user', message: message },
          { sender: 'bot', message: botResponseText },
        ]);
      } else if (jsonData.records) {
        // Data structure 2: Artwork information
        const records = jsonData.records;

        // Prepare chat entries
        const chatEntries = [];

        // Iterate through records and extract text, image, and wiki information
        records.forEach((record) => {
          const textInfo = Object.entries(record).map(([key, value]) => {
            if (key !== 'Image' && key !== 'Wiki') {
              return `${key}: ${value}`;
            }
            return null;
          }).filter((info) => info !== null).join('\n');

          const imageLink = record.Image;
          //const wikiLinks = JSON.parse(record.Wiki.replace(/'/g, '"'));
          //const wikiLinks = record.Wiki;
          const wikiLinks = record.Wiki.split(',').map(link => link.trim());

          console.log('Wiki:', wikiLinks);

          // Check if there is a valid image link
          const hasValidImage = imageLink && imageLink !== '[]';
          const imageElement = hasValidImage ? (
            <img src={imageLink} alt="Artwork" style={{ maxWidth: '100%' }} />
          ) : null;

          // Create JSX elements for rendering the wiki links (if available)
          const wikiElements = wikiLinks.map((link, index) => (
            <div key={index}>
              <a href={link} target="_blank" rel="noopener noreferrer">Wiki Link {index + 1}</a>
            </div>
          ));

          if (textInfo) {
            chatEntries.push({ sender: 'bot', message: textInfo });
          }

          if (imageElement) {
            chatEntries.push({ sender: 'bot', message: imageElement });
          }

          if (wikiElements.length > 0) {
            chatEntries.push({ sender: 'bot', message: wikiElements });
          }
        });

        // Update the chat log with the bot's response
        setChatLog((prevChatLog) => [
          ...prevChatLog,
          { sender: 'user', message },
          ...chatEntries,
        ]);
      }
    } catch (error) {
      console.error('Error sending message:', error);
    }
  };

  if (!isLoggedIn) {
    return (
      <div className="login-container">
        <h2>Welcome to the MuBot</h2>
        <form onSubmit={handleLogin} className="login-form">
          <input
            type="text"
            placeholder="Enter your name"
            value={username}
            onChange={(e) => setUsername(e.target.value)}
            required
          />
          <input
            type="password"
            placeholder="Enter your password"
            value={password}
            onChange={(e) => setPassword(e.target.value)}
            required
          />
          <button type="submit">Login</button>
        </form>
      </div>
    );
  }

  return (
    <div className="chat-container">
      <div className="chat-header">
        <h2>Hello, {username}</h2>
        <h3>Chat with MuBot</h3>
      </div>
      <div className="chat-log">
        {chatLog.map((chatEntry, index) => (
          <div key={index} className={`chat-entry ${chatEntry.sender === 'user' ? 'user' : 'bot'}`}>
            <span className="user-name">{chatEntry.sender === 'user' ? username : 'MuBot'}</span>
            <p className="chat-message">{chatEntry.message}</p>
          </div>
        ))}
      </div>
      <div className="chat-input">
        <input
          type="text"
          placeholder="Type your message..."
          value={message}
          onChange={(e) => setMessage(e.target.value)}
          onKeyPress={(e) => {
            if (e.key === 'Enter') {
              sendMessage();
            }
          }}
        />
        <button onClick={sendMessage}>Send</button>
      </div>
    </div>
  );
};

export default App;
Please find some of the important logic –
Function – handleLogin():
The handleLogin asynchronous function responds to an event by preventing its default action. It attempts to post a login request with a username and password to a local server endpoint. If the response is successful with a status of 200, it updates a state variable to indicate a successful login; otherwise, it logs any encountered errors.
Function – sendMessage():
The sendMessage asynchronous function is designed to handle the user’s chat interaction:
If the message is empty (after trimming spaces), the function exits without further action.
A chat entry object is created with the sender set as ‘user’ and the trimmed message.
The input field’s message is cleared, and an API request is made to a local server endpoint with the chat message.
If the API responds with a ‘departments’ attribute in its JSON, a bot response is crafted by iterating over department details.
If the API responds with ‘records’ indicating artwork information, the bot crafts responses for each record, extracting text, images, and wiki links, and generating JSX elements for rendering them.
After processing the API response, the chat log state is updated with the user’s original message and the bot’s responses.
Errors, if encountered, are logged to the console.
This function enables interactive chat with bot responses that vary based on the nature of the data received from the API.
DIRECTORY STRUCTURES:
Let us explore how the directory structure, starting from the parent down to some of the important child folders, should look –
So, finally, we’ve done it.
I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.
You will get the complete codebase in the following GitHub link.
I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.
Till then, Happy Avenging! 🙂
Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement & especially in the prediction quality.
Today, I'm very excited to demonstrate an effortless & new way to extract transcripts from YouTube videos & then answer questions based on the topics selected by the users. In this post, I plan to take the user inputs first & then summarize the video content through useful advanced analytics with the help of LangChain & an OpenAI-based model.
In this post, I’ve directly subscribed to OpenAI & I’m not using OpenAI from Azure. However, I’ll explore that in the future as well. Before I explain the process to invoke this new library, why not view the demo first & then discuss it?
Isn’t it very exciting? This will lead to a whole new ballgame, where one can get critical decision-making information from these human sources along with their traditional advanced analytical data.
How will it help?
Let’s say as per your historical data & analytics, the dashboard is recommending prod-A, prod-B & prod-C as the top three products for potential top-performing brands. Whereas, you are getting some alerts from the TV news on prod-B due to the recent incidents. So, in that case, you don’t want to continue with the prod-B investment. You may find a new product named prod-Z. That may reduce the risk of your investment.
What is LangChain?
LangChain is a framework for developing applications powered by language models. We believe that the most powerful and differentiated applications will not only call out to a language model but will also be:
Data-aware: connect a language model to other sources of data
Agentic: allow a language model to interact with its environment
The LangChain framework works around these principles.
To know more about this, please click the following link.
As you can see, this is one of the critical components in our solution, which will bind the OpenAI bot & it will feed the necessary data to provide the correct response.
What is FAISS?
Faiss is a library for efficient similarity search and clustering of dense vectors. It contains algorithms that search in sets of vectors of any size, up to ones that do not fit in RAM. It also has supporting code for evaluation and parameter tuning.
Faiss is developed in C++ with complete wrappers for Python, and some of the most beneficial algorithms are available on both CPU & GPU. It is developed by Facebook AI Research.
To know more about this, please click the following link.
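To make the idea concrete, here is a tiny, standalone Faiss example on random vectors (this is not part of the project's code, just an illustration of the library):

import numpy as np
import faiss

d = 128                                                  # Vector dimension
xb = np.random.random((1000, d)).astype('float32')       # Database vectors
xq = np.random.random((5, d)).astype('float32')          # Query vectors

index = faiss.IndexFlatL2(d)   # Exact L2 (Euclidean) search index
index.add(xb)                  # Add the database vectors

D, I = index.search(xq, 4)     # Distances & indices of the 4 nearest neighbours
print(I)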
FLOW OF EVENTS:
Let us look at the flow diagram as it captures the sequence of events that unfold as part of the process.
Here are the steps that will follow in sequence –
The application will first get the topic it needs to look up on YouTube & find the top 5 videos using the YouTube Data API.
Once the application returns the list of video URLs from the above step, LangChain will drive the application to extract the transcripts from the videos & then optimize the response size into smaller chunks to address the costly OpenAI calls. During this time, it will invoke FAISS to create the document DBs.
Finally, it will send those chunks to OpenAI for the best response based on your supplied template that performs the final analysis with small data required for your query & gets the appropriate response with fewer costs.
CODE:
Why don’t we go through the code made accessible due to this new library for this particular use case?
clsConfigClient.py (This is the main calling Python script for the input parameters.)
From this configuration script, one can understand that we need the API keys for both YouTube & OpenAI. And they have separate costs & usage, which I'll share later in the post. Also, notice that the temperature is set to 0.2 (the range is between 0 and 1). That means our AI bot will be consistent in its responses. And our application will use the GPT-3.5-turbo model for its analytic response.
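Since the embedded script isn't visible above, here is a trimmed-down sketch of what the relevant part of clsConfigClient.py might contain; the key names are assumptions, while the values reflect what is described in this post:

import os

class clsConfigClient(object):
    os.chdir(os.path.abspath(os.path.dirname(__file__)))

    conf = {
        'OPEN_AI_KEY': "sk-xxxxxxxxxxxxxxxx",       # OpenAI subscription key (placeholder)
        'YOUTUBE_KEY': "AIzaxxxxxxxxxxxxxxxx",      # YouTube Data API key (placeholder)
        'MODEL_NAME': "gpt-3.5-turbo",              # Chat model used for the analysis
        'TEMP_VAL': 0.2,                            # Low temperature => consistent responses
        'MAX_CNT': 5,                               # Number of top videos to fetch
        'CHUNK_SIZE': 1000,                         # Transcript chunk size
        'CHUNK_OVERLAP': 100                        # Overlap between consecutive chunks
    }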
clsTemplate.py (Contains all the templates for OpenAI.)
The script is self-explanatory. Here, we're keeping the correct instructions for OpenAI so that it responds within these guidelines.
clsVideoContentScrapper.py (Main class to extract the transcript from the YouTube videos & then answer the questions based on the topics selected by the users.)
The first part of this class fetches the most relevant YouTube URLs, binds them into a list along with the channel names & then shares both lists with the main functions.
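That lookup isn't reproduced above; a minimal sketch of the top-five search using the google-api-python-client (the function name & signature match the call seen later, the rest are assumptions) could be:

from googleapiclient.discovery import build

YOUTUBE_KEY = "<your-youtube-data-api-key>"     # Placeholder, taken from the config in practice
youtube = build('youtube', 'v3', developerKey=YOUTUBE_KEY)

def topFiveURLFromYouTube(youtube, q, part='id,snippet', maxResults=5, type='video'):
    urlList, channelList = [], []

    # Search YouTube for the requested topic & keep only video hits
    request = youtube.search().list(q=q, part=part, maxResults=maxResults, type=type)
    response = request.execute()

    for item in response.get('items', []):
        videoId = item['id']['videoId']
        urlList.append('https://www.youtube.com/watch?v=' + videoId)
        channelList.append(item['snippet']['channelTitle'])

    return urlList, channelList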
The class also defines a function, createDBFromYoutubeVideoUrl, which appears to create a database of text documents from the transcript of a YouTube video; a sketch of it appears at the end of this walkthrough. Here's the explanation in simple English:
The function createDBFromYoutubeVideoUrl is defined with one argument: video_url.
The function uses a try-except block to handle any potential exceptions or errors that may occur.
Inside the try block, the following steps are performed:
First, it creates a YoutubeLoader object from the provided video_url. This object is likely responsible for interacting with the YouTube video specified by the URL.
The loader object then loads the transcript of the video. This transcript is the text version of everything spoken in the video.
It then creates a RecursiveCharacterTextSplitter object with a specified chunk_size of 1000 and chunk_overlap of 100. This object splits the transcript into smaller chunks (documents) of text for easier processing or analysis. Each piece will be around 1000 characters long, and there will be an overlap of 100 characters between consecutive chunks.
The split_documents method of the text_splitter object will split the transcript into smaller documents. These documents are stored in the docs variable.
The FAISS.from_documents method is then called with docs and embeddings as arguments to create a FAISS (Facebook AI Similarity Search) index. This index is a database used for efficient similarity search and clustering of high-dimensional vectors, which in this case, are the embeddings of the documents. The FAISS index is stored in the db variable.
Finally, the db variable is returned, representing the created database from the video transcript.
If an exception occurs during the execution of the try block, the code execution moves to the except block:
Here, it first converts the exception e to a string x.
Then it prints an error message.
Finally, it returns an empty string as an indication of the error.
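For reference, here is a minimal sketch of such a function using LangChain's YoutubeLoader, RecursiveCharacterTextSplitter & the FAISS vector store, assuming OpenAIEmbeddings for the embeddings (the original implementation may differ slightly):

from langchain.document_loaders import YoutubeLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import FAISS

# Assumes OPENAI_API_KEY is already set in the environment
embeddings = OpenAIEmbeddings()

def createDBFromYoutubeVideoUrl(video_url):
    try:
        # Load the transcript of the supplied YouTube video
        loader = YoutubeLoader.from_youtube_url(video_url)
        transcript = loader.load()

        # Split the transcript into ~1000-character chunks with 100-character overlap
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
        docs = text_splitter.split_documents(transcript)

        # Build a FAISS index over the chunk embeddings
        db = FAISS.from_documents(docs, embeddings)

        return db
    except Exception as e:
        x = str(e)
        print('Error: ', x)
        return ''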
def getResponseFromQuery(self, db, query, k=4):
    try:
        """
        gpt-3.5-turbo can handle up to 4097 tokens. Setting the chunksize to 1000 and k to 4
        maximizes the number of tokens to analyze.
        """

        mod_name = self.model_name
        temp_val = self.temp_val

        docs = db.similarity_search(query, k=k)
        docs_page_content = "".join([d.page_content for d in docs])

        chat = ChatOpenAI(model_name=mod_name, temperature=temp_val)

        # Template to use for the system message prompt
        template = ct.templateVal_1

        system_message_prompt = SystemMessagePromptTemplate.from_template(template)

        # Human question prompt
        human_template = "Answer the following question: {question}"
        human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)

        chat_prompt = ChatPromptTemplate.from_messages(
            [system_message_prompt, human_message_prompt]
        )

        chain = LLMChain(llm=chat, prompt=chat_prompt)

        response = chain.run(question=query, docs=docs_page_content)
        response = response.replace("\n", "")

        return response, docs
    except Exception as e:
        x = str(e)
        print('Error: ', x)

        return '', ''
The Python function getResponseFromQuery is designed to search a given database (db) for a specific query and then generate a response using a language model (possibly GPT-3.5-turbo). The answer is based on the content found and the particular question. Here is a simple English summary:
The function getResponseFromQuery takes three parameters: db, query, and k. The k parameter is optional and defaults to 4 if not provided. db is the database to search, the query is the question or prompts to analyze, and k is the number of similar items to return.
The function initiates a try-except block for handling any errors that might occur.
Inside the try block:
The function retrieves the model name and temperature value from the instance of the class this function is a part of.
The function then searches the db database for documents similar to the query and saves these in docs.
It concatenates the content of the returned documents into a single string docs_page_content.
It creates a ChatOpenAI object with the model name and temperature value.
It creates a system message prompt from a predefined template.
It creates a human message prompt, which is the query.
It combines these two prompts to form a chat prompt.
An LLMChain object is then created using the ChatOpenAI object and the chat prompt.
This LLMChain object is used to generate a response to the query using the content of the documents found in the database. The answer is then formatted by replacing all newline characters with empty strings.
Finally, the function returns this response along with the original documents.
If any error occurs during these operations, the function goes to the except block where:
The error message is printed.
The function returns two empty strings to indicate an error occurred, and no response or documents could be produced.
def extractContentInText(self, topic, query):
    try:
        discussedTopic = []
        strKeyText = ''
        cnt = 0
        max_cnt = self.max_cnt

        urlList, channelList = self.topFiveURLFromYouTube(youtube, q=topic, part='id,snippet', maxResults=max_cnt, type='video')
        print('Returned List: ')
        print(urlList)
        print()

        for video_url in urlList:
            print('Processing Video: ')
            print(video_url)
            db = self.createDBFromYoutubeVideoUrl(video_url)

            response, docs = self.getResponseFromQuery(db, query)

            if len(response) > 0:
                strKeyText = 'As per the topic discussed in ' + channelList[cnt] + ', '
                discussedTopic.append(strKeyText + response)

            cnt += 1

        return discussedTopic
    except Exception as e:
        discussedTopic = []
        x = str(e)
        print('Error: ', x)

        return discussedTopic
This Python function, extractContentInText, is aimed to extract relevant content from the transcripts of top YouTube videos on a specific topic and generate responses to a given query. Here’s a simple English translation:
The function extractContentInText is defined with topic and query as parameters.
It begins with a try-except block to catch and handle any possible exceptions.
In the try block:
It initializes several variables: an empty list discussedTopic to store the extracted information, an empty string strKeyText to keep specific parts of the content, a counter cnt initialized at 0, and max_cnt retrieved from the self-object to specify the maximum number of YouTube videos to consider.
It calls the topFiveURLFromYouTube function (defined previously) to get the URLs of the top videos on the given topic from YouTube. It also retrieves the list of channel names associated with these videos.
It prints the returned list of URLs.
Then, it starts a loop over each URL in the urlList.
For each URL, it prints the URL, then creates a database from the transcript of the YouTube video using the function createDBFromYoutubeVideoUrl.
It then uses the getResponseFromQuery function to get a response to the query based on the content of the database.
If the length of the response is greater than 0 (meaning there is a response), it forms a string strKeyText to indicate the channel that the topic was discussed on and then appends the answer to this string. This entire string is then added to the discussedTopic list.
It increments the counter cnt by one after each iteration.
Finally, it returns the discussedTopic list, which now contains relevant content extracted from the videos.
If any error occurs during these operations, the function goes into the except block:
It first resets discussedTopic to an empty list.
Then it converts the exception e to a string and prints the error message.
Lastly, it returns the empty discussedTopic list, indicating that no content could be extracted due to the error.
testLangChain.py (Main Python script to extract the transcript from the YouTube videos & then answer the questions based on the topics selected by the users.)
def main():
    try:
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('*' * 120)
        print('Start Time: ' + str(var))
        print('*' * 120)

        #query = "What are they saying about Microsoft?"
        print('Please share your topic!')
        inputTopic = input('User: ')
        print('Please ask your questions?')
        inputQry = input('User: ')
        print()

        retList = cVCScrapper.extractContentInText(inputTopic, inputQry)
        cnt = 0

        for discussedTopic in retList:
            finText = str(cnt + 1) + ') ' + discussedTopic
            print()
            print(textwrap.fill(finText, width=150))

            cnt += 1

        r1 = len(retList)

        if r1 > 0:
            print()
            print('Successfully Scrapped!')
        else:
            print()
            print('Failed to Scrappe!')

        print('*' * 120)
        var1 = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        print('End Time: ' + str(var1))
    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()
The above main application will capture the topic from the user & then give the user a chance to ask specific questions on that topic, invoking the main class to extract the transcripts from YouTube, feed them as a source using LangChain & finally deliver the response. If there is no response for a video, it will simply skip that entry.
USAGE & COST FACTOR:
Please find the OpenAI usage –
Please find the YouTube API usage –
So, finally, we’ve done it.
I know that this post is relatively bigger than my earlier post. But, I think, you can get all the details once you go through it.
You will get the complete codebase in the following GitHub link.
I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.
Till then, Happy Avenging! 🙂
Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement & especially in the prediction quality. Sample video taken from Santrel Media & you would find the link over here.
This week, we're going to extend one of our earlier posts & try to read the entire text from a video stream using computer vision. If you want to view the previous post, please click the following link.
But, before we proceed, why don’t we view the demo first?
Architecture:
Let us understand the architecture flow –
The above diagram shows that the application, which uses Open-CV, analyzes individual frames from the source, extracts the complete text within the video & displays it on top of the target screen, besides printing the same in the console.
Let us now understand the code. For this use case, we will only discuss three Python scripts. We need more than these three, but we have already discussed the others in some of the earlier posts. Hence, we will skip them here.
clsReadingTextFromStream.py (This is the main Python class that will extract the text from the webcam stream in real-time.)
Please find the key snippet from the above script –
# Two output layer names for the text detector model
lNames = cf.conf['LAYER_DET']
# Tesseract OCR text param values
strVal = "-l " + str(cf.conf['LANG']) + " --oem " + str(cf.conf['OEM_VAL']) + " --psm " + str(cf.conf['PSM_VAL']) + ""
config = (strVal)
The first line contains the two output layers' names for the text detector model. Among them, the first one indicates the outcome probabilities & the second one is used to derive the bounding box coordinates of the predicted text.
The second line contains various options for the Tesseract APIs. You need to understand these options in detail to make them work. These are the essential options for our use case –
Language – The intended language, for example, English, Spanish, Hindi, Bengali, etc.
OEM flag – In this case, the application uses 4 to indicate the LSTM neural net model for OCR.
PSM value – In this case, the selected value is 7, indicating that the application treats the ROI as a single line of text.
For more details, please refer to the config file.
print("[INFO] Loading Text Detector...")
net = cv2.dnn.readNet(modelPath)
The above lines bring in the already created model & load it into memory for evaluation.
# Setting new width and height and then determine the ratio in change
# for both the width and height
(newW, newH) = (wt, ht)
rW = origW / float(newW)
rH = origH / float(newH)
# Resize the frame and grab the new frame dimensions
frame = cv2.resize(frame, (newW, newH))
(H, W) = frame.shape[:2]
# Construct a blob from the frame and then perform a forward pass of
# the model to obtain the two output layer sets
blob = cv2.dnn.blobFromImage(frame, 1.0, (W, H), sParam, swapRB=True, crop=False)
net.setInput(blob)
(confScore, imgGeo) = net.forward(lNames)
# Decode the predictions, then apply non-maxima suppression to
# suppress weak, overlapping bounding boxes
(rects, confidences) = self.predictText(confScore, imgGeo)
boxes = non_max_suppression(np.array(rects), probs=confidences)
The above lines prepare each frame to get the bounding boxes by resizing the height & width, followed by a forward pass of the model to obtain the two output layer sets. It then applies non-maxima suppression to remove the weak, overlapping bounding boxes after interpreting the predictions. In short, this will identify the potential text regions & put bounding boxes around them.
# Initialize the list of results
res = []
# Getting BoundingBox boundaries
res = self.findBoundBox(boxes, res, rW, rH, orig, origW, origH, pad)
The above function will create the bounding box surrounding the predicted text regions. Also, we will capture the expected text inside the result variable.
for (spX, spY, epX, epY) in boxes:
# Scale the bounding box coordinates based on the respective
# ratios
spX = int(spX * rW)
spY = int(spY * rH)
epX = int(epX * rW)
epY = int(epY * rH)
# To obtain a better OCR of the text we can potentially
# apply a bit of padding surrounding the bounding box.
# And, computing the deltas in both the x and y directions
dX = int((epX - spX) * pad)
dY = int((epY - spY) * pad)
# Apply padding to each side of the bounding box, respectively
spX = max(0, spX - dX)
spY = max(0, spY - dY)
epX = min(origW, epX + (dX * 2))
epY = min(origH, epY + (dY * 2))
# Extract the actual padded ROI
roi = orig[spY:epY, spX:epX]
Now, the application will scale the bounding boxes based on the previously computed ratios for actual text recognition. In this process, the application also pads the bounding boxes & then extracts the padded region of interest.
# Choose the proper OCR Config
text = pytesseract.image_to_string(roi, config=config)
# Add the bounding box coordinates and OCR'd text to the list
# of results
res.append(((spX, spY, epX, epY), text))
Using OCR options, the application extracts the text within the video frame & adds that to the res list.
# Sort the results bounding box coordinates from top to bottom
res = sorted(res, key=lambda r:r[0][1])
It then sends a sorted output to the primary calling functions.
for ((spX, spY, epX, epY), text) in res:
# Display the text OCR by using Tesseract APIs
print("Reading Text::")
print("=" *60)
print(text)
print("=" *60)
# Removing the non-ASCII text so it can draw the text on the frame
# using OpenCV, then draw the text and a bounding box surrounding
# the text region of the input frame
text = "".join([c if ord(c) < aRange else "" for c in text]).strip()
output = orig.copy()
cv2.rectangle(output, (spX, spY), (epX, epY), drawTag, 2)
cv2.putText(output, text, (spX, spY - 20), cv2.FONT_HERSHEY_SIMPLEX, 1.2, drawTag, 3)
# Show the output frame
cv2.imshow(title, output)
Finally, it fetches the potential text region along with the text & then prints it on top of the source video. Also, it removes the non-ASCII characters during this time to avoid any cryptic text.
readingVideo.py (Main calling script.)
# Instantiating all the main class
x1 = rtfs.clsReadingTextFromStream()
# Execute all the pass
r1 = x1.processStream(debugInd, var)
if (r1 == 0):
print('Successfully read text from the Live Stream!')
else:
print('Failed to read text from the Live Stream!')
The above lines instantiate the main calling class & then invoke the function to get the desired extracted text from the live streaming video if that is successful.
FOLDER STRUCTURE:
Here is the folder structure that contains all the files & directories in MAC O/S –
You will get the complete codebase in the following Github link.
Unfortunately, I cannot upload the model due to its size. I will share it on a need basis.
I'll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my post & let me know your feedback.
Till then, Happy Avenging! 🙂
Note: All the data & scenarios posted here are representational data & scenarios & available over the internet & for educational purposes only. Some of the images (except my photo) that we've used are available over the net. We don't claim ownership of these images. There is always room for improvement, especially in the prediction quality.
Today, I'm going to discuss another Computer Vision installment. I'll discuss how to implement Augmented Reality using Open-CV Computer Vision with full audio. We will be using part of a Bengali OTT series called "Feludar Goendagiri" entirely for educational purposes & also as a tribute to the great legendary director, the late Satyajit Ray. To know more about him, please click the following link.
Why don’t we see the demo first before jumping into the technical details?
Architecture:
Let us understand the architecture –
The above diagram shows that the application, which uses the Open-CV, analyzes individual frames from the source & blends that with the video trailer. Finally, it creates another video by correctly mixing the source audio.
Python Packages:
Following are the python packages that are necessary to develop this brilliant use case –
pip install opencv-python
pip install pygame
CODE:
Let us now understand the code. For this use case, we will only discuss three Python scripts. We need more than these three, but we have already discussed the others in some of the earlier posts. Hence, we will skip them here.
clsAugmentedReality.py (This is the main Python class that will embed the source video with the webcam stream in real-time.)
Identifying the Aruco markers is key here. The above lines help the program detect all four corners.
However, let us discuss more on the Aruco markers & strategies that I’ve used for several different surfaces.
As you can see, the right-hand side Aruco marker is tiny compared to the left one. Hence, that one will be ideal for a curved surface like a coffee mug or a bottle rather than a flat surface.
Also, we've demonstrated the zoom capability with the smaller Aruco marker, which will augment almost double the original surface area.
Let us understand why we need that; as you know, any spherical surface like a bottle is curved. Hence, detecting relatively larger Aruco markers placed at the four corners will be difficult for any camera.
Hence, we need a process where four closely placed corners can be extrapolated mathematically into a relatively larger projected area that is easily detectable by any webcam.
Let’s observe the following figure –
Simulated Extrapolated corners
As you can see, the original positions of the four corners are represented using the following points, i.e., (x1, y1), (x2, y2), (x3, y3) & (x4, y4).
And these positions are very close to each other. Hence, it will be easier for the camera to detect all the points (like a plain surface) without many retries.
And later, you can add specific values of x & y to them to get the derived four corners as shown in the above figures through the following points, i.e. (x1.1, y1.1), (x2.1, y2.1), (x3.1, y3.1) & (x4.1, y4.1).
# Loop over the IDs of the ArUco markers in Top-Left, Top-Right,
# Bottom-Right, and Bottom-Left order
for i in cornerIDs:
# Grab the index of the corner with the current ID
j = np.squeeze(np.where(ids == i))
# If we receive an empty list instead of an integer index,
# then we could not find the marker with the current ID
if j.size == 0:
continue
# Otherwise, append the corner (x, y)-coordinates to our list
# of reference points
corner = np.squeeze(corners[j])
refPts.append(corner)
# Check to see if we failed to find the four ArUco markers
if len(refPts) != 4:
# If we are allowed to use cached reference points, fall
# back on them
if useCache and CACHED_REF_PTS is not None:
refPts = CACHED_REF_PTS
# Otherwise, we cannot use the cache and/or there are no
# previous cached reference points, so return early
else:
return None
# If we are allowed to use cached reference points, then update
# the cache with the current set
if useCache:
CACHED_REF_PTS = refPts
# Unpack our Aruco reference points and use the reference points
# to define the Destination transform matrix, making sure the
# points are specified in Top-Left, Top-Right, Bottom-Right, and
# Bottom-Left order
(refPtTL, refPtTR, refPtBR, refPtBL) = refPts
dstMat = [refPtTL[0], refPtTR[1], refPtBR[2], refPtBL[3]]
dstMat = np.array(dstMat)
In the above snippet, the application will scan through all the points & try to detect Aruco markers & then create a list of reference points, which will later be used to define the destination transformation matrix.
The above snippets calculate the revised points for the zoom-out capabilities as discussed in one of the earlier figures.
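That snippet isn't reproduced here; purely as a hedged sketch, the extrapolated destination matrix (named dstMatMod in the code below) might be derived from the detected reference points with a simple fixed pixel offset, along these lines (the offsets & indexing are assumptions):

import numpy as np

def extrapolateCorners(refPtTL, refPtTR, refPtBR, refPtBL, x_delta=100, y_delta=100):
    # Push the four chosen corner points outwards by a fixed pixel offset,
    # simulating the larger projected area discussed in the earlier figure
    dstMatMod = np.array([
        [refPtTL[0][0] - x_delta, refPtTL[0][1] - y_delta],   # Top-Left: up & left
        [refPtTR[1][0] + x_delta, refPtTR[1][1] - y_delta],   # Top-Right: up & right
        [refPtBR[2][0] + x_delta, refPtBR[2][1] + y_delta],   # Bottom-Right: down & right
        [refPtBL[3][0] - x_delta, refPtBL[3][1] + y_delta],   # Bottom-Left: down & left
    ])
    return dstMatMod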
# Define the transform matrix for the *source* image in Top-Left,
# Top-Right, Bottom-Right, and Bottom-Left order
srcMat = np.array([[0, 0], [srcW, 0], [srcW, srcH], [0, srcH]])
The above snippet will create a transformation matrix for the video trailer.
# Compute the homography matrix and then warp the source image to
# the destination based on the homography depending upon the
# zoom flag
if zoomFlag == 1:
(H, _) = cv2.findHomography(srcMat, dstMat)
else:
(H, _) = cv2.findHomography(srcMat, dstMatMod)
warped = cv2.warpPerspective(source, H, (imgW, imgH))
# Construct a mask for the source image now that the perspective
# warp has taken place (we'll need this mask to copy the source
# image into the destination)
mask = np.zeros((imgH, imgW), dtype="uint8")
if zoomFlag == 1:
cv2.fillConvexPoly(mask, dstMat.astype("int32"), (255, 255, 255), cv2.LINE_AA)
else:
cv2.fillConvexPoly(mask, dstMatMod.astype("int32"), (255, 255, 255), cv2.LINE_AA)
# This optional step will give the source image a black
# border surrounding it when applied to the source image, you
# can apply a dilation operation
rect = cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3))
mask = cv2.dilate(mask, rect, iterations=2)
# Create a three channel version of the mask by stacking it
# depth-wise, such that we can copy the warped source image
# into the input image
maskScaled = mask.copy() / 255.0
maskScaled = np.dstack([maskScaled] * 3)
# Copy the warped source image into the input image by
# (1) Multiplying the warped image and masked together,
# (2) Then multiplying the original input image with the
# mask (giving more weight to the input where there
# are not masked pixels), and
# (3) Adding the resulting multiplications together
warpedMultiplied = cv2.multiply(warped.astype("float"), maskScaled)
imageMultiplied = cv2.multiply(frame.astype(float), 1.0 - maskScaled)
output = cv2.add(warpedMultiplied, imageMultiplied)
output = output.astype("uint8")
Finally, depending upon the zoom flag, the application will create a warped image surrounded by an optionally black border.
clsEmbedVideoWithStream.py (This is the main Python class that will invoke the clsAugmentedReality class to initiate augmented reality after splitting the audio & video & then project them via the webcam with a seamless broadcast.)
Please find the key snippet from the above script –
def playAudio(self, audioFile, audioLen, freq, stopFlag=False):
try:
pygame.mixer.init()
pygame.init()
pygame.mixer.music.load(audioFile)
pygame.mixer.music.set_volume(10)
val = int(audioLen)
i = 0
while i < val:
pygame.mixer.music.play(loops=0, start=float(i))
time.sleep(freq)
i = i + 1
if (i >= val):
raise BreakLoop
if (stopFlag==True):
raise BreakLoop
return 0
except BreakLoop as s:
return 0
except Exception as e:
x = str(e)
print(x)
return 1
The above function will initiate the pygame library to run the sound of the video file that has been extracted as part of a separate process.
def extractAudio(self, video_file, output_ext="mp3"):
try:
"""Converts video to audio directly using `ffmpeg` command
with the help of subprocess module"""
filename, ext = os.path.splitext(video_file)
subprocess.call(["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"],
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT)
return 0
except Exception as e:
x = str(e)
print('Error: ', x)
return 1
The above function temporarily extracts the audio file from the source trailer video.
# Initialize the video file stream
print("[INFO] accessing video stream...")
vf = cv2.VideoCapture(videoFile)
x = self.extractAudio(videoFile)
if x == 0:
print('Successfully Audio extracted from the source file!')
else:
print('Failed to extract the source audio!')
# Initialize a queue to maintain the next frame from the video stream
Q = deque(maxlen=128)
# We need to have a frame in our queue to start our augmented reality
# pipeline, so read the next frame from our video file source and add
# it to our queue
(grabbed, source) = vf.read()
Q.appendleft(source)
# Initialize the video stream and allow the camera sensor to warm up
print("[INFO] starting video stream...")
vs = VideoStream(src=0).start()
time.sleep(2.0)
flg = 0
The above snippet reads the frames from the video file after invoking the audio extraction. It then uses a deque to buffer the upcoming video frames for better performance. Finally, it starts consuming the standard streaming video from the WebCAM so that the trailer video can be augmented on top of it.
t = threading.Thread(target=self.playAudio, args=(audioFile, audioLen, audioFreq, stopFlag,))
t.daemon = True
Now, the application spins off the audio play function on a daemon thread. The reason is to avoid any impact on the performance & the video frame rate.
while len(Q) > 0:
try:
# Grab the frame from our video stream and resize it
frame = vs.read()
frame = imutils.resize(frame, width=1020)
# Attempt to find the ArUCo markers in the frame, and provided
# they are found, take the current source image and warp it onto
# input frame using our augmented reality technique
warped = x1.getWarpImages(
frame, source,
cornerIDs=(923, 1001, 241, 1007),
arucoDict=arucoDict,
arucoParams=arucoParams,
zoomFlag=zFlag,
useCache=CacheL > 0)
# If the warped frame is not None, then we know (1) we found the
# four ArUCo markers and (2) the perspective warp was successfully
# applied
if warped is not None:
# Set the frame to the output augment reality frame and then
# grab the next video file frame from our queue
frame = warped
source = Q.popleft()
if flg == 0:
t.start()
flg = flg + 1
# For speed/efficiency, we can use a queue to keep the next video
# frame queue ready for us -- the trick is to ensure the queue is
# always (or nearly) full
if len(Q) != Q.maxlen:
# Read the next frame from the video file stream
(grabbed, nextFrame) = vf.read()
# If the frame was read (meaning we are not at the end of the
# video file stream), add the frame to our queue
if grabbed:
Q.append(nextFrame)
# Show the output frame
cv2.imshow(title, frame)
time.sleep(videoFrame)
# If the `q` key was pressed, break from the loop
if cv2.waitKey(2) & 0xFF == ord('q'):
stopFlag = True
break
except BreakLoop:
raise BreakLoop
except Exception as e:
pass
if (len(Q) == Q.maxlen):
time.sleep(2)
break
The final segment calls the getWarpImages function to get the augmented image on top of the video. It also checks the upcoming frames & whether the source video has finished. Once the end is reached, the application raises a break to exit the continuous WebCAM read loop. There is also a provision for manual exit by pressing the ‘q’ key.
# Performing cleanup at the end
cv2.destroyAllWindows()
vs.stop()
It is always advisable to release your camera & destroy any windows that are still open once the application finishes the process.
augmentedMovieTrailer.py (Main calling script)
The above script will initially instantiate the main calling class & then invoke the processStream function to create the Augmented Reality.
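Since the embedded script isn’t reproduced here, below is a minimal sketch of what such a calling script could look like. The constructor & the parameters passed to processStream are assumptions based on the surrounding description, not the exact code in the repository.

# Hypothetical sketch of augmentedMovieTrailer.py
import clsEmbedVideoWithStream as evs

def main():
    try:
        # Instantiate the main class that embeds the trailer
        # on top of the live WebCAM feed
        x1 = evs.clsEmbedVideoWithStream()

        # Kick off the augmented-reality pipeline
        ret = x1.processStream(debugInd='Y', var='MovieTrailer')

        if ret == 0:
            print('Successfully projected the augmented movie trailer!')
        else:
            print('Failed to project the augmented movie trailer!')
    except Exception as e:
        print('Error: ' + str(e))

if __name__ == "__main__":
    main()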
FOLDER STRUCTURE:
Here is the folder structure that contains all the files & directories in macOS –
Directory Structure
You will get the complete codebase in the following Github link.
If you want to know more about this legendary director & his famous work, please visit the following link.
I’ll bring some more exciting topics from the Python verse in the coming days. Please share & subscribe to my posts & let me know your feedback.
Till then, Happy Avenging! 🙂
Note: All the data & scenarios posted here are representational & are available over the internet for educational purposes only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement, especially in the prediction quality.
Today, I’ll be sharing another exciting installment of Computer Vision. Our focus will be to get a sense of visual counting. Let me explain: this post will demonstrate how to count the number of stacked-up coins using computer vision. And we’re going to add more coins to see the count change.
Why don’t we see the demo first before jumping into the technical details?
Demo
Isn’t it exciting?
Architecture:
Let us understand the architecture –
From the above diagram, one can notice that the raw video feed is captured from a specific location at a measured distance. The Python-based intelligent application will read the numbers & project them on top of the video feed for human validation.
Let me share one more perspective of how you can configure this experiment with another diagram that I prepared for this post.
Setup Process
From the above picture, one can see that a specific distance exists between the camera & the stacked coins as that will influence the single coin width.
You can see how that changed with the following pictures –
This entire test depends upon many factors that need to be considered to get effective results. I provided the basic demo. However, to make it robust & dynamic, one could dynamically diagnose the distance & individual coin width before starting this project. I feel that part could be a machine learning model that correctly predicts an individual coin’s width depending upon the distance & the number of coins stacked. I leave it to you to explore that part.
Then how does the Aruco marker come into the picture?
Let’s read it from the primary source side –
From: Source
Please refer to the following link if you want to know more.
For our use case, we’ll be using the following aruco marker –
Marker
How will this help us? Because we know its width & height. Depending upon its placement & overall pixel area size, our application can identify the pixel-to-centimeter ratio, which will enable us to predict any other object’s height & width. Once we have that, the application will divide the measured stack height by the calibrated width we observed for each coin at this distance. Then the application will be able to predict the actual count in real-time.
How can you identify the individual coin width?
My easy process would be to stack up ten quarter dollars & get the stack height from computer vision. You then divide that height by 10 to get the individual width of a coin, at least until you build a model that predicts the correct width depending upon the distance.
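To make the calibration arithmetic concrete, here is a small, self-contained sketch; the numbers & parameter names are illustrative assumptions, not values taken from the actual script.

# Hypothetical calibration & counting arithmetic
def estimateCoinCount(arucoPerimeterPx, picToCmMap, stackHeightPx, coinDefHeightCm):
    # Pixel-to-centimeter ratio derived from the Aruco marker, whose
    # total perimeter in centimeters (picToCmMap) is already known
    pixelToCmRatio = arucoPerimeterPx / picToCmMap

    # Convert the measured stack height from pixels to centimeters
    stackHeightCm = stackHeightPx / pixelToCmRatio

    # Divide by the calibrated single-coin height to get the count
    return round(stackHeightCm / coinDefHeightCm), stackHeightCm

# Example: a marker with a 20 cm perimeter measured at 400 px, and a
# stack measured at 35 px with a 0.175 cm single-coin height -> 10 coins
print(estimateCoinCount(400, 20, 35, 0.175))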
CODE:
Let us understand the code now –
clsConfig.py (Configuration file for the entire application.)
PIC_TO_CM_MAP is the total length of the Aruco marker in centimeters involving all four sides.
CONTOUR_AREA will change depending upon the minimum size you want to identify as part of the contour.
COIN_DEF_HEIGHT needs to be revised as part of the previous steps explained.
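For illustration only, the three entries might sit inside the configuration class roughly like this; the actual keys, values & structure in the repository may differ.

# Hypothetical fragment of clsConfig.py
class clsConfig(object):
    conf = {
        # Total length of all four sides of the Aruco marker in cm
        'PIC_TO_CM_MAP': 20,
        # Minimum contour area (in pixels) accepted as a valid object
        'CONTOUR_AREA': 2000,
        # Calibrated height of a single coin in cm (revise this value
        # as per the calibration step explained earlier)
        'COIN_DEF_HEIGHT': 0.175
    }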
clsAutoDetector.py (This python script will detect the contour.)
Key snippets from the above script are as follows –
# Find contours
conts, Oth = cv2.findContours(maskImage, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
objectsConts = []
for cnt in conts:
area = cv2.contourArea(cnt)
if area > cntArea:
objectsConts.append(cnt)
Depending upon the supplied contour area, this script will identify & mark the contour of every frame captured through WebCam.
clsCountRealtime.py (This is the main class to calculate the number of stacked coins after reading using computer vision.)
It displays the height, width & total number of coins on top of the live video.
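The full class isn’t reproduced here, but the overlay step could look roughly like the following sketch (the variable names & screen positions are illustrative).

# Hypothetical overlay of the measured stats on the live frame
import cv2

def drawStats(frame, heightCm, widthCm, coinCount):
    cv2.putText(frame, "Height: {:.2f} cm".format(heightCm),
                (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
    cv2.putText(frame, "Width: {:.2f} cm".format(widthCm),
                (20, 80), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
    cv2.putText(frame, "No of Coins: {}".format(coinCount),
                (20, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
    return frame

The exit handling from the same class is shown next –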
if cv2.waitKey(1) & 0xFF == ord('q'):
break
The above line lets the developer exit the visual application by pressing the ‘q’ key.
visualDataRead.py (Main calling function.)
# Execute all the pass
r1 = x1.learnStats(debugInd, var)
if (r1 == 0):
print('Successfully counted the number of stacked coins!')
else:
print('Failed to count the number of stacked coins!')
The above code invokes the learnStats function to calculate the count of stacked coins.
FOLDER STRUCTURE:
Folder Details
So, we’ve done it.
You will get the complete codebase in the following Github link.
I’ll bring some more exciting topics from the Python verse in the coming days. Please share & subscribe to my posts & let me know your feedback.
Till then, Happy Avenging! 😀
Note: All the data & scenarios posted here are representational & are available over the internet for educational purposes only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement, especially in the prediction quality.
Today, I’ll be sharing one of the most exciting posts I’ve ever shared. This post is rare as you cannot find the most relevant working solution easily over the net.
So, what are we talking about here? We’re going to build a Python-based iOS App using the Kivy framework. You get plenty of videos & documents on this as well. However, nowhere will you find the capability that I’m about to disclose. We’ll consume live IoT streaming data from a dummy application & then plot it in a Matplotlib dashboard inside the mobile App. And that’s where this post is seriously different from the rest of the available white papers.
But, before we dig into more details, let us see a quick demo of our iOS App.
Demo:
Demo
Isn’t it exciting? Great! Now, let’s dig into the details.
Let’s understand the architecture as to how we want to proceed with the solution here.
Architecture:
Broad-level design
The above diagram shows that the Kivy-based iOS application will consume streaming data from the Ably queue. The initial dummy IoT application will push real-time events to the same Ably queue.
So, now we understand the architecture. Fantastic!
Let’s deep dive into the code that we specifically built for this use case.
Code:
IoTDataGen.py (Publishing Streaming data to Ably channels & captured IoT events from the simulator & publish them in Dashboard through measured KPIs.)
In the above snippet, we’re forming the payload dynamically & then calling “pushEvents” to push all the randomly generated IoT mock-events to the Ably queue.
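As the embedded script isn’t shown here, the publishing step might look roughly like the sketch below. It assumes a synchronous AblyRest client & illustrative field names; the actual payload structure, API key handling & channel name in the repository may differ.

# Hypothetical sketch of forming the payload & pushing it to Ably
import json
import random
import datetime
from ably import AblyRest   # assuming a synchronous AblyRest client

def pushEvents(apiKey, channelName):
    # Build the Ably client & grab the target channel
    client = AblyRest(apiKey)
    channel = client.channels.get(channelName)

    # Form the mock IoT payload dynamically
    payload = {
        'timestamp': datetime.datetime.now().isoformat(),
        'temperature': round(random.uniform(18.0, 32.0), 2),
        'humidity': round(random.uniform(30.0, 90.0), 2)
    }

    # Publish the randomly generated event to the Ably queue
    channel.publish('iot-event', json.dumps(payload))
    return 0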
2. custom.kv (Publishing Streaming data to Ably channels & captured IoT events from the simulator & publish them in Dashboard through measured KPIs.)
To understand this, one needs to learn how to prepare a Kivy design layout using the KV-language. You can develop the same using native-python code as well. However, I wanted to explore this language & not to mention that this is the preferred way of doing a front-end GUI design in Kivy.
Like any graphical interface, one needs to understand the layouts & the widgets that you are planning to use or build. For that, please go through the following critical documentation link on Kivy Layouts. Please go through this if you are doing this for the first time.
To pinpoint the conversation, I would like to present the documentation segment from the official site in the given picture –
Official Kivy reference
Since we’ve used our custom buttons & top bars, the most convenient GUI layout for our use case is FloatLayout. Using that layout, we can conveniently position our widgets anywhere as per our needs. At the same time, one can nest layouts by combining different types of arrangements under one another.
Some of the key lines from the above scripting files will be –
Screen:
name: "background_1"
Image:
source: "Background/Background_1.png"
allow_stretch: True
keep_ratio: True
size_hint_y: None
size_hint_x: None
width: self.parent.width
height: self.parent.width/self.image_ratio
FloatLayout:
orientation: 'vertical'
Label:
text: "This is an application, which will consume the live streaming data inside a Kivy-based IOS-App by using Matplotlib to capture the KPIs."
text_size: self.width + 350, None
height: self.texture_size[1]
halign: "left"
valign: "bottom"
pos_hint: {'center_x':2.9,'center_y':6.5}
Image:
id: homesc
pos_hint: {'right':6, 'top':5.4}
size_hint: None, None
size: 560, 485
source: "Background/FP.jpeg"
Let us understand what we discussed here & try to map that with the image.
Part of GUI defined in KV file
From the above image now, you can understand how we placed the label & image into our custom positions to create a lean & clean interface.
Let us understand the custom buttons mapped in our Apps.
So, these are custom buttons. We placed them at specific positions & sizes by supplying the appropriate coordinates & then assigned the button methods (on_press & on_release).
However, these button methods will be present inside the main python script, which we’ll discuss after this segment.
3. main.py (Consuming Streaming data from Ably channels & captured IoT events from the simulator & publish them in Kivy-based iOS App through measured KPIs.)
We’ve taken one of the button events & captured how the application will behave once someone clicks the Start button & how it will bring all the corresponding elements of a static page. It also explained the transition type between screens.
The above crucial lines show how we capture the live calculations, assign them to matplotlib plots, & finally attach the matplotlib figure canvas to a box widget at our chosen size, so that the refreshed content is displayed whenever this method is invoked.
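For reference, a minimal sketch of that pattern is shown below. It assumes the kivy_garden matplotlib backend is installed; the widget & method names are illustrative rather than the exact ones used in main.py.

# Hypothetical sketch: refreshing a matplotlib plot inside a Kivy box widget
import matplotlib.pyplot as plt
from kivy_garden.matplotlib.backend_kivyagg import FigureCanvasKivyAgg

def refreshPlot(boxWidget, xVals, yVals):
    # Build the matplotlib figure from the latest streamed KPI values
    fig, ax = plt.subplots()
    ax.plot(xVals, yVals, color='royalblue')
    ax.set_title('Live IoT KPI')

    # Drop the previous canvas & attach the freshly drawn one to the box
    boxWidget.clear_widgets()
    boxWidget.add_widget(FigureCanvasKivyAgg(fig))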
The rest of the functions are pretty self-explanatory. So, I’m not going to discuss them.
Run:
Let’s run the app & see the output –
STEP – 1
Triggering the mock IoT App
STEP – 2
Triggering the iOS App
STEP – 3
Mobile Screen Shots
So, we’ve done it.
You will get the complete codebase in the following Github link.
I’ll bring some more exciting topics from the Python verse in the coming days. Please share & subscribe to my posts & let me know your feedback.
Till then, Happy Avenging!
Note: All the data & scenarios posted here are representational & are available over the internet for educational purposes only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement, especially regarding the size & position of the GUI components, which can be made dynamic by defining them relative to self.width along with some constant values.
Today, I’ll share one of the important posts on predicting data using Facebook’s relatively new machine-learning-based API. I find this API interesting in how it builds the model & anticipates the outcome.
We’ll be using one of the most widely accepted API-based sources for Covid-19, & I’ll be sharing the link over here.
We’ll be using the prophet-API developed by Facebook to predict the data. You will get the details from this link.
Architecture
Now, let’s explore the architecture shared above.
As you can see, the application will consume the data from the third-party API named “about-corona,” & the Python application will clean & transform the data. The application will then send the clean data to the Facebook API (Prophet), which is built on a machine-learning algorithm. This API is another effective time-series analysis platform offered to the data science community.
Once the application receives the predicted model, it will visualize them using plotly & matplotlib.
Please check the demo of this application for your reference.
Demo Run
We’ll do a time series analysis. Let us understand the basic concept of time series.
Time series is a series of data points indexed (or listed or graphed) in time order.
Therefore, data organized by relatively deterministic timestamps (as opposed to random sample data) contains additional information that we can leverage for our business use case to make better decisions.
To use the Prophet API, one needs to transform the data into a clean shape containing two fields (ds & y).
Let’s check one such use case, since our source has plenty of good data points to work with. We have daily data of newly infected Covid patients by country, as shown below –
Covid Cases
And, our clean class will transform the data into two fields –
Transformed Data
Once we fit the data into the prophet model, it will generate some additional columns, which will be used for prediction as shown below –
Generated data from prophet-api
And, a sample prediction based on a similar kind of data would be identical to this –
Sample Prediction
Let us understand what packages we need to install to prepare this application –
Installing Dependency Packages – I
Installing Dependency Packages – II
1. clsConfig.py ( This native Python script contains the configuration entries. )
We’re not going to discuss anything specific to this script.
2. clsL.py ( This native Python script logs the application. )
Based on the operating system, the log class will capture potential information under the “log” directory in the form of csv for later reference purposes.
3. clsForecast.py ( This native Python script will clean & transform the data. )
Let’s explore the critical snippet out of this script –
df_Comm = dfWork[[tms, fnc]]
Now, the application will extract only the relevant columns to proceed.
df_Comm.columns = ['ds', 'y']
It is now assigning specific column names, which is a requirement for prophet API.
4. clsCovidAPI.py ( This native Python script will call the Covid-19 API. )
The application will extract the elements, normalize the JSON & convert it to a pandas dataframe. It also adds a dummy column, which will be used later to merge the data with another set.
Now, the application takes the nested elements & normalizes them to a granular level. Again, it adds the dummy column so that both of these data sets can be joined together.
The application will then merge both data sets to get the complete denormalized data for our use case.
# Merging with the previous Country Code data
if cnt == 0:
df_M = df_fin
else:
d_frames = [df_M, df_fin]
df_M = p.concat(d_frames)
This entire deserializing execution happens per country. Hence, the above snippet creates an individual sub-group for each country & later unions all the sets together.
If any call to the source API fails, the application will re-trigger it after waiting for a specific time, until it reaches the maximum retry limit.
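A hedged sketch of such a retry wrapper is shown below; the maximum retry count & wait time are assumptions and not the exact values used in the script.

# Hypothetical retry wrapper around the source API call
import time
import requests

def getCovidData(url, headers, maxRetries=5, waitSec=20):
    for attempt in range(1, maxRetries + 1):
        try:
            response = requests.get(url, headers=headers, timeout=30)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            print('Attempt {0} failed: {1}'.format(attempt, str(e)))
            # Wait for a specific time before re-triggering the call
            time.sleep(waitSec)
    # Maximum capacity reached without a successful response
    raise RuntimeError('Source API did not respond after the maximum retries.')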
5. callPredictCovidAnalysis.py ( This native Python script is the main one to predict the Covid. )
The application is extracting the full country name based on ISO country code.
# Lowercase the column names
iDF.columns = [c.lower() for c in iDF.columns]
# Determine which is Y axis
y_col = [c for c in iDF.columns if c.startswith('y')][0]
# Determine which is X axis
x_col = [c for c in iDF.columns if c.startswith('ds')][0]
# Data Conversion
iDF['y'] = iDF[y_col].astype('float')
iDF['ds'] = iDF[x_col].astype('datetime64[ns]')
The above script converts all the column names to lowercase & then casts them to the appropriate data types.
# Forecast calculations
# Decreasing the changepoint_prior_scale to 0.001 to make the trend less flexible
m = Prophet(n_changepoints=20, yearly_seasonality=True, changepoint_prior_scale=0.001)
m.fit(iDF)
forecastDF = m.make_future_dataframe(periods=365)
forecastDF = m.predict(forecastDF)
l.logr('15.forecastDF_' + var + '_' + countryCD + '.csv', debug_ind, forecastDF, 'log')
df_M = forecastDF[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]
l.logr('16.df_M_' + var + '_' + countryCD + '.csv', debug_ind, df_M, 'log')
The above snippet uses the machine-learning-driven Prophet API, where the application fits the model & then predicts a year ahead based on the existing data. Also, we’ve specified the number of changepoints; by default, Prophet places 25 potential changepoints within the initial 80% of the data set.
Prophet allows you to adjust the trend in case there is an overfit or underfit. changepoint_prior_scale controls the flexibility of the trend; decreasing it to 0.001 makes the trend less flexible.
The application is fetching & creating the country-specific dataframe.
for i in countryList:
try:
cntryIndiv = i.strip()
print('Country Processing: ' + str(cntryIndiv))
# Creating dataframe for each country
# Germany Main DataFrame
dfCountry = countrySpecificDF(df, cntryIndiv)
l.logr(str(cnt) + '.df_' + cntryIndiv + '_' + var1 + '.csv', DInd, dfCountry, 'log')
# Let's pass this to our map section
retDFGenNC = x2.forecastNewConfirmed(dfCountry, DInd, var1)
statVal = str(NC)
a1 = plot_picture(retDFGenNC, DInd, var1, cntryIndiv, statVal)
retDFGenNC_D = x2.forecastNewDead(dfCountry, DInd, var1)
statVal = str(ND)
a2 = plot_picture(retDFGenNC_D, DInd, var1, cntryIndiv, statVal)
cntryFullName = countryDet(cntryIndiv)
if (a1 + a2) == 0:
oprMsg = cntryFullName + ' ' + SM
print(oprMsg)
else:
oprMsg = cntryFullName + ' ' + FM
print(oprMsg)
# Resetting the dataframe value for the next iteration
dfCountry = p.DataFrame()
cntryIndiv = ''
oprMsg = ''
cntryFullName = ''
a1 = 0
a2 = 0
statVal = ''
cnt += 1
except Exception as e:
x = str(e)
print(x)
The above snippet calls the functions to predict the data & then renders the visual representation by plotting the data points.
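The actual plot_picture function isn’t reproduced here; a minimal sketch of how the Prophet output columns could be plotted is given below, with illustrative file naming.

# Hypothetical sketch of plot_picture over the Prophet output columns
import matplotlib.pyplot as plt

def plot_picture(forecastDF, debugInd, var, country, statVal):
    try:
        fig, ax = plt.subplots(figsize=(10, 6))
        # Predicted value with its uncertainty band
        ax.plot(forecastDF['ds'], forecastDF['yhat'], label='Predicted')
        ax.fill_between(forecastDF['ds'], forecastDF['yhat_lower'],
                        forecastDF['yhat_upper'], alpha=0.3, label='Uncertainty')
        ax.set_title(country + ' - ' + statVal)
        ax.legend()
        fig.savefig('Output_' + country + '_' + statVal + '.png')
        plt.close(fig)
        return 0
    except Exception as e:
        print(str(e))
        return 1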
Let us run the application –
Application Run
And, it will generate the visual representation as follows –
Application Run – Continue
And, here is the folder structure –
Directory Structure
Let’s explore the comparison study & try to find out the outcome –
Option – 1
Option – 2
Option – 3
Option -4
Let us analyze the above visual data points.
Conclusion:
From the comparison study, the outcome looks as follows –
India may see a rise in new Covid cases & might cross the 400,000 mark during June 2022, which would be the highest among the countries we’ve considered here (India, Indonesia, Germany, US, UK, Canada & Brazil). The second worst affected country might be the US during the same period, and the third Indonesia.
Canada will be the least affected country during June 2022. The figure should be within 12,000.
However, in terms of death cases, India is not the only leading country; the US, India & Brazil may all see close to, or slightly over, the 4,000 mark.
So, we’ve done it.
You will get the complete codebase in the following Github link.
I’ll bring some more exciting topics from the Python verse in the coming days.
Till then, Happy Avenging! 😀
Note: All the data & scenarios posted here are representational & are available over the internet for educational purposes only.
One more thing to understand is that this prediction is based on limited data points; the actual events may unfold differently. Ideally, countries take a cue from this kind of analysis & initiate appropriate measures to avoid the high curve. That is one of the main objectives of time-series analysis.
There is always room for improvement in this kind of model & the solution associated with it. I’ve shown the basic way to achieve it for educational purposes only.
It’s time to share another installment of fun & exciting posts from the world of Python-verse.
Today, we’ll be leveraging the Twilio voice API to send custom messages directly through phone calls. This service is beneficial on many occasions, from alerting customers with payment reminders to notifying warehouse managers about pending product deliveries.
Dependent Packages:
Let us explore what packages we need for this –
Dependent Package Installation
The commands for your reference –
pip install twilio
pip install pandas
Also, you need to subscribe/register in Twilio. I’ve already shown you what to do about that. You can refer to my old post to know more about it. However, you need to reserve one phone number from which you will be calling your customers.
Buying phone numbers
As you can see, I’ve reserved one phone number to demonstrate this use case.
Let us explore the key codebase –
clsVoiceAPI.py (Main class invoking the voice API)
In this script, we’re invoking the Twilio API by providing both the caller & callee numbers. We also receive the desired message from the primary calling program, which the IVR will read out when calling the customer.
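For reference, a minimal sketch of such a call using the Twilio Python helper library is shown below. The account credentials & phone numbers are placeholders, and the actual class reads its configuration differently, so treat this only as an illustration of the underlying API call.

# Hypothetical sketch of placing a voice call via the Twilio API
from twilio.rest import Client

def sendCall(accountSid, authToken, fromNumber, toNumber, voiceCallText):
    client = Client(accountSid, authToken)

    # TwiML instructs the IVR to read out the supplied message
    twiml = '<Response><Say>' + voiceCallText + '</Say></Response>'

    call = client.calls.create(twiml=twiml, to=toNumber, from_=fromNumber)

    # The call SID is returned to the calling script for tracking
    return call.sid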
2. callTwilioVoice.py (Main calling script)
# Create the instance of the Twilio Voice API Class
x1 = ca.clsVoiceAPI()
# Let's pass this to our map section
resSID = x1.sendCall(voiceCallText)
As you can see, we’re first instantiating the class & then calling its method with the appropriate message that will eventually be delivered to our customer. You can configure dynamic content & pass it to this class.
Let us explore the directory structure –
Directory Structures
Let us see how it runs –
Running Applications
You need to make sure that you check the balance of your Twilio account diligently.
Checking Balance
And here is a sneak peek of how it looks in a video –
Actual execution
For more information on IVR, please check the following link.
Today, I’m going to discuss a potential use case where, on many occasions, different teams need almost similar kinds of data through an API; however, the needs are not identical. Creating a fresh API/Microservice, after following up with many processes, will take significant time.
What if we could create an API in such a way that we get the response dynamically, without needing to build another one? In this post, we’ll be demonstrating a similar approach.
I’ll be using open-source Covid-API, which will be useful for several posts starting from this one.
Let’s take two cases. One, where one service might need to access all the elements, there might be another, where some other service requires specific details.
Let’s explore the code base first –
__init__.py ( This native Python-based Azure function will consume the Covid API & serve a dynamic API response. )
###########################################
#### Written By: SATYAKI DE ####
#### Written On: 06-Feb-2021 ####
#### Package Flask package needs to ####
#### install in order to run this ####
#### script. ####
#### ####
#### Objective: Main Calling scripts. ####
#### ####
#### However, to meet the functionality####
#### we've enhanced as per our logic. ####
###########################################
import logging
import json
import requests
import os
import pandas as p
import numpy as np
import azure.functions as func
def main(req: func.HttpRequest) -> func.HttpResponse:
logging.info('Dynamic-Covid-Status HTTP trigger function processed a request.')
try:
# Application Variable
url = os.environ['URL']
appType = os.environ['appType']
conType = os.environ['conType']
# API-Configuration
payload={}
headers = {
"Connection": conType,
"Content-Type": appType
}
# Validating input parameters
typeSel = req.params.get('typeSel')
if not typeSel:
try:
req_body = req.get_json()
except ValueError:
pass
else:
typeSel = req_body.get('typeSel')
typeVal = req.params.get('typeVal')
if not typeVal:
try:
req_body = req.get_json()
except ValueError:
pass
else:
typeVal = req_body.get('typeVal')
# Printing Key-Element Values
str1 = 'typeSel: ' + str(typeSel)
logging.info(str1)
str2 = 'typeVal: ' + str(typeVal)
logging.info(str2)
# End of API-Inputs
# Getting Covid data from the REST-API
response = requests.request("GET", url, headers=headers, data=payload)
ResJson = response.text
if typeSel == '*':
if typeVal != '':
# Converting it to Json
jdata = json.loads(ResJson)
df_ret = p.io.json.json_normalize(jdata)
df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])
rJson = df_ret.to_json(orient ='records')
return func.HttpResponse(rJson, status_code=200)
else:
x_stat = 'Failed'
x_msg = 'Important information is missing for all values!'
rJson = {
"status": x_stat,
"details": x_msg
}
xval = json.dumps(rJson)
return func.HttpResponse(xval, status_code=200)
elif typeSel == 'Cols':
if typeVal != '':
# Converting it to Json
jdata = json.loads(ResJson)
df_ret = p.io.json.json_normalize(jdata)
df_ret.columns = df_ret.columns.map(lambda x: x.split(".")[-1])
# Fetching for the selected columns
# Extracting the columns from the list
lstHead = []
listX = typeVal.split (",")
for i in listX:
lstHead.append(str(i).strip())
str3 = 'Main List: ' + str(lstHead)
logging.info(str3)
slice_df = df_ret[np.intersect1d(df_ret.columns, lstHead)]
rJson = slice_df.to_json(orient ='records')
return func.HttpResponse(rJson, status_code=200)
else:
x_stat = 'Failed'
x_msg = 'Important information is missing for selected values!'
rJson = {
"status": x_stat,
"details": x_msg
}
xval = json.dumps(rJson)
return func.HttpResponse(xval, status_code=200)
else:
x_stat = 'Failed'
x_msg = 'Important information is missing for typeSel!'
rJson = {
"status": x_stat,
"details": x_msg
}
xval = json.dumps(rJson)
return func.HttpResponse(xval, status_code=200)
except Exception as e:
x_msg = str(e)
x_stat = 'Failed'
rJson = {
"status": x_stat,
"details": x_msg
}
xval = json.dumps(rJson)
return func.HttpResponse(xval, status_code=200)
In the above lines, we’re normalizing the response & organizing it into a pandas dataframe before returning it as JSON.
# Fetching for the selected columns
# Extracting the columns from the list
lstHead = []
listX = typeVal.split (",")
for i in listX:
lstHead.append(str(i).strip())
str3 = 'Main List: ' + str(lstHead)
logging.info(str3)
#slice_df = df_ret[df_ret.columns.intersection(lstHead)]
slice_df = df_ret[np.intersect1d(df_ret.columns, lstHead)]
For the second case, the above additional logic will play a significant part. Based on the supplied input in the typeVal attribute, the response will this time contain only the selected columns.
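To illustrate both cases, here is a hedged client-side sketch against a locally hosted function; the host URL & the column names passed in typeVal are assumptions for demonstration.

# Hypothetical client-side test of the dynamic API
import requests

# Default local Azure Functions host URL (adjust as per your deployment)
baseUrl = 'http://localhost:7071/api/Dynamic-Covid-Status'

# Case 1: request all the columns
resAll = requests.get(baseUrl, params={'typeSel': '*', 'typeVal': 'All'})
print(resAll.json())

# Case 2: request only the selected columns
resCols = requests.get(baseUrl,
                       params={'typeSel': 'Cols',
                               'typeVal': 'code, name, updated_at'})
print(resCols.json())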
Let’s see how it looks –
Azure function in Visual Studio Code
Let’s test it using Postman –
Case 1 (For all the columns):
For all elements
And, the formatted output is as follows –
Formatted output for all elements
Case 2 (For selected columns):
For selected elements
And, the formatted output is as follows –
Formatted output of the selected-element case
You can find the code in GitHub using the following link.
So, finally, we have done it.
I’ll bring some more exciting topics from the Python verse in the coming days.
Till then, Happy Avenging! 😀
Note: All the data & scenarios posted here are representational & are available over the internet for educational purposes only.