概览
在上文中分析了海康IPC摄像头发过来的RTP承载的PS流的内容部分,起始我最开始想的是把RTP流直接喂给FFmpeg,在网上找资料没找到,试了几次都没成功,想来码流需要处理一下才能传给FFmpeg,于是先退而求其次,把它保存起来。
先行知识
1,了解 SIP 协议栈。
2,了解 RTP 协议。
3,了解 PS 封装格式。
4,了解 H.264 编码基础。
之前的两篇文章基本涵盖了先行知识点。
1,《基于国标GB/T28181标准从海康摄像头获取PS流》
RTP 结构
RTP 结构之前没有仔细看,想要解析PS流,了解其结构是必须的 《RTP/RTCP协议解析》 这篇文章介绍的比较详细清晰。
上图是 RTP 的头部信息,也就是 Wireshark 收到数据的 RTP 部分,橙色是头部信息,绿色为Payload,此处为PS封装的数据。
整个大块的数据就是程序监听UDP端口收到的数据,了解 RTP 结构后,解析出 Padding 和 Extension 标志位,才能最终计算出 RTP 头部占用的字节数,进而知道从哪里开始是 Payload 数据体部分。
封装回顾
在分析PS流结构的时候,后续我又查看了几个包,整理了一个笔记。
描述一下(包序号,字节数每次运行都会不通):
1,第一个 RTP 包保存有 PS 头,系统头,PSM,多个PES,其中有 SPS,PPS,SEI,最后一个 PES 中保存着 H264 封装的视频 I 帧。
2,因为 I 帧很大,所以需要拆分到多个 RTP 包来发送,根据长度计算,直到第 Seq: 46 号 RTP 包才到了刚才 I 帧 PES 包的结尾,紧接着又是一个视频帧,50000 多个字节,直到 Seq: 83 号 RTP 包结尾处才结束。
3,有个补充的地方,见下图,PES 包结束后还有一个 PES 包,是以 00 00 01 bd
起始的,网上说是海康私有数据,似乎暂时没什么用可以忽略。
4,接下来 Seq: 84 号 RTP 包开始了一个 00 00 01 c0
的音频包,连续几个 336 字节。
5,来到第 Seq: 87 号 RTP 包,又看到了 00 00 01 ba
新的 PS 封装包开始了。
6,由此可见,H264 编码的原始流(ES)经过封装为 PES,多个 PES 又组合在一起,封装成适当大小的 PS 包,PS 包经由网络发送时,又被拆分为固定大小的数据块由 RTP 来承载发送,接收端解析操作正好相反。
7,所以看一看到,要是确定一个包里有多少帧,还是要解析到 Payload 的 H264 编码部分才能确定,不然从 RTP 和 PS 层面是没法确定的,I 帧很大需要占用多个 RTP 包来发送,PS 里也可能有多个 P 帧,B帧。
题外话:H264 文件
我对于 H264 编码目前只是简单了解其结构,知道 00 00 00 01
作为分隔,我查看过好几个 H264 文件,有趣的是它们似乎都不太一样。
比如这个:
67 / 27 代表 SPS,68 / 28 代表 PPS,接下来 65 / 25 是一个 I 帧,61 / 21 是 P 帧,后续就是 25,21,21,21,21,25,21,21,21,21,21,21,25,21,21,21,21,25,21,21,21...
循环下去了,所以从 PS 流中把 H264 数据取出,直接写到文件里,看起来就可以了,这种格式的 H264 文件是可以被 FFmpeg 转换为 MP4 视频的。
在动手写之前,我想到最开始了解 SIP 时候看到 Github 上的一个 Python 代码实现,当时看着很头大,有了先行知识,就可以重新来看了。
代码实现
这位老哥在 Github 分享的地址:https://github.com/10961020/GB28181
他实现了发送SIP命令,接收视频流,和控制球机的脚本,可以进行参考。
因为我之前的测试,自己写了模拟的 SIP Server,SIP Client,摄像头直接推流到 Client 的方式测试的,所以我现在能收到 RTP 流,比较关心他的 h264.py 获取流并保存的部分。
十分感谢这位老哥的分享,节省了我不少的时间,感谢,稍微有些瑕疵的地方就是代码似乎不那么 Pythonic,注释也少,我在他代码的基础上改了改,加了注释,代码如下:
#!/bin/env python3
# -*- coding: utf-8 -*-
"""
视频转接核心处理模块包
"""
import os
import shutil
import time
import socket
import bitstring
import random
from pathlib import Path
# UDP接收长度
MAX_RTP_PKT_LEN = 1500
# 视频文件保存的目录
SAVE_DIR = "video"
# 每100个PS包保存为一个文件
PER_PACK_PS_LEN = 100
# 保存全局的序列号
sn = {}
# 用于PS包计数
ji_shu = {}
# 保存RTP包数据
rtp_dict = {} # {序号,数据内容}
# 保存累加的PS包内容
rtp_shipin = {}
# 保存目前文件起始时间
time_ = {}
# 获取RTP扩展头长度
def parse_ext_hdr(bt, lc):
'''
# ++++++++++++++++++++++++++++++++++
# | profile | length |
# ++++++++++++++++++++++++++++++++++
# | header extension |
# | ... |
# ++++++++++++++++++++++++++++++++++
# 扩展头有一个固定的4字节的头部(profile|length)
# 后两个字节保存的是扩展头项32比特的个数,不包括扩展头
# length值为零是有效值,函数返回扩展头字节数
'''
# 跳过两个字节的profile解析出length值
bc = (lc + 2) * 8
length = bt[bc: bc + 16].uint
# 扩展头固定四字节长度加每个扩展项四字节
return 4 + 4 * length
# 从Payload中取H264保存到文件
def parse_frame(pay, name_id):
global ji_shu
global time_
global rtp_shipin
vhome = Path(SAVE_DIR, name_id)
rtp_shipin[name_id] += pay
# 0xba 是PS封装起始标志
if pay[:4] == b'\x00\x00\x01\xba':
ji_shu[name_id] += 1
# 每解析100个PS包切换切换一个临时文件
if ji_shu[name_id] % 100 == 0:
tmpfile = Path(vhome, '{}_{}.dat.tmp'.format(name_id, time_[name_id]))
datfile = Path(vhome, '{}_{}.dat'.format(name_id, time_[name_id]))
shutil.move(tmpfile, datfile)
time_[name_id] = time.strftime('%Y%m%d%H%M%S')
print("Generate: " + datfile.as_posix())
# 以PS包为单位写入文件
tmpfile = Path(vhome, '{}_{}.dat.tmp'.format(name_id, time_[name_id]))
with open(tmpfile, 'a', encoding='latin1') as f:
f.write(rtp_shipin[name_id].decode('latin1'))
rtp_shipin[name_id] = b''
# 提取RTP包中payload
def recv_pkt(data, name_id):
global sn
global rtp_dict
# RTP 数据总长度
data_len = len(data)
# 将数据转换为二进制
bt = bitstring.BitArray(bytes=data)
# 解析数据
# Padding, Extension 标志位
p, x = bt[2], bt[3]
# CC: 4比特 表示固定头部后跟着的CSRC数目
cc = bt[4:8].uint
# SN: 16比特 序列号(Sequence number)
# 每发送一个RTP数据包,序列号增加1 接收端可据此监测丢包和重建包序列
sn_now = bt[16:32].uint
# RTP 头部长度字节数(v|p|x|cc|m|pt|sn|ts|ssrc|csrc)
lc = 12 + 4 * cc if cc else 12
# 如果该位置为1,则该RTP包的尾部就包含附加的填充内容
# 填充内容的最后一个字节存储着填充内容的长度
if p:
data_len -= bt[-8:].uint
# 如果该位置为1,则RTP固定头部后面就跟有一个扩展头部
if x:
lc += parse_ext_hdr(bt, lc)
# Payload
payload = data[lc:data_len]
# PT, Payload Type
# https://en.wikipedia.org/wiki/RTP_payload_formats
# 暂不清楚原作者使用环境,未遇到PT为0x28的情况
if sn[name_id] == -1 and bt[9:16].uint == 40: # 0x28
sn[name_id] = bt[16:32].uint + 1
while rtp_dict[name_id].get(sn[name_id]):
parse_frame(rtp_dict[name_id].get(sn[name_id]), name_id)
rtp_dict[name_id].pop(sn[name_id])
sn[name_id] += 1
elif sn[name_id] == sn_now:
# RTP包顺序正常,与程序记录的一致
parse_frame(payload, name_id)
sn[name_id] += 1
while rtp_dict[name_id].get(sn[name_id]):
parse_frame(rtp_dict[name_id].get(sn[name_id]), name_id)
rtp_dict[name_id].pop(sn[name_id])
sn[name_id] += 1
else:
# sn不同步时重排序防止网络抖动造成的包顺序错误
rtp_dict[name_id][sn_now] = payload
if len(rtp_dict[name_id]) == 10:
lin_shi = sorted(rtp_dict[name_id].keys())
for i in lin_shi:
parse_frame(rtp_dict[name_id][i], name_id)
rtp_dict[name_id].pop(i)
sn[name_id] = i+1
def main(name_id, port):
global sn
global ji_shu
global time_
global rtp_dict
global rtp_shipin
time_[name_id] = time.strftime('%Y%m%d%H%M%S')
# 初始化序列号为-1
sn[name_id] = -1
ji_shu[name_id] = 0
# 用于保存payload数据
rtp_dict[name_id] = {}
rtp_shipin[name_id] = b''
# 创建UDP监听(当前主机IP,Port)
address = ('192.168.104.183', port)
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.bind(address)
s.settimeout(20)
# 创建保存本次视频流的目录
vhome = Path(SAVE_DIR, name_id)
if not vhome.exists():
os.makedirs(vhome)
# 用于控制创建首个数据临时文件
first_loop = True
# 通过UDP接收RTP流
while True:
try:
data, addr = s.recvfrom(MAX_RTP_PKT_LEN)
except socket.timeout:
try:
tmpfile = Path(vhome, '{}_{}.dat.tmp'.format(name_id, time_[name_id]))
datfile = Path(vhome, '{}_{}.dat'.format(name_id, time_[name_id]))
shutil.move(tmpfile, datfile)
except FileNotFoundError:
print("FileNotFoundError")
print(tmpfile.as_posix())
pass
break
if len(data) == 2 and data.decode('latin1') == 'by':
print("h264: Over {},端口: {} ,共 {} 包".format(name_id, port, ji_shu[name_id]))
try:
tmpfile = Path(vhome, '{}_{}.dat.tmp'.format(name_id, time_[name_id]))
datfile = Path(vhome, '{}_{}.dat'.format(name_id, time_[name_id]))
shutil.move(tmpfile, datfile)
except FileNotFoundError:
pass
break
# 创建第一个临时文件
if first_loop:
datenow = time.strftime('%Y%m%d%H%M%S')
time_[name_id] = datenow
tmpfile = Path(vhome, '{}_{}.dat.tmp'.format(name_id, time_[name_id]))
Path(tmpfile).touch()
print("Create first tmp file: " + tmpfile.as_posix())
first_loop = False
# 解析数据
recv_pkt(data, name_id)
s.close()
if __name__ == '__main__':
# 本次记录的ID,监听端口号
main('1', 6000)
运行后,通过 SIP Client 给摄像头发送 INVITE 命令成功后,接收的机器要加入到 IPC 的白名单。
就能把海康推送过来的 RTP 流保存到文件了
默认每100个PS包切换一个文件,我算了一下,半个小时的视频文件要1G存储,一天就是40多G,这还是一路...
阅读代码后,我发现并不是我之前查看 H264 文件时想的把 H264 部分取出来存到文件,而是直接把 PS 部分直接存到文件里了,这样生成的文件也是可以使用 FFmpeg 转换为 MP4 的。
H264 文件转 MP4
ffmpeg -i "1_20200326.dat" -c:v copy -f mp4 "out.mp4"
合并多个文件片段
cat *.dat > all.dat
转换成的 MP4 文件可以进行播放。
海康 IP Camera 通过 GB28181 规范获取的 PS 流保存到文件,并转换为 MP4 播放完成,先到这里。