Restoring old videos with a Python-based application

Hi Guys!

Today, I’ll be demonstrating a basic way to improve the quality of an old video using the OpenCV package. This post is the first in a series on OpenCV that I’ll be posting in the coming years.

Let me tell you one thing – there are many brilliant papers on this, especially image enhancement with OpenCV, Pillow & many more valuable libraries. I’ll share some of the fascinating links later at the end of my blog.


What are we planning here?

We’ll de-noise the old video.
Slightly brighten the video.

What kind of video should be the ideal candidate for this test?

Any noisy video shot in low light will be an ideal candidate for this use case.


Why don’t we see the demo?

Demo

Architecture:

Let us find the basic architecture –

Flow of executions

Code:

Let us explore the key code base as follows:

1. clsVideo2Frame.py (This will convert the supplied video into multiple frames. It will also extract the audio from the source file, which will later be merged with the enhanced frames.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 12-Nov-2021 ####
#### ####
#### Objective: This python script will ####
#### convert the supplied video into ####
#### frames & extract the source audio. ####
#### ####
##############################################
import av
import os
import platform as pl
import subprocess
import sys
from clsConfig import clsConfig as cf

os_det = pl.system()
if os_det == "Windows":
    sep = '\\'
else:
    sep = '/'

class clsVideo2Frame:
    def __init__(self):
        self.fileNm = str(cf.conf['FILE_NAME'])
        self.base_path = str(cf.conf['INIT_PATH'])

    def convert_video_to_audio_ffmpeg(self, video_file, output_ext="mp3"):
        try:
            """Converts video to audio directly using `ffmpeg` command
            with the help of subprocess module"""
            filename, ext = os.path.splitext(video_file)
            subprocess.call(["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"],
                            stdout=subprocess.DEVNULL,
                            stderr=subprocess.STDOUT)

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

    def genFrame(self, dInd, var):
        try:
            base_path = self.base_path
            fileNm = self.fileNm

            path_to_src_video = base_path + sep + 'Source' + sep + fileNm + '.mp4'
            temp_path = base_path + sep + 'Temp' + sep

            print('Path: ', path_to_src_video)

            x = self.convert_video_to_audio_ffmpeg(path_to_src_video)

            if x == 0:
                print('Successfully Audio extracted from the source file!')
            else:
                print('Failed to extract the source audio!')

            container = av.open(path_to_src_video)

            for frame in container.decode(video=0):
                frame.to_image().save(temp_path + 'frame-%04d.jpg' % frame.index)

            print('Successfully Converted to Frames!')

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

Let us understand some of the key snippets below:

    def convert_video_to_audio_ffmpeg(self, video_file, output_ext="mp3"):
        try:
            """Converts video to audio directly using `ffmpeg` command
            with the help of subprocess module"""
            filename, ext = os.path.splitext(video_file)
            subprocess.call(["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"],
                            stdout=subprocess.DEVNULL,
                            stderr=subprocess.STDOUT)

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

In the above step, the application calls the ffmpeg command-line tool through the subprocess module to extract the source audio & stores it in the source directory itself.
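One thing worth noting about the method above: subprocess.call returns ffmpeg's exit status, but the method discards it, so it reports success whenever the ffmpeg executable could be launched. Below is a minimal sketch of a stricter variant. The helper names build_ffmpeg_cmd and extract_audio are hypothetical and not part of the original code base, and the file path is only an example.

```python
import os
import subprocess


def build_ffmpeg_cmd(video_file, output_ext="mp3"):
    """Build the same ffmpeg command line that the original method runs."""
    filename, _ext = os.path.splitext(video_file)
    return ["ffmpeg", "-y", "-i", video_file, f"{filename}.{output_ext}"]


def extract_audio(video_file, output_ext="mp3"):
    """Return 0 only if ffmpeg started and exited with status 0, else 1."""
    try:
        result = subprocess.run(
            build_ffmpeg_cmd(video_file, output_ext),
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except OSError as e:
        # Raised when the ffmpeg executable is missing or cannot be started.
        print('Error: ', str(e))
        return 1

    return 0 if result.returncode == 0 else 1


# Example path only; nothing is executed by building the command.
print(build_ffmpeg_cmd("Source/old_clip.mp4"))
# ['ffmpeg', '-y', '-i', 'Source/old_clip.mp4', 'Source/old_clip.mp3']
```

Keeping the command construction separate from the call makes it easy to inspect what would be run before running it.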

for frame in container.decode(video=0):
    frame.to_image().save(temp_path + 'frame-%04d.jpg' % frame.index)

From the above snippet, we can say that the application splits the video into individual frames & stores them in the temp directory, where another class will later enhance them.
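The same loop can be sketched as a small standalone function. This version numbers the frames with enumerate instead of reading an attribute from each frame, and closes the container when it is done. The names frame_filename and extract_frames are hypothetical; the PyAV import sits inside the function so that the naming helper can be tried without PyAV installed.

```python
import os


def frame_filename(index):
    """Zero-padded name, matching the 'frame-%04d.jpg' pattern used above."""
    return 'frame-%04d.jpg' % index


def extract_frames(path_to_src_video, temp_path):
    """Decode the first video stream and save every frame as a JPEG.

    Returns the number of frames written.
    """
    import av  # PyAV; imported here so frame_filename works without it

    count = 0
    with av.open(path_to_src_video) as container:
        for index, frame in enumerate(container.decode(video=0)):
            frame.to_image().save(os.path.join(temp_path, frame_filename(index)))
            count = index + 1

    return count


print(frame_filename(7))
# frame-0007.jpg
```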

2. clsFrameEnhance.py (This will enhance the frames as per your logic & upscale them with the parameters provided by you.)


##############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 12-Nov-2021 ####
#### ####
#### Objective: This python script will ####
#### enhance the old existing frame by ####
#### applying machine-learning algorithm ####
#### to improve their quality one at a ####
#### time. ####
#### ####
##############################################
import av
import os
import platform as pl
import numpy as np
import cv2
import glob
from PIL import Image
from numpy import asarray
from clsConfig import clsConfig as cf
import sys

# Global Variable
os_det = pl.system()
if os_det == "Windows":
    sep = '\\'
else:
    sep = '/'

class clsFrameEnhance:
    def __init__(self):
        self.fileNm = str(cf.conf['FILE_NAME'])
        self.base_path = str(cf.conf['INIT_PATH'])

    def show(self, enhanced_path, fileNameOnly, buff):
        cv2.imwrite(enhanced_path + fileNameOnly, buff)

    def unsharp_mask(self, image, kernel_size=(3, 3), sigma=1.0, amount=2.0, threshold=2):
        """Return a sharpened version of the image, using an unsharp mask."""
        blurred = cv2.GaussianBlur(image, kernel_size, sigma)
        sharpened = float(amount + 1) * image - float(amount) * blurred
        sharpened = np.maximum(sharpened, np.zeros(sharpened.shape))
        sharpened = np.minimum(sharpened, 255 * np.ones(sharpened.shape))
        sharpened = sharpened.round().astype(np.uint8)
        if threshold > 0:
            low_contrast_mask = np.absolute(image - blurred) < threshold
            np.copyto(sharpened, image, where=low_contrast_mask)
        return sharpened

    def doEnhance(self, dInd, var):
        try:
            base_path = self.base_path
            temp_path = base_path + sep + 'Temp' + sep
            enhanced_path = base_path + sep + 'Enhanced' + sep

            for filename in sorted(glob.glob(temp_path + '*.jpg')):
                print('Full File Name: ', str(filename))
                img = cv2.imread(filename)

                if img is None:
                    print('Failed to load image file:', filename)
                    sys.exit(1)

                # Sharpen, de-noise & then brighten the frame
                sharpened_image = self.unsharp_mask(img)
                img = np.asarray(sharpened_image)

                dst = cv2.fastNlMeansDenoising(img,None,7,7,21)

                Inten_matrix = np.ones(dst.shape, dtype='uint8')*20
                bright_img = cv2.add(dst, Inten_matrix)

                head, tail = os.path.split(filename)
                self.show(enhanced_path, tail, bright_img)

                # Remove Files
                os.remove(filename)

            print('Successfully Enhanced the Frames!')

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

Let us understand some of the key snippets below:

    def unsharp_mask(self, image, kernel_size=(3, 3), sigma=1.0, amount=2.0, threshold=2):
        """Return a sharpened version of the image, using an unsharp mask."""
        blurred = cv2.GaussianBlur(image, kernel_size, sigma)
        sharpened = float(amount + 1) * image - float(amount) * blurred
        sharpened = np.maximum(sharpened, np.zeros(sharpened.shape))
        sharpened = np.minimum(sharpened, 255 * np.ones(sharpened.shape))
        sharpened = sharpened.round().astype(np.uint8)
        if threshold > 0:
            low_contrast_mask = np.absolute(image - blurred) < threshold
            np.copyto(sharpened, image, where=low_contrast_mask)
        return sharpened

This returns a sharpened version of the image. If you want to know more about this technique, please refer to the following link.
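To see what the formula does, here is a worked example on a single row of pixels in plain Python. It is only a sketch: a 3-tap average with replicated edges stands in for cv2.GaussianBlur, and the names box_blur_1d and unsharp_1d are hypothetical. The core line is the same as in the method above, (amount + 1) * image - amount * blurred, which can also be read as image + amount * (image - blurred).

```python
def box_blur_1d(signal):
    """3-tap average with replicated edges (a stand-in for GaussianBlur)."""
    padded = [signal[0]] + list(signal) + [signal[-1]]
    return [(padded[i] + padded[i + 1] + padded[i + 2]) / 3
            for i in range(len(signal))]


def unsharp_1d(signal, amount=2.0, threshold=2):
    """Unsharp mask on one row of 8-bit pixel values."""
    blurred = box_blur_1d(signal)
    out = []
    for s, b in zip(signal, blurred):
        if abs(s - b) < threshold:
            # Low-contrast pixel: keep the original value.
            out.append(s)
        else:
            v = (amount + 1) * s - amount * b
            # Clip to the valid 8-bit range, then round.
            out.append(int(round(min(255, max(0, v)))))
    return out


row = [100, 100, 100, 160, 160, 160]   # a soft edge from dark to bright
print(unsharp_1d(row))
# [100, 100, 60, 200, 160, 160]
```

The flat areas are untouched, while the two pixels next to the edge are pushed apart (100 becomes 60, 160 becomes 200), which is what makes the edge look crisper.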

img = np.asarray(sharpened_image)

dst = cv2.fastNlMeansDenoising(img,None,7,7,21)

Inten_matrix = np.ones(dst.shape, dtype='uint8')*20
bright_img = cv2.add(dst, Inten_matrix)

As you can see, the frame is further enhanced by de-noising it (with a filter strength of 7, a template window of 7 & a search window of 21) & then brightening every pixel by a constant offset of 20 through Inten_matrix.
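The choice of cv2.add for the brightening step matters: on 8-bit images it saturates at 255, whereas a plain + on NumPy uint8 arrays wraps around modulo 256. The sketch below mimics both behaviours on a few pixel values in plain Python; the function names are hypothetical.

```python
def add_saturating(pixels, delta=20):
    """What cv2.add does on 8-bit data: clamp the sum at 255."""
    return [min(255, p + delta) for p in pixels]


def add_wrapping(pixels, delta=20):
    """What uint8 '+' does in NumPy: the sum wraps around modulo 256."""
    return [(p + delta) % 256 for p in pixels]


pixels = [0, 100, 240, 255]
print(add_saturating(pixels))
# [20, 120, 255, 255]
print(add_wrapping(pixels))
# [20, 120, 4, 19]
```

With wrapping arithmetic, the brightest pixels would turn almost black, so the saturating add is the safer choice for a brightness offset.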

3. clsFrame2Video.py (This will combine the frames along with the audio & produce the final video.)


###############################################
#### Updated By: SATYAKI DE ####
#### Updated On: 17-Dec-2021 ####
#### ####
#### Objective: This script will convert ####
#### enhanced frames to restored better ####
#### quality videos & merge it with source ####
#### audio. ####
#### ####
###############################################
import os
import platform as pl
import cv2
import numpy as np
import glob
import re
import ffmpeg
from clsConfig import clsConfig as cf
import logging

os_det = pl.system()
if os_det == "Windows":
    sep = '\\'
else:
    sep = '/'

class clsFrame2Video:
    def __init__(self):
        self.fileNm = str(cf.conf['FILE_NAME'])
        self.base_path = str(cf.conf['INIT_PATH'])

    def convert2Vid(self, dInd, var):
        try:
            img_array = []
            fileNm = self.fileNm
            base_path = self.base_path

            enhanced_path = base_path + sep + 'Enhanced' + sep
            target_path = base_path + sep + 'Target' + sep
            path_to_src_audio = base_path + sep + 'Source' + sep + fileNm + '.mp3'

            files = glob.glob(enhanced_path + '*.jpg')

            # Raw string avoids an invalid-escape warning for \d
            for filename in sorted(files, key=lambda x:float(re.findall(r"(-\d+)",x)[0].replace('-',''))):
                print('Processing… ', str(filename))
                img = cv2.imread(filename)
                height, width, layers = img.shape
                size = (width,height)
                img_array.append(img)

                # Deleting Frames
                os.remove(filename)

            print('Successfully Removed Old Enhanced Frames!')

            out = cv2.VideoWriter(target_path + 'Temp.avi',cv2.VideoWriter_fourcc(*'DIVX'), 23, size)

            for i in range(len(img_array)):
                out.write(img_array[i])
            out.release()

            print('Temporary File generated!')

            Temp_Target_File = str(target_path + 'Temp.avi')
            print('Temporary Video File Name: ', Temp_Target_File)
            print('Temporary Audio File Name: ', str(path_to_src_audio))

            infile1 = ffmpeg.input(Temp_Target_File)
            infile2 = ffmpeg.input(path_to_src_audio)

            ffmpeg.concat(infile1, infile2, v=1, a=1).output(target_path + fileNm + '.mp4').run()

            # Deleting Frames
            os.remove(Temp_Target_File)

            print('Successfully Converted to Videos!')

            return 0
        except Exception as e:
            x = str(e)
            print('Error: ', x)

            return 1

Let’s explore the key snippets –

files = glob.glob(enhanced_path + '*.jpg')

for filename in sorted(files, key=lambda x:float(re.findall(r"(-\d+)",x)[0].replace('-',''))):
    print('Processing... ', str(filename))
    img = cv2.imread(filename)
    height, width, layers = img.shape
    size = (width,height)
    img_array.append(img)

    # Deleting Frames
    os.remove(filename)

print('Successfully Removed Old Enhanced Frames!')

In the above snippet, the application reads the enhanced frames in numeric order & collects them in a list, from which it then stitches a temporary video without the audio.
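The sort key above takes the first "-digits" match anywhere in the path, so a directory name that happens to contain such a pattern would be picked up instead of the frame counter. Here is a minimal sketch of a key that looks at the file name only. The helper name frame_number and the run-2021 directory are hypothetical, chosen just to show the effect.

```python
import os
import re


def frame_number(path):
    """Extract the frame counter from the file name, ignoring directories."""
    match = re.search(r'-(\d+)\.jpg$', os.path.basename(path))
    if match is None:
        raise ValueError('Not a frame file: ' + path)
    return int(match.group(1))


files = [
    'run-2021/Enhanced/frame-0010.jpg',
    'run-2021/Enhanced/frame-0002.jpg',
    'run-2021/Enhanced/frame-10000.jpg',
]

ordered = sorted(files, key=frame_number)
print([os.path.basename(f) for f in ordered])
# ['frame-0002.jpg', 'frame-0010.jpg', 'frame-10000.jpg']
```

Sorting by the integer value also keeps the order right once the counter grows past four digits, where a plain alphabetical sort would put frame-10000 before frame-2000.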

Temp_Target_File = str(target_path + 'Temp.avi')
print('Temporary Video File Name: ', Temp_Target_File)
print('Temporary Audio File Name: ', str(path_to_src_audio))

infile1 = ffmpeg.input(Temp_Target_File)
infile2 = ffmpeg.input(path_to_src_audio)

ffmpeg.concat(infile1, infile2, v=1, a=1).output(target_path + fileNm + '.mp4').run()


out = cv2.VideoWriter(target_path + 'Temp.avi',cv2.VideoWriter_fourcc(*'DIVX'), 23, size)

Finally, the application writes the collected frames to a temporary AVI file at 23 frames per second & merges it with the extracted audio to produce the final enhanced video.
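Because the frame rate of 23 is hard-coded, the rebuilt video only lines up with the audio when the source was recorded at about that rate. A quick bit of arithmetic shows the drift; the frame count and the source rate below are made-up numbers for illustration.

```python
def playback_seconds(frame_count, fps):
    """Duration of a clip with the given number of frames at the given rate."""
    return frame_count / fps


frame_count = 3000   # hypothetical number of extracted frames
source_fps = 25      # hypothetical frame rate of the source video
written_fps = 23     # the value hard-coded in cv2.VideoWriter above

audio_len = playback_seconds(frame_count, source_fps)
video_len = playback_seconds(frame_count, written_fps)

print(round(audio_len, 2), round(video_len, 2), round(video_len - audio_len, 2))
# 120.0 130.43 10.43
```

In this example the picture would run about ten seconds longer than the sound. One way around it is to read the rate from the source file & pass that to the writer; PyAV, for instance, exposes it as average_rate on the video stream.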

4. restoreOldVideo.py (This is the main application, which will invoke all the Python classes.)


###############################################
#### Written By: SATYAKI DE ####
#### Written On: 17-Dec-2021 ####
#### Modified On 17-Dec-2021 ####
#### ####
#### Objective: This python script will ####
#### convert the old B&W video & restore ####
#### them to relatively better quality. ####
###############################################
# We keep the setup code in a different class as shown below.
import clsVideo2Frame as vf
import clsFrameEnhance as fe
import clsFrame2Video as fv
from clsConfig import clsConfig as cf
import datetime
import logging

###############################################
### Global Section ###
###############################################
# Instantiating all the three classes
x1 = vf.clsVideo2Frame()
x2 = fe.clsFrameEnhance()
x3 = fv.clsFrame2Video()
###############################################
### End of Global Section ###
###############################################

def main():
    try:
        # Other useful variables
        debugInd = 'Y'
        var = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        var1 = datetime.datetime.now()
        print('Start Time: ', str(var))
        # End of useful variables

        # Initiating Log Class
        general_log_path = str(cf.conf['LOG_PATH'])

        # Enabling Logging Info
        logging.basicConfig(filename=general_log_path + 'restoreVideo.log', level=logging.INFO)

        print('Started Transformation!')

        # Execute all the pass
        r1 = x1.genFrame(debugInd, var)
        r2 = x2.doEnhance(debugInd, var)
        r3 = x3.convert2Vid(debugInd, var)

        if ((r1 == 0) and (r2 == 0) and (r3 == 0)):
            print('Successfully File Enhanced!')
        else:
            print('Failed to enhance the source file!')

        var2 = datetime.datetime.now()

        # Elapsed time between start & end
        c = var2 - var1
        minutes = c.total_seconds() / 60
        print('Total difference in minutes: ', str(minutes))

        print('End Time: ', str(var2))

    except Exception as e:
        x = str(e)
        print('Error: ', x)

if __name__ == "__main__":
    main()

Let us understand the final script –

x1 = vf.clsVideo2Frame()
x2 = fe.clsFrameEnhance()
x3 = fv.clsFrame2Video()

The above lines will instantiate the three main Python classes.

# Execute all the pass
r1 = x1.genFrame(debugInd, var)
r2 = x2.doEnhance(debugInd, var)
r3 = x3.convert2Vid(debugInd, var)

The above lines invoke each step in turn, passing the debug indicator & the run timestamp, to perform the video enhancement.
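As written, all three steps run even when an earlier one has already returned 1. Below is a minimal sketch of a runner that stops at the first failure. The name run_pipeline and the stand-in steps are hypothetical; in the real script the steps would be x1.genFrame, x2.doEnhance & x3.convert2Vid wrapped with their arguments.

```python
def run_pipeline(steps):
    """Run (name, callable) steps in order; stop at the first non-zero return."""
    for name, step in steps:
        if step() != 0:
            print('Failed at step:', name)
            return 1

    print('All steps completed!')
    return 0


# Stand-in steps that record the call order & return a fixed status code.
calls = []

def fake_step(name, code):
    def step():
        calls.append(name)
        return code
    return step


status = run_pipeline([
    ('frames', fake_step('frames', 0)),
    ('enhance', fake_step('enhance', 1)),   # simulate a failure here
    ('video', fake_step('video', 0)),
])

print(status, calls)
# Failed at step: enhance
# 1 ['frames', 'enhance']
```

Stopping early avoids building a video from frames that were never enhanced.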


So, we’ve done it.

You will get the complete codebase in the following GitHub link.

I’ll bring some more exciting topics in the coming days from the Python verse. Please share & subscribe to my posts & let me know your feedback.

Till then, Happy Avenging!


Note: All the data & scenarios posted here are representational, available over the internet & for educational purposes only. Some of the images (except my photo) that we’ve used are available over the net. We don’t claim ownership of these images. There is always room for improvement, especially in the quality of the video. At the moment, the enhancement class works serially. You can implement threading or multiprocessing to make it faster.
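For the threading idea mentioned in the note, here is a minimal sketch using the standard library. The name enhance_all is hypothetical, & the stand-in function only rewrites the path; in practice it would do the per-frame work (sharpen, de-noise, brighten & write). Whether threads give a real speed-up depends on how much of that work runs outside Python's interpreter lock; swapping in ProcessPoolExecutor is the usual alternative.

```python
from concurrent.futures import ThreadPoolExecutor


def enhance_all(filenames, enhance_one, max_workers=4):
    """Apply enhance_one to every file concurrently; results keep input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(enhance_one, filenames))


# Stand-in for the per-frame work, for illustration only.
def fake_enhance(filename):
    return filename.replace('Temp', 'Enhanced')


print(enhance_all(['Temp/frame-0000.jpg', 'Temp/frame-0001.jpg'], fake_enhance))
# ['Enhanced/frame-0000.jpg', 'Enhanced/frame-0001.jpg']
```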
