
Provisioning of the project code
We start scripting our project in the file tensorflow_detection.py by loading the necessary packages:
import os
import numpy as np
import tensorflow as tf
import six.moves.urllib as urllib
import tarfile
from PIL import Image
from tqdm import tqdm
from time import gmtime, strftime
import json
import cv2
In order to be able to process videos, apart from OpenCV 3, we also need the moviepy package. The package moviepy is a project that can be found at http://zulko.github.io/moviepy/ and freely used, since it is distributed under an MIT license. As described on its home page, moviepy is a tool for video editing (that is, cuts, concatenations, title insertions), video compositing (non-linear editing), and video processing, as well as for creating advanced effects.
The package operates with the most common video formats, including GIF. It needs the FFmpeg converter (https://www.ffmpeg.org/) in order to operate properly; therefore, the first time it is used it will fail to start, and it will download FFmpeg as a plugin using imageio:
try:
    from moviepy.editor import VideoFileClip
except:
    # If FFmpeg (https://www.ffmpeg.org/) is not found
    # on the computer, it will be downloaded from Internet
    # (an Internet connection is needed)
    import imageio
    imageio.plugins.ffmpeg.download()
    from moviepy.editor import VideoFileClip
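To anticipate how moviepy will be used later in this chapter, here is a minimal sketch (sample.mp4 is just a placeholder file name) that applies a frame-by-frame function to a clip and writes the result back to disk:
from moviepy.editor import VideoFileClip

def identity(frame):
    # Each frame is handed over as a NumPy array;
    # here it is simply returned unchanged
    return frame

clip = VideoFileClip("sample.mp4")
processed = clip.fl_image(identity)
processed.write_videofile("annotated_sample.mp4", audio=False)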
Finally, we require two useful functions available in the object_detection directory from the TensorFlow API project:
from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_util
We define the DetectionObj class and its init procedure. The initialization expects only one parameter, the model name (which is initially set to the least accurate, but fastest and most lightweight, model: SSD MobileNet), but a few internal parameters can be changed to suit your use of the class:
- self.TARGET_PATH, pointing out the directory where you want the processed annotations to be saved.
- self.THRESHOLD, fixing the probability threshold below which a detection is ignored by the annotation process. In fact, any model of the suite will output many low-probability detections in every image. Detections with too low a probability are usually false alarms; for this reason you fix a threshold and ignore such highly unlikely detections (a minimal sketch of this filtering follows the list). As a rule of thumb, 0.25 is a good threshold for spotting objects that are uncertain because of almost total occlusion or visual clutter.
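As an illustration of how such a threshold acts on the raw model output, here is a standalone sketch (the hypothetical scores array and the 0.25 value mirror what the serialize_annotations method will do later in the class):
import numpy as np

# Hypothetical scores for five candidate detections in one image
scores = np.array([[0.91, 0.62, 0.18, 0.07, 0.03]])
threshold = 0.25
# Keep only the positions of detections above the threshold
valid = [i for i, score in enumerate(scores[0]) if score > threshold]
print(valid)  # [0, 1]: only the first two detections survive the filtering
With these parameters in mind, here is the class definition and its init procedure: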
class DetectionObj(object):
    """
    DetectionObj is a class suitable to leverage
    Google Tensorflow detection API for image annotation from
    different sources: files, images acquired by one's own webcam,
    videos.
    """

    def __init__(self, model='ssd_mobilenet_v1_coco_11_06_2017'):
        """
        The instructions to be run when the class is instantiated
        """
        # Path where the Python script is being run
        self.CURRENT_PATH = os.getcwd()
        # Path where to save the annotations (it can be modified)
        self.TARGET_PATH = self.CURRENT_PATH
        # Selection of pre-trained detection models
        # from the Tensorflow Model Zoo
        self.MODELS = ["ssd_mobilenet_v1_coco_11_06_2017",
                       "ssd_inception_v2_coco_11_06_2017",
                       "rfcn_resnet101_coco_11_06_2017",
                       "faster_rcnn_resnet101_coco_11_06_2017",
                       "faster_rcnn_inception_resnet_v2_atrous"
                       "_coco_11_06_2017"]
        # Setting a threshold for detecting an object by the models
        self.THRESHOLD = 0.25  # Most used threshold in practice
        # Checking if the desired pre-trained detection model is available
        if model in self.MODELS:
            self.MODEL_NAME = model
        else:
            # Otherwise revert to a default model
            print("Model not available, reverted to default",
                  self.MODELS[0])
            self.MODEL_NAME = self.MODELS[0]
        # The file name of the Tensorflow frozen model
        self.CKPT_FILE = os.path.join(self.CURRENT_PATH,
                                      'object_detection',
                                      self.MODEL_NAME,
                                      'frozen_inference_graph.pb')
        # Attempting to load the detection model;
        # if it is not available on disk, it will be
        # downloaded from the Internet
        # (an Internet connection is required)
        try:
            self.DETECTION_GRAPH = self.load_frozen_model()
        except:
            print("Couldn't find", self.MODEL_NAME)
            self.download_frozen_model()
            self.DETECTION_GRAPH = self.load_frozen_model()
        # Loading the labels of the classes recognized
        # by the detection model
        self.NUM_CLASSES = 90
        path_to_labels = os.path.join(self.CURRENT_PATH,
                                      'object_detection', 'data',
                                      'mscoco_label_map.pbtxt')
        label_mapping = \
            label_map_util.load_labelmap(path_to_labels)
        extracted_categories = \
            label_map_util.convert_label_map_to_categories(
                label_mapping, max_num_classes=self.NUM_CLASSES,
                use_display_name=True)
        self.LABELS = {item['id']: item['name']
                       for item in extracted_categories}
        self.CATEGORY_INDEX = label_map_util.create_category_index(
            extracted_categories)
        # Starting the tensorflow session
        self.TF_SESSION = tf.Session(graph=self.DETECTION_GRAPH)
As a convenient variable to access, self.LABELS contains a dictionary relating each class's numerical code to its textual representation. Moreover, the init procedure leaves the TensorFlow session loaded, open, and ready to be used at self.TF_SESSION.
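As a quick sanity check, once the complete class (including the methods defined in the rest of this section) is in place, you could instantiate it and inspect these variables. Here is a minimal sketch (the first run may take a while, because the model will be downloaded if it is not already on disk):
detection = DetectionObj(model='ssd_mobilenet_v1_coco_11_06_2017')
# With the standard mscoco_label_map, class id 18 should print 'dog'
print(detection.LABELS[18])
# The open TensorFlow session, ready for inference
print(detection.TF_SESSION)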
The functions load_frozen_model and download_frozen_model help the init procedure to load the chosen frozen model from disk and, if it is not available, to download it as a TAR file from the internet and extract it into the proper directory (which is object_detection):
    def load_frozen_model(self):
        """
        Loading frozen detection model in ckpt
        file from disk to memory
        """
        detection_graph = tf.Graph()
        with detection_graph.as_default():
            od_graph_def = tf.GraphDef()
            with tf.gfile.GFile(self.CKPT_FILE, 'rb') as fid:
                serialized_graph = fid.read()
                od_graph_def.ParseFromString(serialized_graph)
                tf.import_graph_def(od_graph_def, name='')
        return detection_graph
The function download_frozen_model leverages the tqdm package in order to visualize its progress as it downloads the new model from the internet. Some models are quite large (over 600 MB), so the download may take a long time. Providing visual feedback on the progress and the estimated time of completion allows the user to be more confident about the progression of the operation:
    def download_frozen_model(self):
        """
        Downloading frozen detection model from Internet
        when not available on disk
        """
        def my_hook(t):
            """
            Wrapping tqdm instance in order to monitor URLopener
            """
            last_b = [0]

            def inner(b=1, bsize=1, tsize=None):
                if tsize is not None:
                    t.total = tsize
                t.update((b - last_b[0]) * bsize)
                last_b[0] = b
            return inner

        # Opening the url where to find the model
        model_filename = self.MODEL_NAME + '.tar.gz'
        download_url = \
            'http://download.tensorflow.org/models/object_detection/'
        opener = urllib.request.URLopener()
        # Downloading the model with tqdm estimations of completion
        print('Downloading ...')
        with tqdm() as t:
            opener.retrieve(download_url + model_filename,
                            model_filename, reporthook=my_hook(t))
        # Extracting the model from the downloaded tar file
        print('Extracting ...')
        tar_file = tarfile.open(model_filename)
        for file in tar_file.getmembers():
            file_name = os.path.basename(file.name)
            if 'frozen_inference_graph.pb' in file_name:
                tar_file.extract(file,
                                 os.path.join(self.CURRENT_PATH,
                                              'object_detection'))
The following two functions, load_image_from_disk and load_image_into_numpy_array, are necessary in order to pick an image from disk and transform it into a Numpy array suitable for being processed by any of the TensorFlow models available in this project:
    def load_image_from_disk(self, image_path):
        return Image.open(image_path)

    def load_image_into_numpy_array(self, image):
        try:
            (im_width, im_height) = image.size
            return np.array(image.getdata()).reshape(
                (im_height, im_width, 3)).astype(np.uint8)
        except:
            # If the previous procedure fails, we expect the
            # image is already a Numpy ndarray
            return image
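For instance, assuming a local file named dog.jpg, the two helpers chain as follows (a sketch using the detection instance created earlier):
image = detection.load_image_from_disk("dog.jpg")        # a PIL image
image_np = detection.load_image_into_numpy_array(image)  # uint8 ndarray
print(image_np.shape)  # (height, width, 3)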
The detect function is the core of the detection functionality of the class. It expects a list of images to be processed. A Boolean flag, annotate_on_image, tells the script to draw the bounding boxes and annotations directly on the provided images.
Such a function is able to process images of different sizes, one after the other, but it has to process each one singularly. Therefore, it takes each image and expands the dimensions of the array, adding a further dimension. This is necessary because the model expects an array of size: number of images * height * width * depth.
Note that we could pack all the images to be predicted into a single matrix. That would work fine, and it would be faster, if all the images were of the same height and width, which is an assumption our project does not make; hence the single-image processing.
We then take a few tensors from the model by name (detection_boxes, detection_scores, detection_classes, num_detections), which are exactly the outputs we expect from the model, and we feed everything to the input tensor, image_tensor, which will normalize the image into a form suitable for the layers of the model to process.
The results are gathered into a list and the images are processed with the detection boxes and represented, if required:
    def detect(self, images, annotate_on_image=True):
        """
        Processing a list of images, feeding it
        into the detection model and getting from it scores,
        bounding boxes and predicted classes present
        in the images
        """
        if type(images) is not list:
            images = [images]
        results = list()
        for image in images:
            # The array-based representation of the image will
            # be used later in order to prepare the resulting
            # image with boxes and labels on it
            image_np = self.load_image_into_numpy_array(image)
            # Expand dimensions since the model expects images
            # to have shape: [1, None, None, 3]
            image_np_expanded = np.expand_dims(image_np, axis=0)
            image_tensor = \
                self.DETECTION_GRAPH.get_tensor_by_name(
                    'image_tensor:0')
            # Each box represents a part of the image where a
            # particular object was detected.
            boxes = self.DETECTION_GRAPH.get_tensor_by_name(
                'detection_boxes:0')
            # Each score represents the level of confidence
            # for each of the detected objects. The score could be
            # shown on the result image, together with the class label.
            scores = self.DETECTION_GRAPH.get_tensor_by_name(
                'detection_scores:0')
            classes = self.DETECTION_GRAPH.get_tensor_by_name(
                'detection_classes:0')
            num_detections = \
                self.DETECTION_GRAPH.get_tensor_by_name(
                    'num_detections:0')
            # Actual detection happens here
            (boxes, scores, classes, num_detections) = \
                self.TF_SESSION.run(
                    [boxes, scores, classes, num_detections],
                    feed_dict={image_tensor: image_np_expanded})
            if annotate_on_image:
                new_image = self.detection_on_image(
                    image_np, boxes, scores, classes)
                results.append((new_image, boxes,
                                scores, classes, num_detections))
            else:
                results.append((image_np, boxes,
                                scores, classes, num_detections))
        return results
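To make the returned structure concrete, here is how a single call could be unpacked (a sketch; dog.jpg is a placeholder file name, and the number of candidate detections per image depends on the model configuration):
image = detection.load_image_from_disk("dog.jpg")
new_image, boxes, scores, classes, num_detections = detection.detect(image)[0]
# boxes, scores and classes are NumPy arrays with one row per input image;
# the first row holds the candidates for our single image
print(scores[0][:3])   # confidence scores of the first three candidates
print(classes[0][:3])  # their numerical class codes (see self.LABELS)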
The function detection_on_image just processes the results from the detect function and returns a new image enriched by bounding boxes, which will be represented on screen by the function visualize_image (you can adjust the latency parameter, which corresponds to the number of seconds the image will stay on screen before the script passes on to process another image):
    def detection_on_image(self, image_np, boxes, scores,
                           classes):
        """
        Put detection boxes on the images over
        the detected classes
        """
        vis_util.visualize_boxes_and_labels_on_image_array(
            image_np,
            np.squeeze(boxes),
            np.squeeze(classes).astype(np.int32),
            np.squeeze(scores),
            self.CATEGORY_INDEX,
            use_normalized_coordinates=True,
            line_thickness=8)
        return image_np
The function visualize_image offers a few parameters that can be modified in order to suit your needs in this project. First of all, image_size provides the desired size of the image to be represented on screen; larger or smaller images are resized to approximate this prescribed size. The latency parameter, instead, defines the time, in seconds, that each image will stay on screen, thus blocking the object detection procedure, before moving on to the next one. Finally, bluish_correction is a correction to be applied when images arrive in the BGR format (in this format the color channels are arranged in the order blue-green-red; it is the standard for the OpenCV library: https://stackoverflow.com/questions/14556545/why-opencv-using-bgr-colour-space-instead-of-rgb) instead of the RGB (red-green-blue) format, which is what the model expects:
    def visualize_image(self, image_np, image_size=(400, 300),
                        latency=3, bluish_correction=True):
        height, width, depth = image_np.shape
        reshaper = height / float(image_size[0])
        width = int(width / reshaper)
        height = int(height / reshaper)
        id_img = 'preview_' + str(np.sum(image_np))
        cv2.startWindowThread()
        cv2.namedWindow(id_img, cv2.WINDOW_NORMAL)
        cv2.resizeWindow(id_img, width, height)
        if bluish_correction:
            RGB_img = cv2.cvtColor(image_np, cv2.COLOR_BGR2RGB)
            cv2.imshow(id_img, RGB_img)
        else:
            cv2.imshow(id_img, image_np)
        cv2.waitKey(latency*1000)
Annotations are prepared and written to disk by the serialize_annotations function, which creates, for each image, a single JSON file containing the data regarding the detected classes, the vertices of the bounding boxes, and the detection confidence. For instance, this is the result of a detection on a photo of a dog:
"{"scores": [0.9092628359794617], "classes": ["dog"], "boxes": [[0.025611668825149536, 0.22220897674560547, 0.9930437803268433, 0.7734537720680237]]}"
The JSON points out the detected class (a single dog), the level of confidence (about 0.91), and the vertices of the bounding box, expressed as fractions of the image's height and width (they are therefore relative, not absolute, pixel coordinates):
    def serialize_annotations(self, boxes, scores, classes,
                              filename='data.json'):
        """
        Saving annotations to disk, to a JSON file
        """
        threshold = self.THRESHOLD
        valid = [position for position, score in enumerate(
            scores[0]) if score > threshold]
        if len(valid) > 0:
            valid_scores = scores[0][valid].tolist()
            valid_boxes = boxes[0][valid].tolist()
            valid_class = [self.LABELS[int(
                a_class)] for a_class in classes[0][valid]]
            with open(filename, 'w') as outfile:
                json_data = {'classes': valid_class,
                             'boxes': valid_boxes,
                             'scores': valid_scores}
                json.dump(json_data, outfile)
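Reading an annotation back is then just standard JSON deserialization; a minimal sketch (assuming a file dog.jpg.json produced by the file_pipeline function defined later):
with open("dog.jpg.json") as infile:
    annotations = json.load(infile)
print(annotations['classes'])  # e.g. ['dog']
print(annotations['scores'])   # e.g. [0.9092628359794617]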
The function get_time conveniently transforms the current date and time into a string that can be used in a filename:
    def get_time(self):
        """
        Returning a string reporting the current date and time
        """
        return strftime("%Y-%m-%d_%Hh%Mm%Ss", gmtime())
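For instance, a call made on March 8, 2018 at 14:22:31 UTC would return the string 2018-03-08_14h22m31s (the date here is only an illustration of the format), which can safely be embedded in a filename.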
Finally, we prepare three detection pipelines, for images, videos, and the webcam. The pipeline for images loads each image into a list. The pipeline for videos lets the VideoFileClip module from moviepy do all the heavy lifting after simply being passed the detect function, appropriately wrapped in the annotate_photogram function. Finally, the pipeline for webcam capture relies on a simple capture_webcam function that, based on OpenCV's VideoCapture, records a number of snapshots from the webcam and returns just the last one (the operation takes into account the time the webcam needs to adjust to the light levels of the environment):
    def annotate_photogram(self, photogram):
        """
        Annotating a video's photogram with bounding boxes
        over detected classes
        """
        new_photogram, boxes, scores, classes, num_detections = \
            self.detect(photogram)[0]
        return new_photogram
The capture_webcam function acquires an image from your webcam using the cv2.VideoCapture functionality (http://docs.opencv.org/3.0-beta/modules/videoio/doc/reading_and_writing_video.html). As webcams first have to adjust to the light conditions present in the environment where the picture is taken, the procedure discards a number of initial shots before taking the shot that will be used in the object detection procedure. In this way, the webcam has enough time to adjust its light settings:
    def capture_webcam(self):
        """
        Capturing an image from the integrated webcam
        """
        def get_image(device):
            """
            Internal function to capture a single image
            from the camera and return it as an OpenCV image
            (a NumPy ndarray in BGR format)
            """
            retval, im = device.read()
            return im

        # Setting the integrated webcam
        camera_port = 0
        # Number of frames to discard as the camera
        # adjusts to the surrounding lights
        ramp_frames = 30
        # Initializing the webcam by cv2.VideoCapture
        camera = cv2.VideoCapture(camera_port)
        # Ramping the camera - all these frames will be
        # discarded as the camera adjusts to the right light levels
        print("Setting the webcam")
        for i in range(ramp_frames):
            _ = get_image(camera)
        # Taking the snapshot
        print("Now taking a snapshot ... ", end='')
        camera_capture = get_image(camera)
        print('Done')
        # Releasing the camera and making it reusable
        del camera
        return camera_capture
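On its own, the helper can be tested as follows (a sketch; camera index 0 assumes the first or integrated webcam is available):
snapshot = detection.capture_webcam()
print(snapshot.shape)  # for example (480, 640, 3): a BGR ndarray from OpenCV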
The file_pipeline comprises all the steps necessary to load images from storage and visualize/annotate them:
- Loading images from disk.
- Applying object detection on the loaded images.
- Writing the annotations for each image in a JSON file.
- If required by the Boolean parameter visualize, representing each image with its bounding boxes on the computer's screen:
    def file_pipeline(self, images, visualize=True):
        """
        A pipeline for processing and annotating lists of
        images to load from disk
        """
        if type(images) is not list:
            images = [images]
        for filename in images:
            single_image = self.load_image_from_disk(filename)
            for new_image, boxes, scores, classes, num_detections \
                    in self.detect(single_image):
                self.serialize_annotations(boxes, scores, classes,
                                           filename=filename + ".json")
                if visualize:
                    self.visualize_image(new_image)
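A typical invocation, assuming the listed images sit in the working directory, looks like this (a sketch; the file names are placeholders):
detection = DetectionObj(model='ssd_mobilenet_v1_coco_11_06_2017')
detection.file_pipeline(["dog.jpg", "cat.jpg"], visualize=True)
# dog.jpg.json and cat.jpg.json are written to the working directory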
The video_pipeline simply arranges all the steps necessary to annotate a video with bounding boxes and, after completing the operation, saves the annotated video to disk:
    def video_pipeline(self, video, audio=False):
        """
        A pipeline to process a video on disk and annotate it
        with bounding boxes. The output is a new annotated video.
        """
        clip = VideoFileClip(video)
        new_video = video.split('/')
        new_video[-1] = "annotated_" + new_video[-1]
        new_video = '/'.join(new_video)
        print("Saving annotated video to", new_video)
        video_annotation = clip.fl_image(self.annotate_photogram)
        video_annotation.write_videofile(new_video, audio=audio)
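Calling it is straightforward (a sketch, reusing the detection instance created earlier; video.mp4 is a placeholder file name):
detection.video_pipeline("video.mp4", audio=False)
# the annotated result is saved as annotated_video.mp4 next to the original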
The webcam_pipeline is the function that arranges all the steps when you want to annotate an image acquired from your webcam:
- Captures an image from the webcam.
- Saves the captured image to disk (using cv2.imwrite, which has the advantage of writing different image formats based on the target filename; see: http://docs.opencv.org/3.0-beta/modules/imgcodecs/doc/reading_and_writing_images.html).
- Applies object detection on the image.
- Saves the annotation JSON file.
- Represents visually the image with bounding boxes:
    def webcam_pipeline(self):
        """
        A pipeline to process an image acquired by the internal webcam
        and annotate it, saving a JSON file to disk
        """
        webcam_image = self.capture_webcam()
        filename = "webcam_" + self.get_time()
        saving_path = os.path.join(self.CURRENT_PATH, filename + ".jpg")
        cv2.imwrite(saving_path, webcam_image)
        new_image, boxes, scores, classes, num_detections = \
            self.detect(webcam_image)[0]
        json_obj = {'classes': classes, 'boxes': boxes, 'scores': scores}
        self.serialize_annotations(boxes, scores, classes,
                                   filename=filename + ".json")
        self.visualize_image(new_image, bluish_correction=False)
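The webcam pipeline requires no arguments; a minimal usage sketch with the detection instance created earlier:
detection.webcam_pipeline()
# a webcam_<timestamp>.jpg snapshot and a webcam_<timestamp>.json annotation
# file are written to the current directory, and the annotated image is shown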