python解析flv协议(AMF数据)
学习python也有1周了,因为之前学习了golang,所以觉得python和golang有很多类似的地方,比如切片和语法,python的确是一门很好的语言,至少你了解了之后才会发现其实它很强大,编写效率很高,至少目前为止我觉得运行效率也很高,之前没学过python2,所以跳过直接python3,既然是学习总得练练手吧,因为前段时间打算做一个抓直播地址的工具,截取网卡数据包的时候,接触过rtmp协议和flv协议,所以打算做一个flv文件解析库,其实也谈不上库,也就是对python如何解析flv协议做一个封装而已,因为这个类Flv在AMF数据的处理上也只处理了AMF0的常用数据,amf元数据在rtmp直播中也常用到,其他的元数据,没有demo我也没法测试
flv协议相信做过直播的童鞋都很清楚,本文也就大概的提一下,因为网上介绍的文章很多,就不详细的介绍了
FLV(Flash Video)是现在非常流行的流媒体格式,由于其视频文件体积轻巧、封装播放简单等特点,使其很适合在网络上进行应用,目前主流的视频网站无一例外地使用了FLV格式。另外由于当前浏览器与Flash Player紧密的结合,使得网页播放FLV视频轻而易举,也是FLV流行的原因之一。
FLV是流媒体封装格式,我们可以将其数据看为二进制字节流。总体上看,FLV包括文件头(File Header)和文件体(File Body)两部分,其中文件体由一系列的Tag及Tag Size对组成。
FLV格式解析
header
头部分由一下几部分组成
Signature(3 Byte)+Version(1 Byte)+Flags(1 Bypte)+DataOffset(4 Byte)
- signature 占3个字节
固定FLV三个字符作为标示。一般发现前三个字符为FLV时就认为他是flv文件。 - Version 占1个字节
标示FLV的版本号。这里我们看到是1 - Flags 占1个字节
内容标示。第0位和第2位,分别表示 video 与 audio 存在的情况.(1表示存在,0表示不存在)。截图看到是0x05
,也就是00000101
,代表既有视频,也有音频。 - DataOffset 4个字节
表示FLV的header长度。这里可以看到固定是9
body
FLV的body部分是由一系列的back-pointers + tag构成
- back-pointers 固定4个字节,表示前一个tag的size。
- tag 分三种类型,video、audio、scripts。
tag组成
tag type
+tag data size
+Timestamp
+TimestampExtended
+stream id
+ tag data
- type 1个字节。8为Audio,9为Video,18为scripts
- tag data size 3个字节。表示tag data的长度。从streamd id 后算起。
- Timestreamp 3个字节。时间戳
- TimestampExtended 1个字节。时间戳扩展字段
- stream id 3个字节。总是0
- tag data 数据部分
其实重点还是是数据部分,flv文件其实是有很多tag组合,常见的tag包含音频,视频,和脚本(元数据),脚本tag就是描述视频或音频的信息的数据,如宽度、高度、时间等等,一个文件中通常只有一个元数据, 音频tag和视频tag就是音视频信息了,采样、声道、频率,编码等信息,这都是基础的知识,关于协议网上很多介绍,下面直接上干货,之后打算学习下python通过ctypes封装调用c方面的知识,谁不想代码跑得快点了
flv.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "aaa@qq.com"
import struct
import traceback
from enum import Enum, unique
"""flv文件解析,元数据解析采用amf0格式"""
@unique
class TagType(Enum):
"""标签类型"""
FLV_TAG_AUDIO = 0x08
FLV_TAG_VIDEO = 0x09
FLV_TAG_SCRIPT = 0x12
@unique
class Amf0DataType(Enum):
"""脚本中的变量类型(本程序支持的元数据类型)"""
FLV_AMF0_NUMBER = 0x00
FLV_AMF0_BOOLEAN = 0x01
FLV_AMF0_STRING = 0x02
FLV_AMF0_OBJECT = 0X03
FLV_AMF0_NULL = 0x05
FLV_AMF0_ARRAY = 0x08
FLV_AMF0_END_OF_OBJECT = 0x09
FLV_AMF0_STRICT_ARRAY = 0X0a
FLV_AMF0_DATE = 0X0b
FLV_AMF0_LONG_STRING = 0X0c
class UnSupportFileFormat(Exception):
pass
class UnSupportAmfValFormat(Exception):
pass
class Tag(object):
"""flv文件头"""
previousTagsSize = 0
type = 0
length = 0
timestamp = 0
exTimestamp = 0
streamsId = 0
# 原始数据
data = []
def parse(self):
"""请子类实现此方法来解析原始数据"""
pass
def __str__(self):
"""like tostring"""
return "%s previousTagsSize:%d type:%d length:%d timestamp:%d exTimestamp:%d streamsId:%d" % (
self.__class__, self.previousTagsSize, self.type, self.length, self.timestamp, self.exTimestamp,
self.streamsId)
def getBytes(self):
"""获取原始字节数据"""
return self.data
# end of class Tag
class AudioTag(Tag):
"""音频tag"""
format = None
samplerate = None
bits = 0
sc = 0
__flag = None
__data = []
def parse(self):
data = super().getBytes()
if len(data) != 0:
self.__flag = data[0]
self.__data = data[1:]
# 前面4位为音频格式
self.format = self.__flag >> 4
# 5 6位是采样率 0000 0011&0010 1011= 0000 0011=3
self.samplerate = (0x03 & self.__flag >> 2)
# 7 位是采样长度 0 8bit 1 16bits
self.bits = (self.__flag >> 1 & 0x01)
# 单声道还是双声道 0单声道 1立体声
self.sc = (self.__flag & 0x01)
return self
def getBytes(self):
"""获取字节数据"""
return self.__data
# end of class AudioTag
class VideoTag(Tag):
"""视频tag"""
frameType = None
codec = None
__flag = None
__data = []
def parse(self):
"""解析视频tag信息"""
data = super().getBytes()
if len(data) != 0:
self.__flag = data[0]
self.__data = data[1:]
# 前4位为帧类型
self.frameType = (self.__flag >> 4)
# 后4位位编码类型(发现python左偏移貌似有些问题,不会自动补位,所以不能用左偏移)
self.codec = (self.__flag & 0x0f)
return self
def getBytes(self):
"""获取字节数据"""
return self.__data
# end of class VideoTag
class ScriptTag(Tag):
"""
脚本数据也称元数据metadata,解析起来稍微有点麻烦
amf0可以查看:
https://wwwimages2.adobe.com/content/dam/acom/en/devnet/pdf/amf0-file-format-specification.pdf
"""
numVal = 0
strVal, lStrVal = "", ""
objVal = []
arrVal = {}
boolVal = False
nullVal, dateVal = None, None
def parse(self):
"""解析脚本元meta数据"""
data = super().getBytes()
size = len(data)
while size > 0:
type = data[0]
data, size = data[1:], size - 1
if type == Amf0DataType.FLV_AMF0_NUMBER:
data, size, self.numVal = self.__parse_number(data, size)
elif type == Amf0DataType.FLV_AMF0_BOOLEAN:
data, size, self.boolVal = self.__parse_boolean(data, size)
elif type == Amf0DataType.FLV_AMF0_STRING:
data, size, self.strVal = self.__parse_string(data, size)
elif type == Amf0DataType.FLV_AMF0_NULL:
data, size, self.nullVal = self.__parse_null(data, size)
elif type == Amf0DataType.FLV_AMF0_OBJECT:
data, size, self.objVal = self.__parse_object(data, size)
elif type == Amf0DataType.FLV_AMF0_DATE:
data, size, self.dateVal = self.__parse_date(data, size)
elif type == Amf0DataType.FLV_AMF0_ARRAY:
data, size, self.arrVal = self.__parse_array(data, size)
elif type == Amf0DataType.FLV_AMF0_STRICT_ARRAY:
data, size, self.arrVal = self.__parse_strict_array(data, size)
elif type == Amf0DataType.FLV_AMF0_LONG_STRING:
data, size, self.lStrVal = self.__parse_long_string(data, size)
else:
raise UnSupportAmfValFormat(type)
# end of while
assert size == 0
return self
def __parse_number(self, data, size):
# 利用struct来处理double
ret = struct.unpack('>d', data[:8])[0]
return data[8:], size - 8, ret
def __parse_boolean(self, data, size):
"""解析boolean值"""
ret = False
if int(data[0]) != 0:
ret = True
return data[1:], size - 1, ret
def __parse_null(self, data, size):
"""解析null值"""
return data[1:], size - 1, None
def __parse_string(self, data, size):
"""解析string值(2字节的长度+N字符串)"""
offset = bytes2int(data[:2])
offset += 2
ret = bytes.decode(data[2:offset])
return data[offset:], size - offset, str(ret)
def __parse_long_string(self, data, size):
"""解析string值(4字节的长度+N字符串)"""
offset = bytes2int(data[:4])
offset += 4
ret = bytes.decode(data[4:offset])
return data[offset:], size - offset, str(ret)
def __parse_date(self, data, size):
"""解析data值(2字节的时区+8字节的时间戳),返回一个dict"""
zone = struct.unpack('>d', data[0:2])[0]
time = struct.unpack('>d', data[2:10])[0]
return data[10:], size - 10, {"zone": zone, "time": time}
def __parse_array(self, data, size):
"""ecma解析,实际是map数据"""
arrLen = bytes2int(data[:4])
arrVal = None
data, size, arrVal = self.__parse_object(data[4:], size - 4)
return data, size, {"len": arrLen, "val": arrVal}
def __parse_strict_array(self, data, size):
"""strict解析array,strict数组是没有key的"""
alen = bytes2int(data[:4])
ret = []
tmp = None
data, size = data[4:], size - 4
while size > 0:
size -= 1
if data[0] == Amf0DataType.FLV_AMF0_END_OF_OBJECT:
data = data[1:]
break
elif data[0] == Amf0DataType.FLV_AMF0_NUMBER:
data, size, tmp = self.__parse_number(data[1:], size)
ret.append(tmp)
elif data[0] == Amf0DataType.FLV_AMF0_BOOLEAN:
data, size, tmp = self.__parse_boolean(data[1:], size)
ret.append(tmp)
elif data[0] == Amf0DataType.FLV_AMF0_STRING:
data, size, tmp = self.__parse_string(data[1:], size)
ret.append(tmp)
elif data[0] == Amf0DataType.FLV_AMF0_NULL:
data, size, tmp = self.__parse_null(data[1:], size)
ret.append(tmp)
elif data[0] == Amf0DataType.FLV_AMF0_OBJECT:
data, size, tmp = self.__parse_object(data[1:], size)
ret.append(tmp)
elif data[0] == Amf0DataType.FLV_AMF0_DATE:
data, size, tmp = self.__parse_date(data[1:], size)
ret.append(tmp)
elif data[0] == Amf0DataType.FLV_AMF0_ARRAY:
data, size, tmp = self.__parse_array(data[1:], size)
ret.append(tmp)
elif data[0] == Amf0DataType.FLV_AMF0_STRICT_ARRAY:
data, size, tmp = self.__parse_strict_array(data[1:], size)
ret.append(tmp)
elif data[0] == Amf0DataType.FLV_AMF0_LONG_STRING:
data, size, tmp = self.__parse_long_string(data[1:], size)
ret.append(tmp)
return data, size, ret
def __parse_object(self, data, size):
"""解析object信息,object由一组[key+value],其中value可以是object来嵌套使用"""
ret = dict()
while size > 0:
if data[0] == Amf0DataType.FLV_AMF0_END_OF_OBJECT:
data = data[1:]
size -= 1
break
# 获取key的长度
keyLen = bytes2int(data[:2])
keyLen += 2
keyVal = bytes.decode(data[2:keyLen])
data, size = data[keyLen:], size - keyLen - 1
# 判断object-value类型
if data[0] == Amf0DataType.FLV_AMF0_NUMBER:
data, size, ret[keyVal] = self.__parse_number(data[1:], size)
elif data[0] == Amf0DataType.FLV_AMF0_BOOLEAN:
data, size, ret[keyVal] = self.__parse_boolean(data[1:], size)
elif data[0] == Amf0DataType.FLV_AMF0_STRING:
data, size, ret[keyVal] = self.__parse_string(data[1:], size)
elif data[0] == Amf0DataType.FLV_AMF0_NULL:
data, size, ret[keyVal] = self.__parse_null(data[1:], size)
elif data[0] == Amf0DataType.FLV_AMF0_OBJECT:
data, size, ret[keyVal] = self.__parse_object(data[1:], size)
elif data[0] == Amf0DataType.FLV_AMF0_DATE:
data, size, ret[keyVal] = self.__parse_date(data[1:], size)
elif data[0] == Amf0DataType.FLV_AMF0_ARRAY:
data, size, ret[keyVal] = self.__parse_array(data[1:], size)
elif data[0] == Amf0DataType.FLV_AMF0_STRICT_ARRAY:
data, size, ret[keyVal] = self.__parse_strict_array(data[1:], size)
elif data[0] == Amf0DataType.FLV_AMF0_LONG_STRING:
data, size, ret[keyVal] = self.__parse_long_string(data[1:], size)
return data, size, ret
class OtherTag(Tag):
"""其他标签不予处理"""
def parse(self):
"""获取字节数据,这部分暂不处理"""
return self
def bytes2int(data):
"""字节转换为int"""
return int.from_bytes(data, byteorder="big")
# flv文件头
class Head(object):
signature = None
version = None
flag = None
length = 0
def __init__(self, data):
"""初始化flv文件头信息,一般占用9个字节"""
self.signature = (data[0:3])
self.signature = bytes.decode(self.signature)
if self.signature != "FLV":
raise UnSupportFileFormat("文件格式不被支持")
self.version = data[3]
self.flag = data[4]
self.length = bytes2int(data[5:9])
def has_audio(self):
"""是否有音频"""
return self.flag & 1
def has_video(self):
"""是否有视频"""
return self.flag >> 2
def len(self):
"""对于大于9个字节可能是拓展或其他"""
return self.length
# flv文件体
class Flv(object):
head = None
tags = []
previousTagSize = 0
# 内部缓冲区
__buffer = None
# 加载flv文件
def load(self, filePath, buffSize=2048):
ret = 0
assert filePath != ""
try:
with open(filePath, 'rb') as io:
preTag = None
while 1:
if self.__buffer is not None:
# 当缓冲区达到指定buffer时不再读取文件,先处理缓冲区
buffLen = len(self.__buffer)
if buffLen >= buffSize:
ctx = self.__buffer
else:
ctx = io.read(buffSize)
if len(ctx) != 0:
ctx = self.__buffer + ctx
# print("使用文件IO(%d)" % len(ctx))
else:
ctx = self.__buffer
# print("缓冲区剩余数据处理%d" % len(ctx))
self.__buffer = None
else:
ctx = io.read(buffSize)
size = len(ctx)
if size > 0:
# 处理文件头
if self.head is None:
if size >= 9:
self.head = Head(ctx)
ctx = ctx[self.head.len():]
size -= self.head.len()
else:
self.__buffer = ctx
# 处理标签数据(最后一个循环会遗留4个字节为最后一个tag的大小)
if size >= 4:
# 最后那一个previousTagsSize为4字节
self.previousTagSize = bytes2int(ctx[0:4])
if size >= 15:
if preTag is None:
previousTagType = ctx[4]
if previousTagType == TagType.FLV_TAG_AUDIO:
preTag = AudioTag()
elif previousTagType == TagType.FLV_TAG_VIDEO:
preTag = VideoTag()
elif previousTagType == TagType.FLV_TAG_SCRIPT:
preTag = ScriptTag()
else:
preTag = OtherTag()
# 处理基本信息,最后才处理数据
preTag.previousTagsSize = self.previousTagSize
preTag.type = previousTagType
preTag.length = bytes2int(ctx[5:8])
preTag.timestamp = bytes2int(ctx[8:11])
preTag.exTimestamp = bytes2int(ctx[11:12])
preTag.streamsId = bytes2int(ctx[12:15])
size -= 15
ctx = ctx[15:]
if size > 0:
if size >= preTag.length:
preTag.data = ctx[:preTag.length]
self.__buffer = ctx[preTag.length:]
size -= preTag.length
self.tags.append(preTag.parse())
ret += 1
preTag = None
else:
preTag.data = ctx[:size]
self.__buffer = None
else:
self.__buffer = None
else:
# 补充剩下的数据
calcSize = preTag.length - len(preTag.data)
if size >= calcSize:
preTag.data = preTag.data + ctx[:calcSize]
size -= calcSize
if size > 0:
self.__buffer = ctx[calcSize:]
else:
self.__buffer = None
self.tags.append(preTag.parse())
ret += 1
preTag = None
else:
preTag.data = preTag.data + ctx[:calcSize]
self.__buffer = None
else:
self.__buffer = ctx
else:
break
# end while
except Exception as e:
print("Exception:\n%s\n" % traceback.format_exc())
return ret
# end of class Flv
flv_test.py
import flv
import unittest
class TestFlv(unittest.TestCase):
"""python单元测试"""
def test_load(self):
fl = flv.Flv()
ret = fl.load("20180510-154742.flv")
print("共找到%d个tag" % ret)
if __name__ == "__main__":
"""代替命令行下的 python3 -m unittest flv_test.py"""
unittest.main()
一个256M flv文件还是花费了1秒多
致谢
年纪大了脑袋也不行了,博客密码经常忘了,幸亏有csdn客服帮忙,才帮我找回密码,非常感谢
以下一行文字,纯属广告,不喜直接跳过(有钱的碰个钱场-反正钱是 “马爸爸的”,没钱的碰个人场)
上一篇: 游览器的解析与渲染
下一篇: 洛谷P1140 相似基因