CV

基于yoloV3的车流量检测实现

yoloV3、Kalman、Hungarian、SORT

Posted by 新宇 on September 2, 2020

一、yoloV3复习

1. 多尺度检测方法

在YOLOv3中采用FPN结构来提高对应多尺度目标检测的精度,当前的feature map利用“未来”层的信息,将低阶特征与高阶特征进行融合,提升检测精度。

2. 网络模型结构

  • 以darknet-53为基础,借鉴resnet的思想,在网络中加入了残差模块,利于解决深层次网络的梯度问题
  • 整个v3结构里面,没有池化层和全连接层,只有卷积层
  • 网络的下采样是通过设置卷积的stride为2来达到的

3. 先验框

采用K-means聚类得到先验框的尺寸,为每种尺度设定3种先验框,总共聚类出9种尺寸的先验框。

4. 多标签的目标分类

预测对象类别时不使用softmax,而是使用logistic的输出进行预测

5. 模型的输入输出

对于416×416×3的输入图像,在每个尺度的特征图的每个网格设置3个先验框,总共有 13×13×3 + 26×26×3 + 52×52×3 = 10647 个预测。每一个预测是一个(4+1+80)=85维向量,这个85维向量包含边框坐标(4个数值),边框置信度(1个数值),对象类别的概率(对于COCO数据集,有80种对象)。

二、数据集介绍

  • 自动驾驶数据集 KITTI
    • http://www.cvlibs.net/datasets/kitti/eval_object.php?obj_benchmark=2d
  • 训练集:7481张
  • 测试集:7518张
  • 类别: 8个类别
    • Car(小轿车),Van(面包车),Truck(卡车),Tram(电车),Pedestrain(行人),Person(sit-ting)(行人),Cyclist(骑行人),Misc(杂项)
  • 目录描述:
    • Annotations文件夹存放标签文件xxxx.xml
    • JPEGImages文件夹存放了所有的图片
    • labels文件夹存放了darknet框架的标签格式文件xxxx.txt
    • 基于ImageNet预训练模型
  • 训练时长: 3h
  • 硬件配置:Tesla P100 PCIe 16GB
  • 采用Adam优化算法
  • epoch: 10轮

三、代码实现

1. kalman实现

from __future__ import print_function
# 对for循环有姮好的效果

from numba import jit
import numpy as np
# 用于线性分配,匈牙利匹配的实现

# from sklearn.utils.linear_assignment_ import linear_assignment

from scipy.optimize import linear_sum_assignment
# 使用卡尔曼滤波器

from filterpy.kalman import KalmanFilter


@jit
def iou(bb_test, bb_gt):
    """
    在两个box间计算IOU
    :param bb_test: box1 = [x1y1x2y2]
    :param bb_gt: box2 = [x1y1x2y2]
    :return: 交并比IOU
    """

    xx1 = np.maximum(bb_test[0], bb_gt[0])
    yy1 = np.maximum(bb_test[1], bb_gt[1])
    xx2 = np.minimum(bb_test[2], bb_gt[2])
    yy2 = np.minimum(bb_test[3], bb_gt[3])
    w = np.maximum(0., xx2 - xx1)
    h = np.maximum(0., yy2 - yy1)
    wh = w * h
    o = wh / ((bb_test[2] - bb_test[0]) * (bb_test[3] - bb_test[1]) + (bb_gt[2] - bb_gt[0]) * (
            bb_gt[3] - bb_gt[1]) - wh)
    return o


def convert_bbox_to_z(bbox):
    """
    将[x1,y1,x2,y2]形式的检测框转为滤波器的状态表示形式[x,y,s,r]。其中x,y是框的中心,s是比例/区域,r是宽高比
    :param bbox: [x1,y1,x2,y2] 分别是左上角坐标和右下角坐标
    :return: [ x, y, s, r ] 4行1列,其中x,y是box中心位置的坐标,s是面积,r是纵横比w/h
    """

    w = bbox[2] - bbox[0]
    h = bbox[3] - bbox[1]
    x = bbox[0] + w / 2.
    y = bbox[1] + h / 2.
    s = w * h
    r = w / float(h)
    return np.array([x, y, s, r]).reshape((4, 1))


def convert_x_to_bbox(x, score=None):
    """
    将[cx,cy,s,r]的目标框表示转为[x_min,y_min,x_max,y_max]的形式
    :param x:[ x, y, s, r ],其中x,y是box中心位置的坐标,s是面积,r
    :param score: 置信度
    :return:[x1,y1,x2,y2],左上角坐标和右下角坐标
    """

    w = np.sqrt(x[2] * x[3])
    h = x[2] / w
    if score is None:
        return np.array([x[0] - w / 2., x[1] - h / 2., x[0] + w / 2., x[1] + h / 2.]).reshape((1, 4))
    else:
        return np.array([x[0] - w / 2., x[1] - h / 2., x[0] + w / 2., x[1] + h / 2., score]).reshape((1, 5))


"""
# 表示观测目标框bbox所对应的单个跟踪对像的内部状态
"""

class KalmanBoxTracker(object):
    count = 0

    def __init__(self, bbox):
        """
        初始化边界框和跟踪器
        :param bbox:
        """

        # 定义等速模型

        # 内部使用KalmanFilter,7个状态变量和4个观测输入

        self.kf = KalmanFilter(dim_x=7, dim_z=4)
        # F是状态变换模型

        self.kf.F = np.array(
            [[1, 0, 0, 0, 1, 0, 0], [0, 1, 0, 0, 0, 1, 0], [0, 0, 1, 0, 0, 0, 1], [0, 0, 0, 1, 0, 0, 0],
             [0, 0, 0, 0, 1, 0, 0], [0, 0, 0, 0, 0, 1, 0], [0, 0, 0, 0, 0, 0, 1]])
        # H是观测函数

        self.kf.H = np.array(
            [[1, 0, 0, 0, 0, 0, 0], [0, 1, 0, 0, 0, 0, 0], [0, 0, 1, 0, 0, 0, 0], [0, 0, 0, 1, 0, 0, 0]])
        # R是观测函数

        self.kf.R[2:, 2:] *= 10.
        # P是协方差矩阵

        self.kf.P[4:, 4:] *= 1000.  # give high uncertainty to the unobservable initial velocities
        
        self.kf.P *= 10.
        # Q是过程噪声矩阵

        self.kf.Q[-1, -1] *= 0.01
        self.kf.Q[4:, 4:] *= 0.01
        # 内部状态估计

        self.kf.x[:4] = convert_bbox_to_z(bbox)
        self.time_since_update = 0
        self.id = KalmanBoxTracker.count
        KalmanBoxTracker.count += 1
        self.history = []
        self.hits = 0
        self.hit_streak = 0
        self.age = 0

    def update(self, bbox):
        """
        使用观察到的目标框更新状态向量。filterpy.kalman.KalmanFilter.update 会根据观测修改内部状态估计self.kf.x。
        重置self.time_since_update,清空self.history。
        :param bbox:目标框
        :return:
        hit_streak:判断当前是否做了更新,大于等于1的说明做了更新,只要连续帧中没有做连续更新,hit_streak就会清零
        """

        self.time_since_update = 0
        self.history = []
        self.hits += 1
        self.hit_streak += 1
        self.kf.update(convert_bbox_to_z(bbox))

    def predict(self):
        """
        推进状态向量并返回预测的边界框估计。
        将预测结果追加到self.history。由于 get_state 直接访问 self.kf.x,所以self.history没有用到
        :return:

        """

        if (self.kf.x[6] + self.kf.x[2]) <= 0:
            self.kf.x[6] *= 0.0
        self.kf.predict()
        self.age += 1
        if self.time_since_update > 0:
            self.hit_streak = 0
        self.time_since_update += 1
        self.history.append(convert_x_to_bbox(self.kf.x))
        return self.history[-1]

    def get_state(self):
        """
        返回当前边界框估计值
        :return:
        """

        return convert_x_to_bbox(self.kf.x)


def associate_detections_to_trackers(detections, trackers, iou_threshold=0.3):
    """
    将检测框bbox与卡尔曼滤波器的跟踪框进行关联匹配
    :param detections:检测框
    :param trackers:跟踪框,即跟踪目标
    :param iou_threshold:IOU阈值
    :return:跟踪成功目标的矩阵:matchs
            新增目标的矩阵:unmatched_detections
            跟踪失败即离开画面的目标矩阵:unmatched_trackers
    """

    # 跟踪目标数量为0,直接构造结果

    #第一帧没有跟踪框,只有检测框,所以返回3个值:(1)匹配到的[d,t](空的);(2)没有匹配到的检测框;(3)没有匹配到的跟踪框(空的)

    if (len(trackers) == 0) or (len(detections) == 0):
        return np.empty((0, 2), dtype=int), np.arange(len(detections)), np.empty((0, 5), dtype=int)

    # iou 不支持数组计算。逐个计算两两间的交并比,调用 linear_assignment 进行匹配

    iou_matrix = np.zeros((len(detections), len(trackers)), dtype=np.float32)
    # 遍历目标检测的bbox集合,每个检测框的标识为d

    for d, det in enumerate(detections):
        # 遍历跟踪框(卡尔曼滤波器预测)bbox集合,每个跟踪框标识为t

        for t, trk in enumerate(trackers):
            iou_matrix[d, t] = iou(det, trk)
    # 通过匈牙利算法将跟踪框和检测框以[[d,t]...]的二维矩阵的形式存储在match_indices中

    # 为什么是负号:linear_assignment的输入是成本矩阵,IOU越大对应的分配代价应越小

    # matched_indices = linear_assignment(-iou_matrix)

    result = linear_sum_assignment(-iou_matrix)
    matched_indices = np.array(list(zip(*result)))

    # 记录未匹配的检测框及跟踪框

    # 未匹配的检测框放入unmatched_detections中,表示有新的目标进入画面,要新增跟踪器跟踪目标

    unmatched_detections = []
    for d, det in enumerate(detections):
        if d not in matched_indices[:, 0]:
            unmatched_detections.append(d)
    # 未匹配的跟踪框放入unmatched_trackers中,表示目标离开之前的画面,应删除对应的跟踪器

    unmatched_trackers = []
    for t, trk in enumerate(trackers):
        if t not in matched_indices[:, 1]:
            unmatched_trackers.append(t)
    # 将匹配成功的跟踪框放入matches中

    matches = []
    for m in matched_indices:
        # 过滤掉IOU低的匹配,将其放入到unmatched_detections和unmatched_trackers

        if iou_matrix[m[0], m[1]] < iou_threshold:
            unmatched_detections.append(m[0])
            unmatched_trackers.append(m[1])
        # 满足条件的以[[d,t]...]的形式放入matches中

        else:
            matches.append(m.reshape(1, 2))
    # 初始化matches,以np.array的形式返回

    if len(matches) == 0:
        matches = np.empty((0, 2), dtype=int)
    else:
        matches = np.concatenate(matches, axis=0)

    return matches, np.array(unmatched_detections), np.array(unmatched_trackers)


class Sort(object):
    """
    Sort 是一个多目标跟踪器,管理多个 KalmanBoxTracker 对象
    """

    def __init__(self, max_age=1, min_hits=3):
        """
        初始化:设置SORT算法的关键参数
        :param max_age: 最大检测数:目标未被检测到的帧数,超过之后会被删除
        :param min_hits:最小更新的次数,就是放在self.trackers跟踪器列表中的框与检测框匹配上,
        然后调用卡尔曼滤波器类中的update函数的最小次数,min_hits不设置为0是因为第一次检测到的目标不用跟踪,
        只需要加入到跟踪器列表中,不会显示,这个值不能设大,一般就是1
        """

        self.max_age = max_age
        self.min_hits = min_hits
        self.trackers = []   # ?
        self.frame_count = 0  # ?

    def update(self, dets):
        """
        该方法实现了SORT算法,输入是当前帧中所有物体的检测框的集合,包括目标的score,
        输出是当前帧目标的跟踪框集合,包括目标的跟踪的id
        要求是即使检测框为空,也必须对每一帧调用此方法,返回一个类似的输出数组,最后一列是目标对像的id
        注意:返回的目标对象数量可能与检测框的数量不同
        :param dets:以[[x1,y1,x2,y2,score],[x1,y1,x2,y2,score],...]形式输入的numpy.array
        :return:
        """

        self.frame_count += 1
        # 在当前帧逐个预测轨迹位置,记录状态异常的跟踪器索引

        # 根据当前所有的卡尔曼跟踪器个数(即上一帧中跟踪的目标个数)创建二维数组:行号为卡尔曼滤波器的标识索引,列向量为跟踪框的位置和ID

        trks = np.zeros((len(self.trackers), 5))  # 存储跟踪器的预测

        to_del = []   # 存储要删除的目标框

        ret = []    # 存储要返回的追踪目标框

        # 循环遍历卡尔曼跟踪器列表

        for t, trk in enumerate(trks):
            # 使用卡尔曼跟踪器t产生对应目标的跟踪框

            pos = self.trackers[t].predict()[0]
            # 遍历完成后,trk中存储了上一帧中跟踪的目标的预测跟踪框

            trk[:] = [pos[0], pos[1], pos[2], pos[3], 0]
            # 如果跟踪框中包含空值则将该跟踪框添加到要删除的列表中

            if np.any(np.isnan(pos)):
                to_del.append(t)
        # numpy.ma.masked_invalid 屏蔽出现无效值的数组(NaN 或 inf)

        # numpy.ma.compress_rows 压缩包含掩码值的2-D 数组的整行,将包含掩码值的整行去除

        # trks中存储了上一帧中跟踪的目标并且在当前帧中的预测跟踪框

        trks = np.ma.compress_rows(np.ma.masked_invalid(trks))
        # 逆向删除异常的跟踪器,防止破坏索引

        for t in reversed(to_del):
            self.trackers.pop(t)
        # 将目标检测框与卡尔曼滤波器预测的跟踪框关联获取跟踪成功的目标,新增的目标,离开画面的目标

        matched, unmatched_dets, unmatched_trks = associate_detections_to_trackers(dets, trks)

        # 将跟踪成功的目标框更新到对应的卡尔曼滤波器

        for t, trk in enumerate(self.trackers):
            if t not in unmatched_trks:
                d = matched[np.where(matched[:, 1] == t)[0], 0]
                # 使用观测的边界框更新状态向量

                trk.update(dets[d, :][0])

        # 为新增的目标创建新的卡尔曼滤波器对象进行跟踪

        for i in unmatched_dets:
            trk = KalmanBoxTracker(dets[i, :])
            self.trackers.append(trk)

        # 自后向前遍历,仅返回在当前帧出现且命中周期大于self.min_hits(除非跟踪刚开始)的跟踪结果;如果未命中时间大于self.max_age则删除跟踪器。

        # hit_streak忽略目标初始的若干帧

        i = len(self.trackers)
        for trk in reversed(self.trackers):
            # 返回当前边界框的估计值

            d = trk.get_state()[0]
            # 跟踪成功目标的box与id放入ret列表中

            if (trk.time_since_update < 1) and (trk.hit_streak >= self.min_hits or self.frame_count <= self.min_hits):
                ret.append(np.concatenate((d, [trk.id + 1])).reshape(1, -1))  # +1 as MOT benchmark requires positive

            i -= 1
            # 跟踪失败或离开画面的目标从卡尔曼跟踪器中删除

            if trk.time_since_update > self.max_age:
                self.trackers.pop(i)
        # 返回当前画面中所有目标的box与id,以二维矩阵形式返回

        if len(ret) > 0:
            return np.concatenate(ret)
        return np.empty((0, 5))

2. 模型加载与碰撞检测

from kalman import *
import imutils
import time
import cv2
import numpy as np
import matplotlib.pyplot as plt

line = [(0, 150), (2560, 150)]
# 车辆总数

counter = 0
# 正向车道的车辆数据

counter_up = 0
# 逆向车道的车辆数据

counter_down = 0

# 创建跟踪器对象

tracker = Sort()
memory = {}


# 线与线的碰撞检测:叉乘的方法判断两条线是否相交

# 计算叉乘符号≤

def ccw(A, B, C):
    return (C[1] - A[1]) * (B[0] - A[0]) > (B[1] - A[1]) * (C[0] - A[0])


# 检测AB和CD两条直线是否相交

def intersect(A, B, C, D):
    return ccw(A, C, D) != ccw(B, C, D) and ccw(A, B, C) != ccw(A, B, D)


# 利用yoloV3模型进行目标检测

# 加载模型相关信息

# 加载可以检测的目标的类型

labelPath = "./yolo-coco/coco.names"
LABELS = open(labelPath).read().strip().split("\n")
# 生成多种不同的颜色

np.random.seed(42)
COLORS = np.random.randint(0, 255, size=(200, 3), dtype='uint8')
# 加载预训练的模型:权重 配置信息,进行恢复

weightsPath = "./yolo-coco/yolov3.weights"
configPath = "./yolo-coco/yolov3.cfg"
net = cv2.dnn.readNetFromDarknet(configPath, weightsPath)
# 获取yolo中每一层的名称

ln = net.getLayerNames()
# 获取输出层的名称: [yolo-82,yolo-94,yolo-106]

ln = [ln[i[0] - 1] for i in net.getUnconnectedOutLayers()]

# 读取图像

# frame = cv2.imread('./images/car2.jpg')

# (W,H) = (None,None)

# (H,W) = frame.shape[:2]

# 视频

vs = cv2.VideoCapture('./input/test_1.mp4')
(W, H) = (None, None)
writer = None
try:
    prop = cv2.cv.CV_CAP_PROP_Frame_COUNT if imutils.is_cv2() else cv2.CAP_PROP_FRAME_COUNT
    total = int(vs.get(prop))
    print("INFO:{} total Frame in video".format(total))
except:
    print("[INFO] could not determine in video")

# 遍历每一帧图像

while True:
    (grabed, frame) = vs.read()
    if not grabed:
        break
    if W is None or H is None:
        (H,W) = frame.shape[:2]
    # 将图像转换为blob,进行前向传播

    blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416), swapRB=True, crop=False)
    # 将blob送入网络

    net.setInput(blob)
    start = time.time()
    # 前向传播,进行预测,返回目标框边界和相应的概率

    layerOutputs = net.forward(ln)
    end = time.time()

    # 存放目标的检测框

    boxes = []
    # 置信度

    confidences = []
    # 目标类别

    classIDs = []

    # 遍历每个输出

    for output in layerOutputs:
        # 遍历检测结果

        for detection in output:
            # detction:1*85 [5:]表示类别,[0:4]bbox的位置信息 【5】置信度

            scores = detection[5:]
            classID = np.argmax(scores)
            confidence = scores[classID]

            if confidence > 0.4:
                # 将检测结果与原图片进行适配

                box = detection[0:4] * np.array([W, H, W, H])
                (centerX, centerY, width, height) = box.astype("int")
                # 左上角坐标

                x = int(centerX - width / 2)
                y = int(centerY - height / 2)
                # 更新目标框,置信度,类别

                boxes.append([x, y, int(width), int(height)])
                confidences.append(float(confidence))
                classIDs.append(classID)
    # 非极大值抑制

    idxs = cv2.dnn.NMSBoxes(boxes, confidences, 0.5,0.3)
    # 检测框:左上角和右下角

    dets = []
    if len(idxs) > 0:
        for i in idxs.flatten():
            if LABELS[classIDs[i]] == "car":
                (x, y) = (boxes[i][0], boxes[i][1])
                (w, h) = (boxes[i][2], boxes[i][3])
                # cv2.rectangle(frame,(x,y),(x+w,y+h),(0,255,0),2)

                dets.append([x, y, x + w, y + h, confidences[i]])
    # 类型设置

    np.set_printoptions(formatter={'float': lambda x: "{0:0.3f}".format(x)})
    dets = np.asarray(dets)

    # # 显示

    # plt.imshow(frame[:,:,::-1])

    # plt.show()

    # SORT目标跟踪

    if np.size(dets) == 0:
        continue
    else:
        tracks = tracker.update(dets)
    # 跟踪框

    boxes = []
    # 置信度

    indexIDs = []
    # 前一帧跟踪结果

    previous = memory.copy()
    memory = {}
    for track in tracks:
        boxes.append([track[0], track[1], track[2], track[3]])
        indexIDs.append(int(track[4]))
        memory[indexIDs[-1]] = boxes[-1]

    # 碰撞检测

    if len(boxes) > 0:
        i = int(0)
        # 遍历跟踪框

        for box in boxes:
            (x, y) = (int(box[0]), int(box[1]))
            (w, h) = (int(box[2]), int(box[3]))
            color = [int(c) for c in COLORS[indexIDs[i] % len(COLORS)]]
            cv2.rectangle(frame, (x, y), (w, h), color, 2)

            # 根据在上一帧和当前帧的检测结果,利用虚拟线圈完成车辆计数

            if indexIDs[i] in previous:
                previous_box = previous[indexIDs[i]]
                (x2, y2) = (int(previous_box[0]), int(previous_box[1]))
                (w2, h2) = (int(previous_box[2]), int(previous_box[3]))
                p1 = (int(x2 + (w2 - x2) / 2), int(y2 + (h2 - y2) / 2))
                p0 = (int(x + (w - x) / 2), int(y + (h - y) / 2))

                # 利用p0,p1与line进行碰撞检测

                if intersect(p0, p1, line[0], line[1]):
                    counter += 1
                    # 判断行进方向

                    if y2 > y:
                        counter_down += 1
                    else:
                        counter_up += 1
            i += 1

    # 将车辆计数的相关结果放在视频上

    cv2.line(frame, line[0], line[1], (0, 255, 0), 3)
    cv2.putText(frame, str(counter), (30, 80), cv2.FONT_HERSHEY_DUPLEX, 3.0, (255, 0, 0), 3)
    cv2.putText(frame, str(counter_up), (130, 80), cv2.FONT_HERSHEY_DUPLEX, 3.0, (0, 255, 0), 3)
    cv2.putText(frame, str(counter_down), (230, 80), cv2.FONT_HERSHEY_DUPLEX, 3.0, (0, 0, 255), 3)

    # 将检测结果保存在视频

    if writer is None:
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter("./output/output.mp4", fourcc, 30, (frame.shape[1], frame.shape[0]), True)
    writer.write(frame)
    cv2.imshow("", frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

# "释放资源"

writer.release()
vs.release()
cv2.destroyAllWindows()