将海康IPC摄像头PS流保存到文件并转换为MP4视频

Published: 2020-03-27

Tags: gb28181

本文总阅读量

概览

在上文中分析了海康IPC摄像头发过来的RTP承载的PS流的内容部分,起始我最开始想的是把RTP流直接喂给FFmpeg,在网上找资料没找到,试了几次都没成功,想来码流需要处理一下才能传给FFmpeg,于是先退而求其次,把它保存起来。

先行知识

1,了解 SIP 协议栈。

2,了解 RTP 协议。

3,了解 PS 封装格式。

4,了解 H.264 编码基础。

之前的两篇文章基本涵盖了先行知识点。

1,《基于国标GB/T28181标准从海康摄像头获取PS流》

2,《海康摄像头PS流格式解析(RTP/PS/H264)》

RTP 结构

RTP 结构之前没有仔细看,想要解析PS流,了解其结构是必须的 《RTP/RTCP协议解析》 这篇文章介绍的比较详细清晰。

上图是 RTP 的头部信息,也就是 Wireshark 收到数据的 RTP 部分,橙色是头部信息,绿色为Payload,此处为PS封装的数据。

整个大块的数据就是程序监听UDP端口收到的数据,了解 RTP 结构后,解析出 Padding 和 Extension 标志位,才能最终计算出 RTP 头部占用的字节数,进而知道从哪里开始是 Payload 数据体部分。

封装回顾

在分析PS流结构的时候,后续我又查看了几个包,整理了一个笔记。

描述一下(包序号,字节数每次运行都会不通):

1,第一个 RTP 包保存有 PS 头,系统头,PSM,多个PES,其中有 SPS,PPS,SEI,最后一个 PES 中保存着 H264 封装的视频 I 帧。

2,因为 I 帧很大,所以需要拆分到多个 RTP 包来发送,根据长度计算,直到第 Seq: 46 号 RTP 包才到了刚才 I 帧 PES 包的结尾,紧接着又是一个视频帧,50000 多个字节,直到 Seq: 83 号 RTP 包结尾处才结束。

3,有个补充的地方,见下图,PES 包结束后还有一个 PES 包,是以 00 00 01 bd 起始的,网上说是海康私有数据,似乎暂时没什么用可以忽略。

4,接下来 Seq: 84 号 RTP 包开始了一个 00 00 01 c0 的音频包,连续几个 336 字节。

5,来到第 Seq: 87 号 RTP 包,又看到了 00 00 01 ba 新的 PS 封装包开始了。

6,由此可见,H264 编码的原始流(ES)经过封装为 PES,多个 PES 又组合在一起,封装成适当大小的 PS 包,PS 包经由网络发送时,又被拆分为固定大小的数据块由 RTP 来承载发送,接收端解析操作正好相反。

7,所以看一看到,要是确定一个包里有多少帧,还是要解析到 Payload 的 H264 编码部分才能确定,不然从 RTP 和 PS 层面是没法确定的,I 帧很大需要占用多个 RTP 包来发送,PS 里也可能有多个 P 帧,B帧。

题外话:H264 文件

我对于 H264 编码目前只是简单了解其结构,知道 00 00 00 01 作为分隔,我查看过好几个 H264 文件,有趣的是它们似乎都不太一样。

比如这个:

67 / 27 代表 SPS,68 / 28 代表 PPS,接下来 65 / 25 是一个 I 帧,61 / 21 是 P 帧,后续就是 25,21,21,21,21,25,21,21,21,21,21,21,25,21,21,21,21,25,21,21,21...

循环下去了,所以从 PS 流中把 H264 数据取出,直接写到文件里,看起来就可以了,这种格式的 H264 文件是可以被 FFmpeg 转换为 MP4 视频的。

在动手写之前,我想到最开始了解 SIP 时候看到 Github 上的一个 Python 代码实现,当时看着很头大,有了先行知识,就可以重新来看了。

代码实现

这位老哥在 Github 分享的地址:https://github.com/10961020/GB28181

他实现了发送SIP命令,接收视频流,和控制球机的脚本,可以进行参考。

因为我之前的测试,自己写了模拟的 SIP Server,SIP Client,摄像头直接推流到 Client 的方式测试的,所以我现在能收到 RTP 流,比较关心他的 h264.py 获取流并保存的部分。

十分感谢这位老哥的分享,节省了我不少的时间,感谢,稍微有些瑕疵的地方就是代码似乎不那么 Pythonic,注释也少,我在他代码的基础上改了改,加了注释,代码如下:

#!/bin/env python3
# -*- coding: utf-8 -*-

"""
    视频转接核心处理模块包
"""

import os
import shutil
import time
import socket
import bitstring
import random
from pathlib import Path

# UDP接收长度
MAX_RTP_PKT_LEN = 1500
# 视频文件保存的目录
SAVE_DIR = "video"
# 每100个PS包保存为一个文件
PER_PACK_PS_LEN = 100
# 保存全局的序列号
sn = {}
# 用于PS包计数
ji_shu = {}
# 保存RTP包数据
rtp_dict = {}  # {序号,数据内容}
# 保存累加的PS包内容
rtp_shipin = {}
# 保存目前文件起始时间
time_ = {}


# 获取RTP扩展头长度
def parse_ext_hdr(bt, lc):

    '''
    # ++++++++++++++++++++++++++++++++++
    # |    profile   |    length       |
    # ++++++++++++++++++++++++++++++++++
    # |       header extension         |
    # |             ...                |
    # ++++++++++++++++++++++++++++++++++

    # 扩展头有一个固定的4字节的头部(profile|length)
    # 后两个字节保存的是扩展头项32比特的个数,不包括扩展头
    # length值为零是有效值,函数返回扩展头字节数
    '''
    # 跳过两个字节的profile解析出length值
    bc = (lc + 2) * 8
    length = bt[bc: bc + 16].uint

    # 扩展头固定四字节长度加每个扩展项四字节
    return 4 + 4 * length


# 从Payload中取H264保存到文件
def parse_frame(pay, name_id):

    global ji_shu
    global time_
    global rtp_shipin

    vhome = Path(SAVE_DIR, name_id)
    rtp_shipin[name_id] += pay

    # 0xba 是PS封装起始标志
    if pay[:4] == b'\x00\x00\x01\xba':
        ji_shu[name_id] += 1
        # 每解析100个PS包切换切换一个临时文件
        if ji_shu[name_id] % 100 == 0:
            tmpfile = Path(vhome, '{}_{}.dat.tmp'.format(name_id, time_[name_id]))
            datfile = Path(vhome, '{}_{}.dat'.format(name_id, time_[name_id]))
            shutil.move(tmpfile, datfile)
            time_[name_id] = time.strftime('%Y%m%d%H%M%S')
            print("Generate: " + datfile.as_posix())

        # 以PS包为单位写入文件
        tmpfile = Path(vhome, '{}_{}.dat.tmp'.format(name_id, time_[name_id]))
        with open(tmpfile, 'a', encoding='latin1') as f:
            f.write(rtp_shipin[name_id].decode('latin1'))
            rtp_shipin[name_id] = b''


# 提取RTP包中payload
def recv_pkt(data, name_id):
    global sn
    global rtp_dict

    # RTP 数据总长度
    data_len = len(data)

    # 将数据转换为二进制
    bt = bitstring.BitArray(bytes=data)

    # 解析数据
    # Padding, Extension 标志位
    p, x = bt[2], bt[3]

    # CC: 4比特 表示固定头部后跟着的CSRC数目
    cc = bt[4:8].uint

    # SN: 16比特 序列号(Sequence number)
    # 每发送一个RTP数据包,序列号增加1 接收端可据此监测丢包和重建包序列
    sn_now = bt[16:32].uint

    # RTP 头部长度字节数(v|p|x|cc|m|pt|sn|ts|ssrc|csrc)
    lc = 12 + 4 * cc if cc else 12

    # 如果该位置为1,则该RTP包的尾部就包含附加的填充内容
    # 填充内容的最后一个字节存储着填充内容的长度
    if p:
        data_len -= bt[-8:].uint

    # 如果该位置为1,则RTP固定头部后面就跟有一个扩展头部
    if x:
        lc += parse_ext_hdr(bt, lc)

    # Payload
    payload = data[lc:data_len]

    # PT, Payload Type
    # https://en.wikipedia.org/wiki/RTP_payload_formats
    # 暂不清楚原作者使用环境,未遇到PT为0x28的情况
    if sn[name_id] == -1 and bt[9:16].uint == 40: # 0x28
        sn[name_id] = bt[16:32].uint + 1
        while rtp_dict[name_id].get(sn[name_id]):
            parse_frame(rtp_dict[name_id].get(sn[name_id]), name_id)
            rtp_dict[name_id].pop(sn[name_id])
            sn[name_id] += 1

    elif sn[name_id] == sn_now:
        # RTP包顺序正常,与程序记录的一致
        parse_frame(payload, name_id)
        sn[name_id] += 1
        while rtp_dict[name_id].get(sn[name_id]):
            parse_frame(rtp_dict[name_id].get(sn[name_id]), name_id)
            rtp_dict[name_id].pop(sn[name_id])
            sn[name_id] += 1
    else:
        # sn不同步时重排序防止网络抖动造成的包顺序错误
        rtp_dict[name_id][sn_now] = payload
        if len(rtp_dict[name_id]) == 10:
            lin_shi = sorted(rtp_dict[name_id].keys())
            for i in lin_shi:
                parse_frame(rtp_dict[name_id][i], name_id)
                rtp_dict[name_id].pop(i)
                sn[name_id] = i+1


def main(name_id, port):

    global sn
    global ji_shu
    global time_
    global rtp_dict
    global rtp_shipin

    time_[name_id] = time.strftime('%Y%m%d%H%M%S')

    # 初始化序列号为-1
    sn[name_id] = -1
    ji_shu[name_id] = 0
    # 用于保存payload数据
    rtp_dict[name_id] = {}
    rtp_shipin[name_id] = b''

    # 创建UDP监听(当前主机IP,Port)
    address = ('192.168.104.183', port)
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.bind(address)
    s.settimeout(20)

    # 创建保存本次视频流的目录
    vhome = Path(SAVE_DIR, name_id)
    if not vhome.exists():
        os.makedirs(vhome)

    # 用于控制创建首个数据临时文件
    first_loop = True

    # 通过UDP接收RTP流
    while True:
        try:
            data, addr = s.recvfrom(MAX_RTP_PKT_LEN)
        except socket.timeout:
            try:
                tmpfile = Path(vhome, '{}_{}.dat.tmp'.format(name_id, time_[name_id]))
                datfile = Path(vhome, '{}_{}.dat'.format(name_id, time_[name_id]))
                shutil.move(tmpfile, datfile)
            except FileNotFoundError:
                print("FileNotFoundError")
                print(tmpfile.as_posix())
                pass
            break

        if len(data) == 2 and data.decode('latin1') == 'by':
            print("h264: Over {},端口: {} ,共 {} 包".format(name_id, port, ji_shu[name_id]))
            try:
                tmpfile = Path(vhome, '{}_{}.dat.tmp'.format(name_id, time_[name_id]))
                datfile = Path(vhome, '{}_{}.dat'.format(name_id, time_[name_id]))
                shutil.move(tmpfile, datfile)
            except FileNotFoundError:
                pass
            break

        # 创建第一个临时文件
        if first_loop:
            datenow = time.strftime('%Y%m%d%H%M%S')
            time_[name_id] = datenow
            tmpfile = Path(vhome, '{}_{}.dat.tmp'.format(name_id, time_[name_id]))
            Path(tmpfile).touch()
            print("Create first tmp file: " + tmpfile.as_posix())
            first_loop = False

        # 解析数据
        recv_pkt(data, name_id)

    s.close()


if __name__ == '__main__':
    # 本次记录的ID,监听端口号
    main('1', 6000)

运行后,通过 SIP Client 给摄像头发送 INVITE 命令成功后,接收的机器要加入到 IPC 的白名单。

就能把海康推送过来的 RTP 流保存到文件了

默认每100个PS包切换一个文件,我算了一下,半个小时的视频文件要1G存储,一天就是40多G,这还是一路...

阅读代码后,我发现并不是我之前查看 H264 文件时想的把 H264 部分取出来存到文件,而是直接把 PS 部分直接存到文件里了,这样生成的文件也是可以使用 FFmpeg 转换为 MP4 的。

H264 文件转 MP4

ffmpeg -i "1_20200326.dat" -c:v copy -f mp4 "out.mp4"

合并多个文件片段

cat *.dat > all.dat

转换成的 MP4 文件可以进行播放。

海康 IP Camera 通过 GB28181 规范获取的 PS 流保存到文件,并转换为 MP4 播放完成,先到这里。