
Parsing the FLV protocol (AMF data) with Python

Programmer Article Site (程序员文章站), 2022-07-13 12:08:09
...

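I have been learning Python for about a week. Having learned Go before, I noticed many similarities between the two, such as slices and parts of the syntax. Python really is a good language: only once you get to know it do you see how powerful it is. It is very productive to write, and so far its runtime performance has been good enough for me too. I never learned Python 2, so I went straight to Python 3. Learning calls for practice, so here is a small project. A while ago I planned to build a tool that grabs live-stream URLs, and while capturing packets from the network card I ran into the RTMP and FLV protocols, so I decided to write an FLV parsing library. "Library" is a stretch: it is really just a wrapper showing how Python can parse the FLV format. The Flv class only handles the common AMF0 data types. AMF metadata is also widely used in RTMP live streaming, but I did not handle other metadata types because I had no sample files to test them with.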

 

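Anyone who has worked on live streaming will know the FLV protocol well, and plenty of articles online cover it in detail, so this post only gives a brief overview.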

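FLV (Flash Video) is a very popular streaming media format. Its files are small and easy to package and play, which makes it well suited to the web, and at the time virtually every major video site used FLV. Because browsers were tightly integrated with Flash Player, playing FLV video in a web page was trivial, which is another reason for its popularity.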

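FLV is a streaming container format, and its data can be treated as a binary byte stream. At the top level an FLV file consists of a File Header and a File Body; the body is a sequence of Tags, each paired with a Tag Size.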


FLV format breakdown

header

The header is made up of the following fields:
Signature (3 bytes) + Version (1 byte) + Flags (1 byte) + DataOffset (4 bytes)

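  • Signature, 3 bytes
    Always the three characters "FLV". A file whose first three bytes are FLV is generally treated as an FLV file.
  • Version, 1 byte
    The FLV version number; here it is 1.
  • Flags, 1 byte
    Content flags. Bit 0 and bit 2 indicate whether video and audio, respectively, are present (1 = present, 0 = absent). The screenshot shows 0x05, i.e. 00000101, which means the file has both video and audio.
  • DataOffset, 4 bytes
    The length of the FLV header in bytes; as shown, it is fixed at 9.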
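The header layout above maps directly onto a `struct` format string. The following is a minimal sketch that assumes a version-1 header; `parse_flv_header` is a hypothetical helper for illustration, not part of the flv.py module below:

```python
import struct

# Header layout: 3-byte signature, 1-byte version, 1-byte flags,
# 4-byte big-endian DataOffset (the header length, 9 for version 1).
FLV_HEADER = struct.Struct(">3sBBI")


def parse_flv_header(data: bytes) -> dict:
    """Decode the 9-byte FLV file header (sketch, version 1 only)."""
    if len(data) < FLV_HEADER.size:
        raise ValueError("need at least 9 bytes")
    signature, version, flags, data_offset = FLV_HEADER.unpack_from(data)
    if signature != b"FLV":
        raise ValueError("not an FLV file")
    return {
        "version": version,
        # bit 2 (0x04) = audio present, bit 0 (0x01) = video present
        "has_audio": bool(flags & 0x04),
        "has_video": bool(flags & 0x01),
        "data_offset": data_offset,
    }


header = parse_flv_header(b"FLV\x01\x05\x00\x00\x00\x09")
print(header)  # {'version': 1, 'has_audio': True, 'has_video': True, 'data_offset': 9}
```

Because `>` disables padding, `FLV_HEADER.size` is exactly 9, so the same object can also be used to check that enough bytes have been read.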

body

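The FLV body is a sequence of back-pointers + tags:

  • back-pointer: always 4 bytes, giving the size of the previous tag (its 11-byte header plus its data). The first one is 0, since no tag precedes it.
  • tag: one of three types, video, audio or script.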
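To make the back-pointer layout concrete, here is a minimal sketch that walks an FLV body held entirely in memory and checks each back-pointer against the tag before it. `iter_tags` is a hypothetical helper, and the strict back-pointer check is an assumption; a tolerant parser might only log mismatches:

```python
import struct


def iter_tags(body: bytes):
    """Walk an FLV body (everything after the header), yielding (tag_type, payload).

    Every back-pointer must equal 11 + DataSize of the tag before it;
    the very first one is 0.
    """
    pos = 0
    expected_prev = 0
    while pos + 4 <= len(body):
        (prev_size,) = struct.unpack_from(">I", body, pos)
        if prev_size != expected_prev:
            raise ValueError(f"bad back-pointer at {pos}: {prev_size} != {expected_prev}")
        pos += 4
        if pos + 11 > len(body):
            break  # only the trailing back-pointer was left
        tag_type = body[pos]
        data_size = int.from_bytes(body[pos + 1:pos + 4], "big")
        payload = body[pos + 11:pos + 11 + data_size]
        if len(payload) != data_size:
            raise ValueError("truncated tag")
        yield tag_type, payload
        pos += 11 + data_size
        expected_prev = 11 + data_size


# One audio tag: type 8, DataSize 1, timestamp/extension/stream id all zero, 1 data byte.
audio = bytes([0x08]) + (1).to_bytes(3, "big") + bytes(7) + b"\xaf"
body = struct.pack(">I", 0) + audio + struct.pack(">I", len(audio))
print(list(iter_tags(body)))  # [(8, b'\xaf')]
```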

Tag structure

tag type+tag data size+Timestamp+TimestampExtended+stream id+ tag data

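  • type, 1 byte: 8 = audio, 9 = video, 18 = script
  • tag data size, 3 bytes: the length of the tag data, counted from the byte after the stream id.
  • Timestamp, 3 bytes: the timestamp in milliseconds.
  • TimestampExtended, 1 byte: timestamp extension field (the upper 8 bits of a 32-bit timestamp).
  • stream id, 3 bytes: always 0.
  • tag data: the payload.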
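Since TimestampExtended holds the high byte, the full timestamp is `(TimestampExtended << 24) | Timestamp`. The flv.py code below stores the two fields separately; this minimal sketch (with a hypothetical `TagHeader` dataclass) shows the 11-byte header decoded with the two combined:

```python
from dataclasses import dataclass


@dataclass
class TagHeader:
    tag_type: int   # 8 = audio, 9 = video, 18 = script data
    data_size: int  # length of the tag data after the 11-byte header
    timestamp: int  # milliseconds, 32 bits once the extension byte is applied
    stream_id: int  # always 0


def parse_tag_header(raw: bytes) -> TagHeader:
    """Decode the 11-byte tag header (sketch)."""
    if len(raw) < 11:
        raise ValueError("tag header is 11 bytes")
    low24 = int.from_bytes(raw[4:7], "big")
    # TimestampExtended (byte 7) is the upper 8 bits of the 32-bit timestamp
    timestamp = (raw[7] << 24) | low24
    return TagHeader(
        tag_type=raw[0],
        data_size=int.from_bytes(raw[1:4], "big"),
        timestamp=timestamp,
        stream_id=int.from_bytes(raw[8:11], "big"),
    )


hdr = parse_tag_header(bytes([9, 0, 0, 0x2A, 0x12, 0x34, 0x56, 0x01, 0, 0, 0]))
print(hdr.timestamp)  # 17970262 (0x01123456)
```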

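The data part is what really matters. An FLV file is essentially a sequence of tags; the common ones carry audio, video and script data (metadata). A script tag holds information describing the video or audio, such as width, height and duration, and a file usually contains only one metadata tag. Audio and video tags carry the media itself, along with information such as sample rate, channels and codec. This is all basic material and the protocol is well documented online, so let's go straight to the code. Next I plan to learn how to wrap and call C code from Python with ctypes; who doesn't want their code to run faster?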
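The sample rate, channel and codec information mentioned above lives in the first byte of each audio or video tag's data. The following sketch splits that byte into its bit fields in the same way as the AudioTag and VideoTag classes below; `decode_audio_flags`, `decode_video_flags` and the lookup tables are hypothetical helpers, and only a subset of codec IDs is named:

```python
# Nominal SoundRate values from the FLV spec (2-bit field)
SOUND_RATES = {0: "5.5 kHz", 1: "11 kHz", 2: "22 kHz", 3: "44 kHz"}
# A few CodecID values; other IDs are reported as "other"
VIDEO_CODECS = {2: "Sorenson H.263", 4: "On2 VP6", 7: "AVC (H.264)"}


def decode_audio_flags(b: int) -> dict:
    """Split the first byte of an audio tag's data into its bit fields."""
    return {
        "sound_format": b >> 4,                       # bits 7-4, e.g. 2 = MP3, 10 = AAC
        "sample_rate": SOUND_RATES[(b >> 2) & 0x03],  # bits 3-2
        "sample_bits": 16 if (b >> 1) & 0x01 else 8,  # bit 1
        "stereo": bool(b & 0x01),                     # bit 0
    }


def decode_video_flags(b: int) -> dict:
    """Split the first byte of a video tag's data into frame type and codec ID."""
    codec_id = b & 0x0F
    return {
        "frame_type": b >> 4,  # 1 = key frame, 2 = inter frame
        "codec_id": codec_id,
        "codec": VIDEO_CODECS.get(codec_id, "other"),
    }


print(decode_audio_flags(0xAF))  # AAC, 44 kHz, 16-bit, stereo
print(decode_video_flags(0x17))  # key frame, AVC
```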

flv.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""FLV file parser; metadata (script tags) is decoded as AMF0."""
__author__ = "aaa@qq.com"
import struct
import traceback
from enum import IntEnum, unique


# IntEnum (not plain Enum) so that raw byte values compare equal to members,
# e.g. 0x08 == TagType.FLV_TAG_AUDIO
@unique
class TagType(IntEnum):
    """Tag types"""
    FLV_TAG_AUDIO = 0x08
    FLV_TAG_VIDEO = 0x09
    FLV_TAG_SCRIPT = 0x12


@unique
class Amf0DataType(IntEnum):
    """AMF0 type markers used in script data (the ones this module supports)"""
    FLV_AMF0_NUMBER = 0x00
    FLV_AMF0_BOOLEAN = 0x01
    FLV_AMF0_STRING = 0x02
    FLV_AMF0_OBJECT = 0x03
    FLV_AMF0_NULL = 0x05
    FLV_AMF0_ARRAY = 0x08
    FLV_AMF0_END_OF_OBJECT = 0x09
    FLV_AMF0_STRICT_ARRAY = 0x0a
    FLV_AMF0_DATE = 0x0b
    FLV_AMF0_LONG_STRING = 0x0c


class UnSupportFileFormat(Exception):
    pass


class UnSupportAmfValFormat(Exception):
    pass


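class Tag(object):
    """Base class for FLV tags: the tag header fields plus the raw tag data"""

    def __init__(self):
        # instance attributes, so tags never share one mutable data object
        self.previousTagsSize = 0
        self.type = 0
        self.length = 0
        self.timestamp = 0
        self.exTimestamp = 0
        self.streamsId = 0
        # raw tag data
        self.data = b""

    def parse(self):
        """Subclasses override this to decode the raw data"""
        return self

    def __str__(self):
        """like toString()"""
        return "%s previousTagsSize:%d type:%d length:%d timestamp:%d exTimestamp:%d streamsId:%d" % (
            self.__class__, self.previousTagsSize, self.type, self.length, self.timestamp, self.exTimestamp,
            self.streamsId)

    def getBytes(self):
        """Return the raw tag data"""
        return self.data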


# end of class Tag

class AudioTag(Tag):
    """Audio tag"""
    format = None
    samplerate = None
    bits = 0
    sc = 0
    __flag = None
    __data = b""

    def parse(self):
        data = super().getBytes()
        if len(data) != 0:
            self.__flag = data[0]
            self.__data = data[1:]
            # bits 7-4: SoundFormat (e.g. 2 = MP3, 10 = AAC)
            self.format = self.__flag >> 4
            # bits 3-2: SoundRate (0 = 5.5 kHz, 1 = 11 kHz, 2 = 22 kHz, 3 = 44 kHz)
            # e.g. 0xAF >> 2 = 0010 1011, & 0000 0011 = 3
            self.samplerate = (self.__flag >> 2) & 0x03
            # bit 1: SoundSize (0 = 8-bit samples, 1 = 16-bit samples)
            self.bits = (self.__flag >> 1) & 0x01
            # bit 0: SoundType (0 = mono, 1 = stereo)
            self.sc = self.__flag & 0x01
        return self

    def getBytes(self):
        """Return the audio data without the leading flag byte"""
        return self.__data


# end of class AudioTag
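class VideoTag(Tag):
    """Video tag"""
    frameType = None
    codec = None
    __flag = None
    __data = b""

    def parse(self):
        """Decode the video tag's flag byte"""
        data = super().getBytes()
        if len(data) != 0:
            self.__flag = data[0]
            self.__data = data[1:]
            # bits 7-4: FrameType (1 = key frame, 2 = inter frame, ...)
            self.frameType = self.__flag >> 4
            # bits 3-0: CodecID (7 = AVC/H.264). Masking with 0x0f is needed because
            # Python ints are unbounded: shifting left does not drop the high bits
            # the way it would in an 8-bit type.
            self.codec = self.__flag & 0x0f
        return self

    def getBytes(self):
        """Return the video data without the leading flag byte"""
        return self.__data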


# end of class VideoTag
class ScriptTag(Tag):
    """
        Script data, also called metadata; it takes a bit more work to decode.
        For AMF0 see:
        https://wwwimages2.adobe.com/content/dam/acom/en/devnet/pdf/amf0-file-format-specification.pdf
    """
    # attribute that receives each top-level value, keyed by its type marker
    __attrs = {
        Amf0DataType.FLV_AMF0_NUMBER: "numVal",
        Amf0DataType.FLV_AMF0_BOOLEAN: "boolVal",
        Amf0DataType.FLV_AMF0_STRING: "strVal",
        Amf0DataType.FLV_AMF0_LONG_STRING: "lStrVal",
        Amf0DataType.FLV_AMF0_NULL: "nullVal",
        Amf0DataType.FLV_AMF0_OBJECT: "objVal",
        Amf0DataType.FLV_AMF0_DATE: "dateVal",
        Amf0DataType.FLV_AMF0_ARRAY: "arrVal",
        Amf0DataType.FLV_AMF0_STRICT_ARRAY: "arrVal",
    }

    def __init__(self):
        super().__init__()
        self.numVal = 0
        self.strVal, self.lStrVal = "", ""
        self.objVal = {}
        self.arrVal = {}
        self.boolVal = False
        self.nullVal, self.dateVal = None, None

    def parse(self):
        """Decode the script tag's AMF0 values (typically "onMetaData" followed by an ECMA array)"""
        data = super().getBytes()
        size = len(data)
        while size > 0:
            marker = self.__marker(data[0])
            data, size, value = self.__parse_value(marker, data[1:], size - 1)
            setattr(self, self.__attrs[marker], value)
        # end of while
        assert size == 0
        return self

    @staticmethod
    def __marker(byte):
        """Map a type-marker byte to Amf0DataType; raise for unsupported markers"""
        try:
            return Amf0DataType(byte)
        except ValueError:
            raise UnSupportAmfValFormat(byte) from None

    def __parse_value(self, marker, data, size):
        """Decode one value whose type marker has already been consumed"""
        if marker == Amf0DataType.FLV_AMF0_NUMBER:
            return self.__parse_number(data, size)
        elif marker == Amf0DataType.FLV_AMF0_BOOLEAN:
            return self.__parse_boolean(data, size)
        elif marker == Amf0DataType.FLV_AMF0_STRING:
            return self.__parse_string(data, size)
        elif marker == Amf0DataType.FLV_AMF0_LONG_STRING:
            return self.__parse_long_string(data, size)
        elif marker == Amf0DataType.FLV_AMF0_NULL:
            return self.__parse_null(data, size)
        elif marker == Amf0DataType.FLV_AMF0_OBJECT:
            return self.__parse_object(data, size)
        elif marker == Amf0DataType.FLV_AMF0_DATE:
            return self.__parse_date(data, size)
        elif marker == Amf0DataType.FLV_AMF0_ARRAY:
            return self.__parse_array(data, size)
        elif marker == Amf0DataType.FLV_AMF0_STRICT_ARRAY:
            return self.__parse_strict_array(data, size)
        # END_OF_OBJECT only terminates an object; it is never a value
        raise UnSupportAmfValFormat(marker)

    def __parse_number(self, data, size):
        """number: 8-byte big-endian IEEE 754 double, unpacked with struct"""
        ret = struct.unpack('>d', data[:8])[0]
        return data[8:], size - 8, ret

    def __parse_boolean(self, data, size):
        """boolean: 1 byte, non-zero means True"""
        return data[1:], size - 1, data[0] != 0

    def __parse_null(self, data, size):
        """null: the marker has no payload, so nothing is consumed"""
        return data, size, None

    def __parse_string(self, data, size):
        """string: 2-byte length + N bytes of UTF-8"""
        offset = bytes2int(data[:2]) + 2
        return data[offset:], size - offset, bytes.decode(data[2:offset])

    def __parse_long_string(self, data, size):
        """long string: 4-byte length + N bytes of UTF-8"""
        offset = bytes2int(data[:4]) + 4
        return data[offset:], size - offset, bytes.decode(data[4:offset])

    def __parse_date(self, data, size):
        """date: 8-byte double (ms since the Unix epoch) + 2-byte signed time zone; returns a dict"""
        time = struct.unpack('>d', data[0:8])[0]
        zone = struct.unpack('>h', data[8:10])[0]
        return data[10:], size - 10, {"zone": zone, "time": time}

    def __parse_array(self, data, size):
        """ECMA array: 4-byte (approximate) count, then key/value pairs laid out like an object"""
        arrLen = bytes2int(data[:4])
        data, size, arrVal = self.__parse_object(data[4:], size - 4)
        return data, size, {"len": arrLen, "val": arrVal}

    def __parse_strict_array(self, data, size):
        """strict array: 4-byte count, then exactly that many values (no keys, no end marker)"""
        alen = bytes2int(data[:4])
        data, size = data[4:], size - 4
        ret = []
        for _ in range(alen):
            marker = self.__marker(data[0])
            data, size, tmp = self.__parse_value(marker, data[1:], size - 1)
            ret.append(tmp)
        return data, size, ret

    def __parse_object(self, data, size):
        """object: [key + value] pairs (values may nest), closed by an empty key followed by 0x09"""
        ret = dict()
        while size > 0:
            # 2-byte key length + key
            keyLen = bytes2int(data[:2]) + 2
            keyVal = bytes.decode(data[2:keyLen])
            data, size = data[keyLen:], size - keyLen
            # value type marker
            marker = self.__marker(data[0])
            data, size = data[1:], size - 1
            if keyVal == "" and marker == Amf0DataType.FLV_AMF0_END_OF_OBJECT:
                break
            data, size, ret[keyVal] = self.__parse_value(marker, data, size)
        return data, size, ret


class OtherTag(Tag):
    """Tags of any other type are not decoded"""

    def parse(self):
        """Nothing to decode for now; the raw bytes stay in self.data"""
        return self


def bytes2int(data):
    """Convert big-endian bytes to an int"""
    return int.from_bytes(data, byteorder="big")


# FLV file header
class Head(object):
    signature = None
    version = None
    flag = None
    length = 0

    def __init__(self, data):
        """Read the FLV file header, normally 9 bytes"""
        # compare raw bytes, so a non-FLV file cannot trigger a decode error
        if data[0:3] != b"FLV":
            raise UnSupportFileFormat("unsupported file format")
        self.signature = "FLV"
        self.version = data[3]
        self.flag = data[4]
        self.length = bytes2int(data[5:9])

    def has_audio(self):
        """Whether the file contains audio (flag bit 2, 0x04)"""
        return bool(self.flag & 0x04)

    def has_video(self):
        """Whether the file contains video (flag bit 0, 0x01)"""
        return bool(self.flag & 0x01)

    def len(self):
        """Header length; a value above 9 may leave room for extensions or other data"""
        return self.length


# FLV file body
class Flv(object):

    def __init__(self):
        self.head = None
        # per-instance list; a class-level list would be shared by every Flv object
        self.tags = []
        self.previousTagSize = 0

    # load an FLV file
    def load(self, filePath, buffSize=2048):
        """Parse every tag in filePath and return how many were parsed.

        The file is read buffSize bytes at a time; bytes that do not yet form a
        complete tag stay in the buffer until later reads complete them.
        """
        ret = 0
        assert filePath != ""
        try:
            with open(filePath, 'rb') as io:
                # file header: normally 9 bytes, DataOffset gives its actual length
                self.head = Head(io.read(9))
                io.read(max(0, self.head.len() - 9))  # skip any extra header bytes
                buf = bytearray()
                while True:
                    chunk = io.read(buffSize)
                    buf += chunk
                    # consume every complete PreviousTagSize(4) + tag header(11) + tag data
                    while len(buf) >= 15:
                        length = bytes2int(buf[5:8])
                        if len(buf) < 15 + length:
                            break  # wait for the rest of this tag
                        tagType = buf[4]
                        if tagType == TagType.FLV_TAG_AUDIO.value:
                            tag = AudioTag()
                        elif tagType == TagType.FLV_TAG_VIDEO.value:
                            tag = VideoTag()
                        elif tagType == TagType.FLV_TAG_SCRIPT.value:
                            tag = ScriptTag()
                        else:
                            tag = OtherTag()
                        # header fields first, then the data
                        tag.previousTagsSize = bytes2int(buf[0:4])
                        tag.type = tagType
                        tag.length = length
                        tag.timestamp = bytes2int(buf[8:11])
                        tag.exTimestamp = buf[11]
                        tag.streamsId = bytes2int(buf[12:15])
                        tag.data = bytes(buf[15:15 + length])
                        self.tags.append(tag.parse())
                        ret += 1
                        del buf[:15 + length]
                    if not chunk:
                        # end of file: the last 4 bytes are the final tag's PreviousTagSize
                        if len(buf) >= 4:
                            self.previousTagSize = bytes2int(buf[0:4])
                        break
                # end while
        except Exception:
            print("Exception:\n%s\n" % traceback.format_exc())
        return ret

# end of class Flv
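The ScriptTag class decodes AMF0 by reading a one-byte type marker and then a type-specific payload. To see the layout end to end (or to generate metadata for tests), here is a minimal standalone AMF0 encoder/decoder pair covering number, boolean, string, null, object, ECMA array and strict array. `read_value`, `write_value` and the marker constants are hypothetical names, and encoding a Python dict as an ECMA array is an assumption that mirrors how onMetaData is usually written:

```python
import struct

NUMBER, BOOLEAN, STRING, OBJECT, NULL, ECMA_ARRAY, OBJECT_END, STRICT_ARRAY = (
    0x00, 0x01, 0x02, 0x03, 0x05, 0x08, 0x09, 0x0A)


def read_utf8(buf: bytes, pos: int):
    """AMF0 UTF-8 string: 2-byte big-endian length, then the bytes."""
    (length,) = struct.unpack_from(">H", buf, pos)
    pos += 2
    return buf[pos:pos + length].decode("utf-8"), pos + length


def read_value(buf: bytes, pos: int):
    """Decode one AMF0 value starting at buf[pos]; return (value, new_pos)."""
    marker = buf[pos]
    pos += 1
    if marker == NUMBER:
        return struct.unpack_from(">d", buf, pos)[0], pos + 8
    if marker == BOOLEAN:
        return buf[pos] != 0, pos + 1
    if marker == STRING:
        return read_utf8(buf, pos)
    if marker == NULL:
        return None, pos  # null has no payload
    if marker == STRICT_ARRAY:
        (count,) = struct.unpack_from(">I", buf, pos)
        pos += 4
        items = []
        for _ in range(count):
            item, pos = read_value(buf, pos)
            items.append(item)
        return items, pos
    if marker in (OBJECT, ECMA_ARRAY):
        if marker == ECMA_ARRAY:
            pos += 4  # approximate element count, not needed for decoding
        props = {}
        while True:
            key, pos = read_utf8(buf, pos)
            if key == "" and buf[pos] == OBJECT_END:
                return props, pos + 1
            props[key], pos = read_value(buf, pos)
    raise ValueError(f"unsupported AMF0 marker 0x{marker:02x}")


def write_utf8(s: str) -> bytes:
    raw = s.encode("utf-8")
    return struct.pack(">H", len(raw)) + raw


def write_value(v) -> bytes:
    """Encode a Python value as AMF0 (dicts become ECMA arrays)."""
    if v is None:
        return bytes([NULL])
    if isinstance(v, bool):  # must come before the int/float check
        return bytes([BOOLEAN, int(v)])
    if isinstance(v, (int, float)):
        return bytes([NUMBER]) + struct.pack(">d", float(v))
    if isinstance(v, str):
        return bytes([STRING]) + write_utf8(v)
    if isinstance(v, list):
        return bytes([STRICT_ARRAY]) + struct.pack(">I", len(v)) + b"".join(write_value(x) for x in v)
    if isinstance(v, dict):
        body = b"".join(write_utf8(k) + write_value(x) for k, x in v.items())
        return bytes([ECMA_ARRAY]) + struct.pack(">I", len(v)) + body + write_utf8("") + bytes([OBJECT_END])
    raise TypeError(type(v))


payload = write_value("onMetaData") + write_value({"duration": 12.5, "width": 1280.0, "stereo": True})
name, pos = read_value(payload, 0)
meta, pos = read_value(payload, pos)
print(name, meta)  # onMetaData {'duration': 12.5, 'width': 1280.0, 'stereo': True}
```

Passing positions around instead of slicing (`data = data[1:]`) avoids copying the remaining bytes at every step, which matters little for a small metadata tag but adds up for large payloads.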

flv_test.py

import flv
import unittest


class TestFlv(unittest.TestCase):
    """Python unit test"""

    def test_load(self):
        fl = flv.Flv()
        ret = fl.load("20180510-154742.flv")
        print("Found %d tags in total" % ret)


if __name__ == "__main__":
    # same as running `python3 -m unittest flv_test.py` on the command line
    unittest.main()
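The test above depends on a recorded file. One option is to generate a tiny synthetic FLV instead. The following is a minimal sketch; `build_flv` is a hypothetical helper, and the header flags are hard-coded to 0x05 (audio + video):

```python
import struct


def build_flv(tags):
    """Build an in-memory FLV: header, PreviousTagSize0, then each tag and its back-pointer.

    `tags` is a list of (tag_type, timestamp_ms, payload) tuples.
    """
    out = bytearray(b"FLV" + bytes([1, 0x05]) + struct.pack(">I", 9))
    out += struct.pack(">I", 0)  # PreviousTagSize0 is always 0
    for tag_type, ts, payload in tags:
        header = bytes([tag_type]) + len(payload).to_bytes(3, "big")
        # lower 24 bits of the timestamp, then the TimestampExtended byte
        header += (ts & 0xFFFFFF).to_bytes(3, "big") + bytes([(ts >> 24) & 0xFF])
        header += bytes(3)  # StreamID, always 0
        out += header + payload
        out += struct.pack(">I", len(header) + len(payload))  # back-pointer
    return bytes(out)


data = build_flv([(0x08, 0, b"\xaf\x01"), (0x09, 40, b"\x17\x01\x00\x00\x00")])
print(len(data))  # 50
```

Writing `data` to a temporary file (for example with `tempfile.NamedTemporaryFile`) and passing its path to `Flv().load()` would give a test that does not depend on a recorded file and whose expected tag count (2) is known in advance.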

Parsing a 256 MB FLV file still took a little over one second.

(Screenshot: test results)

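Acknowledgements

I'm getting older and my memory isn't what it used to be; I often forget my blog password. Many thanks to CSDN customer support for helping me recover it.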
