In this short article, we discuss how to analyze faces in a video stream using Azure Custom Vision, OpenCV and TensorFlow. Feel free to use the code snippets in your applications; I will be completely satisfied if someone finds them useful. Up we go!
When may you need this?
As we've seen in our previous articles, Azure Custom Vision lets you create powerful CV models using transfer learning. The only thing you need to do is provide a batch of images with the corresponding tags, so nothing complicated. However, when you need a precise classification, there may be some tricky manipulations to perform. For instance, you may want to check whether a person wears glasses. You may also want to know whether the glasses are put on correctly. And, most importantly, the analysis should be done in real time. So, if the image contains several people, how can you analyze only each person's face and not the background? One solution would be to remove the background beforehand, but that is a rather challenging problem, so the easiest way is to crop, but in an intelligent way.
Why run it locally?
When you do the analysis in real time, you need to analyze each frame within milliseconds. Consequently, when you call the API, at each iteration you need to open the URL connection, prepare the image, send the HTTP request, get the HTTP response and, finally, parse the JSON output. It's not too long in principle, but it is definitely much longer than a millisecond, so you can hardly call it "real-time". We will see how to do it locally, within just a couple of milliseconds.
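To make the latency argument concrete, here is a rough back-of-the-envelope sketch. The two latency constants are hypothetical values picked only for illustration, not measurements; replace them with timings from your own setup.

```python
def frame_budget_ms(fps):
    """Time available to process one frame at the given frame rate."""
    return 1000.0 / fps


def achievable_fps(latency_ms):
    """Frames per second reachable when each frame costs latency_ms."""
    return 1000.0 / latency_ms


# Hypothetical latencies, chosen only for illustration (not measured).
ASSUMED_API_ROUND_TRIP_MS = 200.0
ASSUMED_LOCAL_INFERENCE_MS = 20.0

if __name__ == "__main__":
    print("budget per frame at 30 fps: %.1f ms" % frame_budget_ms(30))
    print("remote API: %.0f fps" % achievable_fps(ASSUMED_API_ROUND_TRIP_MS))
    print("local model: %.0f fps" % achievable_fps(ASSUMED_LOCAL_INFERENCE_MS))
```

Under these assumed numbers, a remote call per frame cannot keep up with a typical webcam, while local inference can.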
Enough text, let's dive into the code
First of all, we need to train the model. This step should not be a problem: simply upload your images, tag them and launch the training. Important: when creating your project, be sure to make it ‘exportable’, i.e. select the compact option.
Exporting should not be too complicated either. After the training ends (usually it’s a matter of a few seconds), go to the Performance tab and click the Export button. In the dialog, choose TensorFlow (Android) and download it. The key word for us is TensorFlow, not Android, as we are going to use it in our Python application.
This will download 3 files to your computer: model.pb, which is the trained model itself, a manifest, and labels.txt, which is the list of your classes.
We are now ready to dive into the implementation. But first, let's do some preparation.
Image processing functions
# coding: utf-8
import os

import cv2
import numpy as np
import tensorflow as tf

# BGR colors
BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
BLUE = (255, 0, 0)
GREEN = (0, 255, 0)
RED = (0, 0, 255)

# Output text parameters
FONT = cv2.FONT_HERSHEY_SIMPLEX
FONT_SCALE = 1
LINE_TYPE = 1
WINDOW_NAME = 'Glass Classifier'


def resize_down_to_1600_max_dim(image):
    """Shrink an oversized image using linear interpolation

    Arguments:
        image {OpenCV} -- OpenCV image
    Returns:
        OpenCV -- resized or initial image
    """
    h, w = image.shape[:2]
    if h < 1600 and w < 1600:
        return image
    # cv2.resize expects the size as (width, height)
    new_size = (1600 * w // h, 1600) if (h > w) else (1600, 1600 * h // w)
    return cv2.resize(image, new_size, interpolation=cv2.INTER_LINEAR)


def crop_center(img, cropx, cropy):
    """Extract the middle part of an image

    Arguments:
        img {OpenCV} -- OpenCV image to be cropped
        cropx {int} -- width of the cropped region
        cropy {int} -- height of the cropped region
    Returns:
        OpenCV -- cropped image
    """
    h, w = img.shape[:2]
    startx = w // 2 - (cropx // 2)
    starty = h // 2 - (cropy // 2)
    return img[starty:starty + cropy, startx:startx + cropx]


def resize_to_256_square(image):
    """Resize an image to 256x256 using linear interpolation

    Arguments:
        image {OpenCV} -- OpenCV image
    Returns:
        OpenCV -- resized image
    """
    return cv2.resize(image, (256, 256), interpolation=cv2.INTER_LINEAR)


def save_image(image, folder):
    """Save an image with a unique name

    Arguments:
        image {OpenCV} -- image object to be saved
        folder {string} -- output folder
    """
    # create the folder if it does not exist yet
    os.makedirs(folder, exist_ok=True)
    # to avoid overwriting previously saved photos,
    # image name = number of files in the folder + 1
    image_counter = len([name for name in os.listdir(folder)
                         if os.path.isfile(os.path.join(folder, name))])
    image_counter += 1
    # save the image to the dedicated folder (folder name = label)
    cv2.imwrite(os.path.join(folder, str(image_counter) + '.png'), image)
As you may have noticed, I added a custom image saver; it will help me retrain the model whenever I need to.
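The size arithmetic in resize_down_to_1600_max_dim is easy to get backwards, because cv2.resize expects (width, height) while image.shape gives (height, width). The following sketch isolates that computation in a pure function; target_size is a hypothetical helper name used only for this illustration and is not part of the article's code.

```python
def target_size(h, w, max_dim=1600):
    """Return (new_width, new_height) in the order cv2.resize expects.

    Images already smaller than max_dim in both dimensions are left as is;
    otherwise the longer side is scaled to max_dim, keeping the aspect ratio.
    """
    if h < max_dim and w < max_dim:
        return (w, h)
    if h > w:
        # portrait: height becomes max_dim, width shrinks proportionally
        return (max_dim * w // h, max_dim)
    # landscape or square: width becomes max_dim
    return (max_dim, max_dim * h // w)


if __name__ == "__main__":
    for h, w in [(480, 640), (1080, 1920), (3200, 1600)]:
        print((h, w), "->", target_size(h, w))
```

For a webcam frame this branch is rarely taken, since typical frames are well below 1600 pixels on each side.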
Load the model
# graph definition that will hold the trained model
graph_def = tf.compat.v1.GraphDef()
# list of classes; the order must match the exported labels.txt
labels = ['without_glass', 'with_glass']
# N.B. the Azure Custom Vision export contains, among other files,
# model.pb (a TensorFlow graph) and labels.txt (the list of classes)
# import the TensorFlow graph; 'rb' opens the binary file in read-only mode
with tf.io.gfile.GFile(name='glass_model.pb', mode='rb') as f:
    graph_def.ParseFromString(f.read())
    tf.import_graph_def(graph_def=graph_def, name='')
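Since the position of each label in the list has to match the order of the model outputs, hardcoding the list is fragile. A minimal sketch of reading the labels from the exported labels.txt instead, assuming the file holds one label per line; load_labels is a hypothetical helper name, not something from the export.

```python
def load_labels(path):
    """Read class labels from a text file, one label per line.

    Blank lines are skipped; the file order is preserved because the index
    of each label must match the index of the model output.
    """
    with open(path, encoding="utf-8") as f:
        return [line.strip() for line in f if line.strip()]


# Usage (assumes labels.txt sits next to the script):
# labels = load_labels("labels.txt")
```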
Prepare video stream
# initialize video capture object to read video from an external webcam
video_capture = cv2.VideoCapture(1)
# if there is no external camera, fall back to the built-in camera
if not video_capture.read()[0]:
    video_capture.release()
    video_capture = cv2.VideoCapture(0)
# full-screen mode (the window must be resizable to go full screen)
cv2.namedWindow(WINDOW_NAME, cv2.WINDOW_NORMAL)
cv2.setWindowProperty(
    WINDOW_NAME, cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
# These names are part of the model and cannot be changed.
output_layer = 'loss:0'
input_node = 'Placeholder:0'
predicted_tag = 'Predicted Tag'
# counter to control the percentage of saved images
frame_counter = 0
And here's a very important point: we start the TensorFlow session before the main loop. This way the session is initialized only once, which saves about 1 second and matters a lot for real-time processing.
with tf.compat.v1.Session() as sess:
    prob_tensor = sess.graph.get_tensor_by_name(output_layer)
    # get the input size of the model (read once, outside the loop)
    input_tensor_shape = sess.graph.get_tensor_by_name(
        input_node).shape.as_list()
    network_input_size = input_tensor_shape[1]
    while video_capture.isOpened():
        # read video frame by frame
        ret, frame = video_capture.read()
        # stop if the camera did not return a frame
        if not ret:
            break
        try:
            frame = cv2.flip(frame, 1)
            frame_counter += 1
            # width and height of the analyzed region
            w, h = 200, 300
            # upper-left and lower-right corners of the region
            upX = 220
            upY = 50
            lowX = upX + w
            lowY = upY + h
            image = frame[upY:lowY, upX:lowX]
            # if the image has either w or h greater than 1600, resize it down,
            # respecting the aspect ratio, so that the largest dimension is 1600
            image = resize_down_to_1600_max_dim(image)
            # get the largest center square
            h, w = image.shape[:2]
            min_dim = min(w, h)
            max_square_image = crop_center(image, min_dim, min_dim)
            # resize that square down to 256x256
            augmented_image = resize_to_256_square(max_square_image)
            # crop the center to the network input size
            augmented_image = crop_center(
                augmented_image, network_input_size, network_input_size)
            predictions = sess.run(
                prob_tensor, {input_node: [augmented_image]})
            # get the highest probability label
            highest_probability_index = np.argmax(predictions)
            predicted_tag = labels[highest_probability_index]
            # green frame when glasses are detected, red otherwise
            if predicted_tag == 'with_glass':
                frameColor = GREEN
            else:
                frameColor = RED
            cv2.rectangle(frame, (upX, upY), (lowX, lowY), frameColor, 1)
            # save every 10th frame for later retraining
            if frame_counter % 10 == 0:
                save_image(augmented_image, predicted_tag)
        except Exception:
            continue
        cv2.imshow(WINDOW_NAME, frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
# release video capture object
video_capture.release()
cv2.destroyAllWindows()
So we crop the region we need to analyze and apply the image processing functions to it.
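The label selection step can also report how confident the model is, which helps when you decide which frames are worth saving. Below is a minimal sketch in plain Python, assuming the probabilities for a single image have already been unpacked into a flat sequence; top_prediction and min_probability are hypothetical names introduced for this illustration.

```python
def top_prediction(probabilities, labels, min_probability=0.0):
    """Return (label, probability) for the most likely class.

    If the best probability is below min_probability, the label is None,
    so the caller can treat the frame as uncertain.
    """
    if len(probabilities) != len(labels):
        raise ValueError("probabilities and labels must have the same length")
    best_index = max(range(len(probabilities)), key=lambda i: probabilities[i])
    best_probability = float(probabilities[best_index])
    if best_probability < min_probability:
        return None, best_probability
    return labels[best_index], best_probability


if __name__ == "__main__":
    labels = ["without_glass", "with_glass"]
    print(top_prediction([0.1, 0.9], labels))
    print(top_prediction([0.55, 0.45], labels, min_probability=0.6))
```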
You may have noticed that I've also added a frame counter to limit the number of images saved for training; in other words, I save only 10% of the frames. But you may also have noticed that we defined the face region manually, which is not the best solution. So what we'll do now is add some intelligence using the OpenCV dnn module and a pretrained Caffe model.
# Caffe 'deploy' prototxt file
prototxt = "models//deploy.prototxt.txt"
# Caffe pretrained model
model = "models//res10_300x300_ssd_iter_140000.caffemodel"
# minimum probability to filter weak detections
min_confidence = 0.5
# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(prototxt, model)
There is an excellent tutorial by Adrian Rosebrock at PyImageSearch explaining how to do deep learning-based face detection with OpenCV, so please refer to his blog for any additional information.
Same steps, but using Face Detection
# grab the frame dimensions and convert it to a blob
(h, w) = frame.shape[:2]
blob = cv2.dnn.blobFromImage(cv2.resize(frame, (300, 300)), 1.0,
                             (300, 300), (104.0, 177.0, 123.0))
# pass the blob through the network and obtain the detections and
# predictions
net.setInput(blob)
detections = net.forward()
faces_counter = 0
# loop over the detections
for i in range(0, detections.shape[2]):
    # extract the confidence (i.e., probability) associated with the
    # prediction
    confidence = detections[0, 0, i, 2]
    # filter out weak detections by ensuring the `confidence` is
    # greater than the minimum confidence
    if confidence < min_confidence:
        continue
    # compute the (x, y)-coordinates of the bounding box for the
    # object
    box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
    (startX, startY, endX, endY) = box.astype("int")
    face_frame = frame[startY:endY, startX:endX]
    # get face height
    face_height = endY - startY
    # set logo size
    # logo always represents 20% of the rectangle
    LOGO_SIZE = int(face_height*0.2)
    # resize logo according to the image height
    white_mask_compact = image_resize(
        white_mask, width=LOGO_SIZE, height=LOGO_SIZE)
    orange_mask_compact = image_resize(
        orange_mask, width=LOGO_SIZE, height=LOGO_SIZE)
    green_mask_compact = image_resize(
        green_mask, width=LOGO_SIZE, height=LOGO_SIZE)
    # calculate upper and lower corners of the rectangle
    # needed to place logos correctly
    rectangleXup = startX
    rectangleYup = startY
    rectangleXdown = endX
    rectangleYdown = endY
    # distance between border and logo
    PADDING = 5
    # mask
    rightX = rectangleXdown - LOGO_SIZE - PADDING
    rightY = rectangleYdown + PADDING
    # gowning
    leftX = rectangleXup + PADDING
    leftY = rectangleYdown + PADDING
    maskX = int((leftX + rightX)/2)
    maskY = rectangleYdown + PADDING
    face_frame = resize_to_256_square(face_frame)
    # Crop the center for the specified network_input_Size
    augmented_image = crop_center(
        face_frame, network_input_size, network_input_size)
    predictions, = sess.run(
        prob_tensor, {input_node: [augmented_image]})
    highest_probability_index = np.argmax(predictions)
    rectangle_color = WHITE
    if labels[highest_probability_index] == "with_glass":
        rectangle_color = GREEN
        status_logo = white_mask_compact
        predicted_value = "ok"
    elif labels[highest_probability_index] == "without_glass":
        rectangle_color = RED
        status_logo = orange_mask_compact
        predicted_value = "ko"
    cv2.rectangle(frame, (startX, startY), (endX, endY),
                  rectangle_color, 2)
    cv2.rectangle(frame, (rectangleXup - 1, rectangleYdown),
                  (rectangleXdown + 1, rectangleYdown + LOGO_SIZE + PADDING),
                  rectangle_color, -1)
    frame[maskY:maskY + LOGO_SIZE,
          maskX:maskX + LOGO_SIZE] = status_logo
    faces_counter += 1
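The detector returns box coordinates relative to the frame size, and for a face near the border the scaled box can stick out of the frame, which leads to an empty or misaligned crop. One way to guard against that, sketched here under the assumption that clamping the box to the frame is acceptable for your use case, is shown below; to_pixel_box is a hypothetical helper, not part of OpenCV.

```python
def to_pixel_box(norm_box, frame_w, frame_h):
    """Scale a normalized (startX, startY, endX, endY) box to pixels.

    Coordinates are clamped to the frame; None is returned when nothing
    of the box is left inside the frame.
    """
    x1, y1, x2, y2 = norm_box
    start_x = max(0, min(frame_w, int(x1 * frame_w)))
    start_y = max(0, min(frame_h, int(y1 * frame_h)))
    end_x = max(0, min(frame_w, int(x2 * frame_w)))
    end_y = max(0, min(frame_h, int(y2 * frame_h)))
    if end_x <= start_x or end_y <= start_y:
        return None
    return (start_x, start_y, end_x, end_y)


if __name__ == "__main__":
    print(to_pixel_box((0.25, 0.25, 0.75, 0.75), 640, 480))
    print(to_pixel_box((-0.1, 0.5, 0.5, 1.2), 640, 480))
    print(to_pixel_box((1.1, 0.2, 1.3, 0.4), 640, 480))
```

In the loop, a None result would simply mean skipping that detection with continue.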
Here's the full code.
As you can see, in the final version I've used the imutils library, because I wanted to keep the image ratio in full-screen mode. I also added some icons to make it prettier, so you may create a folder in the root directory (named icones in the code below) and put your own images there.
# import the necessary packages
import os
import time

import cv2
import imutils
import numpy as np
import tensorflow as tf
from imutils.video import VideoStream
from win32api import GetSystemMetrics  # Windows only (pywin32)

# screen resolution, used to scale the frame in full-screen mode
width = GetSystemMetrics(0)
height = GetSystemMetrics(1)
print("Width =", width)
print("Height =", height)
# BGR color constants
WHITE = (255, 255, 255)
BLUE = (255, 0, 0)
GREEN = (0, 255, 0)
RED = (0, 0, 255)
BLACK = (0, 0, 0)
def image_resize(image, width=None, height=None, inter=cv2.INTER_AREA):
    # resize without distortion
    # initialize the dimensions of the image to be resized and
    # grab the image size
    dim = None
    (h, w) = image.shape[:2]
    # if both the width and height are None, then return the
    # original image
    if width is None and height is None:
        return image
    # check to see if the width is None
    if width is None:
        # calculate the ratio of the height and construct the
        # dimensions
        r = height / float(h)
        dim = (int(w * r), height)
    # otherwise, the width is given and takes precedence
    else:
        # calculate the ratio of the width and construct the
        # dimensions
        r = width / float(w)
        dim = (width, int(h * r))
    # resize the image
    resized = cv2.resize(image, dim, interpolation=inter)
    # return the resized image
    return resized


def convert_to_opencv(image):
    # RGB -> BGR conversion is performed as well.
    image = image.convert('RGB')
    r, g, b = np.array(image).T
    opencv_image = np.array([b, g, r]).transpose()
    return opencv_image


def crop_center(img, cropx, cropy):
    h, w = img.shape[:2]
    startx = w//2-(cropx//2)
    starty = h//2-(cropy//2)
    return img[starty:starty+cropy, startx:startx+cropx]


def resize_down_to_1600_max_dim(image):
    h, w = image.shape[:2]
    if (h < 1600 and w < 1600):
        return image
    new_size = (1600 * w // h, 1600) if (h > w) else (1600, 1600 * h // w)
    return cv2.resize(image, new_size, interpolation=cv2.INTER_LINEAR)


def resize_to_256_square(image):
    try:
        resized_image = cv2.resize(
            image, (256, 256), interpolation=cv2.INTER_LINEAR)
    except Exception:
        # e.g. an empty crop; return it unchanged
        resized_image = image
    return resized_image


def save_image(image, folder):
    """Save an image with a unique name

    Arguments:
        image {OpenCV} -- image object to be saved
        folder {string} -- output folder
    """
    # check whether the folder exists and create one if not
    if not os.path.exists(folder):
        os.makedirs(folder)
    # to avoid overwriting previously saved photos,
    # image name = number of files in the folder + 1
    image_counter = len([name for name in os.listdir(folder)
                         if os.path.isfile(os.path.join(folder, name))])
    # increment image counter
    image_counter += 1
    # save image to the dedicated folder (folder name = label)
    cv2.imwrite(folder + '/' + str(image_counter) + '.png', image)
# Caffe 'deploy' prototxt file
prototxt = "models//deploy.prototxt.txt"
# Caffe pretrained model
model = "models//res10_300x300_ssd_iter_140000.caffemodel"
# minimum probability to filter weak detections
min_confidence = 0.5
# load our serialized model from disk
print("[INFO] loading model...")
net = cv2.dnn.readNetFromCaffe(prototxt, model)
# initialize the video stream and allow the camera sensor to warm up
print("[INFO] starting video stream...")
vs = VideoStream(src=1).start()
if not isinstance(vs.read(), np.ndarray):
    vs = VideoStream(src=0).start()
time.sleep(1.0)
graph_def = tf.compat.v1.GraphDef()
# the order must match the exported labels.txt
labels = ["with_glass", "without_glass"]
# These are set to the default names from exported models, update as needed.
filename = "models//mask_model.pb"
# Import the TF graph
with tf.io.gfile.GFile(filename, 'rb') as f:
    graph_def.ParseFromString(f.read())
    tf.import_graph_def(graph_def, name='')
# Get the input size of the model
with tf.compat.v1.Session() as sess:
    input_tensor_shape = sess.graph.get_tensor_by_name(
        'Placeholder:0').shape.as_list()
network_input_size = input_tensor_shape[1]
# Graphical elements
# mask
white_mask = cv2.imread('icones/picto_blanc/picto1.png')
orange_mask = cv2.imread('icones/picto_orange/picto1.png')
green_mask = cv2.imread('icones/picto_vert/picto1.png')
header_logo = cv2.imread('icones/header_logo.png')
logo_height, logo_width, _ = header_logo.shape
header_logo_resized = image_resize(
    header_logo, width=int(logo_height*2), height=int(logo_width*4))
with tf.compat.v1.Session() as sess:
    # These names are part of the model and cannot be changed.
    output_layer = 'loss:0'
    input_node = 'Placeholder:0'
    prob_tensor = sess.graph.get_tensor_by_name(output_layer)
    # loop over the frames from the video stream
    while True:
        try:
            # grab the frame from the threaded video stream and resize it
            # to the screen width, keeping the aspect ratio
            frame = vs.read()
            frame = imutils.resize(frame, width=width)
            frame[0:header_logo_resized.shape[0],
                  0:header_logo_resized.shape[1]] = header_logo_resized
            # grab the frame dimensions and convert it to a blob
            (h, w) = frame.shape[:2]
            blob = cv2.dnn.blobFromImage(cv2.resize(frame, (300, 300)), 1.0,
                                         (300, 300), (104.0, 177.0, 123.0))
            # pass the blob through the network and obtain the detections and
            # predictions
            net.setInput(blob)
            detections = net.forward()
            faces_counter = 0
            # loop over the detections
            for i in range(0, detections.shape[2]):
                # extract the confidence (i.e., probability) associated with the
                # prediction
                confidence = detections[0, 0, i, 2]
                # filter out weak detections by ensuring the `confidence` is
                # greater than the minimum confidence
                if confidence < min_confidence:
                    continue
                # compute the (x, y)-coordinates of the bounding box for the
                # object
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")
                face_frame = frame[startY:endY, startX:endX]
                # get face height
                face_height = endY - startY
                # set logo size
                # logo always represents 20% of the rectangle
                LOGO_SIZE = int(face_height*0.2)
                # resize logo according to the image height
                white_mask_compact = image_resize(
                    white_mask, width=LOGO_SIZE, height=LOGO_SIZE)
                orange_mask_compact = image_resize(
                    orange_mask, width=LOGO_SIZE, height=LOGO_SIZE)
                green_mask_compact = image_resize(
                    green_mask, width=LOGO_SIZE, height=LOGO_SIZE)
                # calculate upper and lower corners of the rectangle
                # needed to place logos correctly
                rectangleXup = startX
                rectangleYup = startY
                rectangleXdown = endX
                rectangleYdown = endY
                # distance between border and logo
                PADDING = 5
                # mask
                rightX = rectangleXdown - LOGO_SIZE - PADDING
                rightY = rectangleYdown + PADDING
                # gowning
                leftX = rectangleXup + PADDING
                leftY = rectangleYdown + PADDING
                maskX = int((leftX + rightX)/2)
                maskY = rectangleYdown + PADDING
                face_frame = resize_to_256_square(face_frame)
                # Crop the center for the specified network_input_Size
                augmented_image = crop_center(
                    face_frame, network_input_size, network_input_size)
                predictions, = sess.run(
                    prob_tensor, {input_node: [augmented_image]})
                highest_probability_index = np.argmax(predictions)
                rectangle_color = WHITE
                if labels[highest_probability_index] == "with_glass":
                    rectangle_color = GREEN
                    status_logo = white_mask_compact
                    predicted_value = "ok"
                elif labels[highest_probability_index] == "without_glass":
                    rectangle_color = RED
                    status_logo = orange_mask_compact
                    predicted_value = "ko"
                cv2.rectangle(frame, (startX, startY), (endX, endY),
                              rectangle_color, 2)
                cv2.rectangle(frame, (rectangleXup - 1, rectangleYdown),
                              (rectangleXdown + 1,
                               rectangleYdown + LOGO_SIZE + PADDING),
                              rectangle_color, -1)
                frame[maskY:maskY + LOGO_SIZE,
                      maskX:maskX + LOGO_SIZE] = status_logo
                faces_counter += 1
            # font
            font = cv2.FONT_HERSHEY_SIMPLEX
            # org
            org = (200, 200)
            # fontScale
            fontScale = 1
            # Line thickness of 2 px
            thickness = 2
            # display the number of detected faces
            frame = cv2.putText(frame, str(faces_counter), org, font,
                                fontScale, BLUE, thickness, cv2.LINE_AA)
            # show the output frame
            cv2.imshow("Expertime", frame)
            key = cv2.waitKey(1) & 0xFF
            # if the `q` key was pressed, break from the loop
            if key == ord("q"):
                break
        except ValueError:
            # e.g. the logo does not fit because the face is at the frame border
            print("out of camera range")
# clean up
cv2.destroyAllWindows()
vs.stop()
Hope you found it useful. If you have any questions, please do not hesitate to leave a comment.
Wish you all an excellent week!