欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

pytest框架进行接口自动化

程序员文章站 2022-03-07 20:04:26
...

pytest框架进行接口自动化

1、框架目录结构介绍

目录结构图

1、公共目录(主要放请求发送工具文件,读取yaml文件工具等)
    请求发送工具文件:send_request_util.py
    读取yaml工具文件: yaml_util.py
2、报告目录(用来作为报告保存的目录)
	reports
3、用例目录(用来存放用例的目录)
	testcase
4、全局文件 runAll (用于执行全局用例)
    runAll.py :该文件中只有一个方法:pytest.main()
5、pytest.ini 文件(用于控制用例执行)
    文件开头必须是  [pytest]
    参数:
        addopts (相当于pytest.main()方法中的参数)
        testpath (执行配置路径下的testcase, 从runAll文件路径开始,多个路径用空格分隔)
        python_files (配置执行文件的文件名,如 test_*.py,执行test_开头的Python文件)
        pyhton_classes (配置执行文件中的类名,如 Test*,执行Test开头的类)
        python_functions (配置执行的方法名,如 test_*,执行所有test_开头的方法)
[pytest]
addopts = -vs
;--html = ./reports/report.html
testpaths = ./testcase
python_files = test*.py
python_classes = Test*
python_functions = test_*
6、全局固件文件
    固件执行文件,固定命名  conftest.py
7、获取可变参数方法文件
    CanShu.py
8、extract.yaml  接口关联数据文件
9、全局配置文件 config.yaml
10、封装公共日志文件方法 log_util.py

2、未封装的基础用例编写

1、按照规范命名文件名、类名、方法名
文件名 test*.py  类名 Test*  方法名  test_*
2、基础请求编写
    url  : 发送请求的url ,每个请求必填
    method :发送请求的方法,每个请求必填
    headers :请求头,看接口要求填写,非必填
    data /json /parmas /files : 请求参数,看接口要求填写,非必填,get 请求用 parmas 传参,post使用data/json/files传参,其中文件上传用files

实例:

  # 获取微信登录token
  def test_get_produce_token(self):
       url = 'https://api.weixin.qq.com/cgi-bin/token'
       datas = {
            "grant_type": "client_credential",
            "appid": "wx89b815xxxxxxxx",
            "secret": "6bab38fbf47a4f004f8xxxxxxxxx"
            }
        res = requests.request(method='get', url=url, params=datas)

注意:

保存token使用正则匹配,需要注意,先将需要匹配的字符串打印出来,不是res.json(),需要使用res.text ,再从打印信息中复制字符串进行匹配,因为正则是匹配的字符串
正则表达式: re.search("(.*?)",str).group(1),如果没有提取到值会报没有group属性的错误
3、将提取的token写到公共文件方法:
   1、在根目录下创建一个extract.yaml 文件,用来存放接口关联的数据
   2、在common中 创建一个 yaml_util.py文件,用来写yaml文件的读取写入清除等方法,可以不用写类名  
   3、yam_util.py中获取执行文件路径的方法,读取文件路径都可调用该方法
# 获取根目录
def get_object_path():
    return os.getcwd()
4、数据写入yaml文件方法
def write_extract_yaml(data):
    # 打开文件,模式为写入追加写入,编码格式为utf-8
    with open(get_object_path() + "/extract.yaml", mode='a', encoding='utf-8') as f:
        # yaml文件写入,写入数据data,写入流 stream,是否允许unicode编码
        yaml.dump(data=data, stream=f, allow_unicode=True)
5、 读取数据方法,直接传key即可
# 读取数据方法,直接传key即可
def read_extract_yaml(key):
    # 打开文件,模式为写入追加写入,编码格式为utf-8
    with open(get_object_path() + "/extract.yaml", mode='r', encoding='utf-8') as f:
        # 在yaml文件中读取值,获取yaml文件中所有的数据
        value = yaml.load(f, Loader=yaml.FullLoader)
        # 需要根据传入的key获取数据
        return value[key]
6、每次执行都会写入token,需要清除token
在conftest.py文件中使用固件,调用清除extract_yaml文件方法,在下一次开始执获取session前,先清除token
# 使用装饰器,调用固件方法,作用范围为session,自动执行
@pytest.fixture(scope="session", autouse=True)
def clear_extract():
    clear_extract_yaml()
7、清除关联文件数据方法
# 清除yaml文件数据
def clear_extract_yaml():
    # 打开文件,模式为写入追加写入,编码格式为utf-8
    with open(get_object_path() + "/extract.yaml", mode='w', encoding='utf-8') as f:
        f.truncate()

3、发送请求的封装

1、创建send_request_util.py公共工具文件
# 在公共方法中创建session,使用公共方法中分session发送请求
session = requests.session()

# 发送请求方法,发送请求必须要的参数有 url 和 method,其它参数可以由可变长字典参数传值
def send_request(slef, url, method, **kwargs):
    # 请求方式大小写需要转换一下,避免出现由于大小写引起的发送失败的情况
    method = str(method).lower()
    # 直接发送请求,并且返回请求响应数据
    return RequestUtil.session.request(method, url=url, **kwargs)

2、配置全局配置,将部分不变的请求路径配置到yaml文件中
    1、在根目录下创建config.yaml文件,配置如下,按照规则自定义配置即可:
            base_sp_url: https://api.weixin.qq.com
            base_yh_url: http://47.107.116.139
    2、在yaml_util.py中编写读取路径的方法
# 读取config_yaml中的配置路径
def read_config_yaml(node):
    with open(get_object_path() + "/config.yaml", encoding='utf-8') as f:
        # 读取所有的数据
        value = yaml.load(f, Loader=yaml.FullLoader)
        # 返回传入key的值
        return value[node]

4、封装请求参数

1、将用例请求参数写入yaml文件中
-
  name: 获取验证码
  request:
    url: /cgi-bin/token
    method: get
    headres: none
    params:
      grant_type: client_credential
      appid: wx89b81545xxxx
      secret: 6bab38fbf47a4f004f8axxxxx
  extract: none
  volidate: none
2、在yaml_util中写一个公共方法,读取用例
# 读取用例的方法
def read_testcase_yaml(path):
    # 打开文件,模式为写入追加写入,编码格式为utf-8
    with open(get_object_path() + path, mode='r', encoding='utf-8') as f:
        # 在yaml文件中读取值,获取yaml文件中所有的数据并返回
        return yaml.load(f, Loader=yaml.FullLoader)
3、在send_request_util中写一个公共方法,校验yaml文件中的用例参数是否正确
# 校验用例是否正确,case_info信息通过装饰器,调用yaml_util.py中的read_testcase_yaml方法即可得到
def stander_yaml(self, case_info):
    # 获取列表所有的key
    case_info_keys = case_info.keys()
    # 首先判断caseinfo中是否包含必填的字段
    if "name" in case_info_keys and "request" in case_info_keys and "extract" in case_info_keys and "validate" in case_info_keys:
        # 获取request中所有的key
        request_keys = case_info["request"].keys()
        # 判断request中是否包含url和method
        if "url" in request_keys and "method" in request_keys:
            print("通过yaml用例标准化校验")
            # 获取url 和method
            url = case_info["request"].pop("url")
            method = case_info["request"].pop("method")
            # 调用发送请求方法
            res = self.send_request(url=url, method=method, **case_info["request"])
            # 获取返回状态码,后续做断言传参
            status_code = res.status_code
        else:
            print("二级关键词必须包含url和method")
    else:
        print("一级关键词必须要包含name、request、method、validate")

5、可变参数的封装处理

1、在send_request_util实现替换方法,对于data、json、file、head 、params中的可变参数,进行替换
规则:需要替换的部分通常以 ${参数名} 的形式书写,规则自己定,替换方式通过热加载方式(通过反射调用方法和参数,在执行过程中间,将实际的值传递进去)
# 替换可变参数的方法
def replace_value(self, data):
    # 判断data是否存在,存在则进行替换操作
    if data:
        # 保留data类型
        data_type = type(data)
        # 判断data类型,如果是list 或者 dict 类型的,使用json方法转换为字符串
        if isinstance(data, list) or isinstance(data, dict):
            str_data = json.dumps(data)
        # 如果是其它类型,直接强转为字符串
        else:
            str_data = str(data)
        # 对字符串进行循环,如果存在${},则开始进行替换,循环替换掉所有的参数
        for cs in range(1, str_data.count("${") + 1):
            if "${" in str_data and "}" in str_data:
                # 获取每一次替换的开始索引和结束索引,取出的数据为 ${参数}
                start_index = str_data.index("${")
                end_index = str_data.index("}", start_index)
                # 获取老数据,即从yaml文件中获取的数据
                old_value = str_data[start_index:end_index + 1]
                # 获取方法名,提供给获取新数据时反射使用
                fun_name = old_value[2:old_value.index("(")]
                # 获取方法中的参数
                arg_value = old_value[old_value.index("(") + 1:old_value.index(")")]
                # 如果参数不为空
                if arg_value != "":
                    arg_values = arg_value.split(",")
                    # 使用反射获取新的值
                    new_value = getattr(self.obj, fun_name)(*arg_value)
                else:
                    # 使用反射获取新的值,没有参数也需要带一个小括号
                    new_value = getattr(self.obj, fun_name)()
                # 替换参数
                str_data = str_data.replace(old_value, new_value)
        # 还原数据类型,如果数据是字典或者列表,则还原为json,否则还原为原类型
        if isinstance(data_type, dict) or isinstance(data_type, list):
            str_data = json.loads(str_data)
        else:
            str_data = data_type(str_data)
    return str_data
2、需要传一个对象到send_request_util.py方法中
在根目录创建一个参数化文件CanShu.py,在里面写参数化获取方法
class CanShu:

    # 获取随机整数的方法
    def get_randomInt(self, start_num, end_num):
        return str(random.randint(int(start_num), int(end_num)))

    # 读取extract中的access_token值
    def read_extract_data(self, access_token):
        return read_extract_yaml(access_token)
3、在发送请求方法里面调用替换参数的方法
# 判断可变长字典的key里面有没有,data params json headers,如果有,则调用替换方法
for key, value in kwargs.items():
    if key in ["params", "data", "json", "headers"]:
        kwargs[key] = slef.replace_value(value)
    # 如果是文件类型,则对应的值替换成open方法
    elif key == "files":
        for file_key, file_value in value.items():
            value[file_key] = open(file_value, 'rb')

6、提取参数写入指定文件

1、一般在用例yaml文件中的书写格式
2、判断返回响应数据是否json串,以此决定是否以正则提取还是jsonpath提取
定义res_json
变量,用来接收返回数据
res.json()
res_json = ""
# 判断返回是否json
try:
    res_json = res.json()
except Exception as e:
    print("返回的结果不是JSON格式,不能使用jsonpath提取")
# 将需要保存的值保存到exteact.yaml中
# 如果用例中存在exteact关键字
if "extract" in case_info_keys:
    # 循环查找有多少需要写入的数据
    for key, value in case_info["extract"].items():
        # 正则匹配提取
        if '(.*?)' in value or '(.+?)' in value:
            zz_value = re.search(value, res.text)
            # 判断是否提取到数据
            if zz_value:
                extract_value = {key: zz_value.group(1)}
                # 写入数据
                write_extract_yaml(extract_value)
        # jsonpath提取
        else:
            js_value = {key: jsonpath.jsonpath(res_json, value)}
            # 写入数据
            write_extract_yaml(js_value)

7、封装断言方法

1、将断言写入用例yaml文件,格式为
validate:
#状态断言相等
  - equals: {status_code: 200} 
#   业务断言相等   
  - equals: {expires_in: 7200}   
#  业务断言包含
  - contains: access_token  
2、将断言代码写到公共文件send_request_util.py stander_yaml规范测试用例方法中,提取关联参数代码如下
# 断言代码,获取用例中的断言数据,获取数据是一组嵌套字典
yl_result = case_info["validate"]
# 调用断言方法,断言方法单独写
self.assert_result(yl_result, res_json, status_code)


# 断言方法
def assert_result(self, yl_result, res_json, status_code):
    all_flag = 0
    # 循环用例中的断言方式
    for yl_key, yl_value in yl_result.items():
        # 如果key 等于equals,则调用equals断言方法
        if yl_key == "equals":
            flag = self.assert_equals(yl_value, res_json, status_code)
            all_flag = all_flag + flag
        # 如果可以是contains,调用包含的断言方法
        elif yl_key == "contains":
            flag = self.assert_contains(yl_value, res_json)
            all_flag = all_flag + flag
            # 其它情况暂时不写,其它断言方式可以在下面继续添加
        else:
            print("框架暂不支持此断言")
    assert all_flag == 0


# 断言相等的方法
def assert_equals(self, yl_value, res_json, status_code):
    flag = 0
    # 循环yl_value,查看里面所有的 data数据
    for assert_key, assert_vaule in yl_value.items():
        # 判断是否存在 status_code 状态断言
        if "status_code" == assert_key:
            # 如果断言的值不相等,则断言失败
            if assert_vaule != status_code:
                flag += 1
                print("断言失败:状态码的值不等于:%s" % assert_vaule)
        # 如果是其它情况
        else:
            # 通过jsonpath提取所有的key值(如果不是json格式的则需要先转换为json)'$..%s' % assert_key写法,
            # 就是从res_json中提取所有相等的 assert_key,返回的结果为一个list集合
            lists = jsonpath.jsonpath(res_json, '$..%s' % assert_key)
            # 判断lists是否有提取到值
            if lists:
                # 如果assert_vaule不在lists中,则断言失败
                if assert_vaule not in lists:
                    flag += 1
                    print("断言失败,断言的" + assert_key + "不等于" + str(assert_vaule))
            else:
                print("断言失败:返回的结果中不存在:" + assert_key)
                flag += 1
        return flag


# 断言包含的方法
def assert_contains(self, yl_value, res_json):
    flag = 0
    if yl_value not in str(res_json):
        print("断言失败,返回结果中不包含:" + yl_value)
        flag += 1
    return flag

8、打印日志

# import logging
# import time
#
# from common.yaml_util import get_object_path, read_config_yaml
#
#
# class LoggerUtil:
#     # 创建日志
#     def create_log(self, logger_name='log'):
#         # 创建日志对象
#         self.logger = logging.getLogger(logger_name)
#         # 设置全局的日志级别,从低到高  debug/info/warn/error/critical
#         self.logger.setLevel(logging.DEBUG)
#
#         # 判断日志对象下的控制器,不存在控制器就添加控制器
#         if not self.logger.handlers:
#             # 文件日志
#             # 创建文件日志,创建文件日志控制器
#             self.file_log_path = get_object_path() + "/logs/" + read_config_yaml("log", "log_name") + str(
#                 int(time.time())) + ".log"
#             # 创建文件日志的控制器
#             self.file_hander = logging.FileHandler(self.file_log_path, encoding='utf-8')
#             # 获取配置的文件日志级别
#             file_log_level = str(read_config_yaml("log", "log_level")).lower()
#             # 设置文件的日志级别
#             if file_log_level == "debug":
#                 self.file_hander.setLevel(logging.DEBUG)
#             elif file_log_level == "info":
#                 self.file_hander.setLevel(logging.INFO)
#             elif file_log_level == "warning":
#                 self.file_hander.setLevel(logging.WARNING)
#             elif file_log_level == "error":
#                 self.file_hander.setLevel(logging.ERROR)
#             elif file_log_level == "critical":
#                 self.file_hander.setLevel(logging.CRITICAL)
#             else:
#                 self.file_hander.setLevel(logging.DEBUG)
#             # 创建文件日志的格式
#             self.file_hander.setFormatter(logging.Formatter(read_config_yaml("log", "log_format")))
#             # 将文件日志的控制器加入到日志对象
#             self.logger.addHandler(self.file_hander)
#
#             # 控制台日志
#             # 1、创建控制器日志的控制器
#             self.console_hander = logging.StreamHandler()
#             # 2、设置控制台的日志级别
#             console_log_level = str(read_config_yaml("log", "log_level")).lower()
#             # 设置文件的日志级别
#             if console_log_level == "debug":
#                 self.console_hander.setLevel(logging.DEBUG)
#             elif console_log_level == "info":
#                 self.console_hander.setLevel(logging.INFO)
#             elif console_log_level == "warning":
#                 self.console_hander.setLevel(logging.WARNING)
#             elif console_log_level == "error":
#                 self.console_hander.setLevel(logging.ERROR)
#             elif console_log_level == "critical":
#                 self.console_hander.setLevel(logging.CRITICAL)
#             else:
#                 self.console_hander.setLevel(logging.DEBUG)
#             # 创建控制台日志的格式
#             self.console_hander.setFormatter(logging.Formatter(read_config_yaml("log", "log_format")))
#             # 将控制台日志的控制器加入到日志对象
#             self.logger.addHandler(self.console_hander)
#
#
# # 错误日志输出
# def error_log(message):
#     LoggerUtil().create_log().error(message)
#
#
# # 信息日志输出
# def logs(message):
#     LoggerUtil().create_log().info(message)

#
import logging
import time

from common.yaml_util import get_object_path, read_config_yaml


class LoggerUtil:

    def create_log(self, logger_name='log'):
        # 创建一个日志对象
        self.logger = logging.getLogger(logger_name)
        # 设置全局的日志级别(从低到高:debug调试<info信息<warning警告<error错误<critical严重)
        self.logger.setLevel(logging.DEBUG)

        # 去除重复的日志
        if not self.logger.handlers:
            # ----------文件日志----------
            # times = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()))
            # print("============")
            # print(str(times))
            # 1.创建文件日志路径
            self.file_log_path = get_object_path() + "/logs/" + read_config_yaml("log", "log_name") + str(
                int(time.time())) + ".log"
            # 2.创建文件日志的控制器
            self.file_hander = logging.FileHandler(self.file_log_path, encoding='utf-8')
            # 3.设置文件日志的日志级别
            file_log_level = str(read_config_yaml("log", "log_level")).lower()
            if file_log_level == "debug":
                self.file_hander.setLevel(logging.DEBUG)
            elif file_log_level == "info":
                self.file_hander.setLevel(logging.INFO)
            elif file_log_level == "warning":
                self.file_hander.setLevel(logging.WARNING)
            elif file_log_level == "error":
                self.file_hander.setLevel(logging.ERROR)
            elif file_log_level == "critical":
                self.file_hander.setLevel(logging.CRITICAL)
            else:
                self.file_hander.setLevel(logging.DEBUG)
            # 4.创建文件日志的格式
            self.file_hander.setFormatter(logging.Formatter(read_config_yaml("log", "log_format")))
            # 将文件日志的控制器加入到日志对象
            self.logger.addHandler(self.file_hander)

            # ----------控制台日志----------
            # 1.创建控制台日志的控制器
            self.console_hander = logging.StreamHandler()
            # 2.设置控制台日志的日志级别
            console_log_level = str(read_config_yaml("log", "log_level")).lower()
            if console_log_level == "debug":
                self.console_hander.setLevel(logging.DEBUG)
            elif console_log_level == "info":
                self.console_hander.setLevel(logging.INFO)
            elif console_log_level == "warning":
                self.console_hander.setLevel(logging.WARNING)
            elif console_log_level == "error":
                self.console_hander.setLevel(logging.ERROR)
            elif console_log_level == "critical":
                self.console_hander.setLevel(logging.CRITICAL)
            else:
                self.console_hander.setLevel(logging.DEBUG)
            # 3.创建控制台日志的格式
            self.console_hander.setFormatter(logging.Formatter(read_config_yaml("log", "log_format")))
            # 将控制台日志的控制器加入到日志对象
            self.logger.addHandler(self.console_hander)

        # 返回包含有文件日志控制器和控制台日志控制器的日志对象
        return self.logger


# 错误日志的输出
def error_log(message):
    LoggerUtil().create_log().error(message)
    # raise Exception(message)


# 信息日志的输出
def logs(message):
    LoggerUtil().create_log().info(message)




9、sign签名

   # 通过公钥加密
    def public_key_jiami(self, args):
        # 导入公钥
        with open("public.pem") as f:
            pubkey = rsa.PublicKey.load_pkcs1(f.read().encode())
        # 加密
        byte_str = rsa.encrypt(str(args).encode("utf-8"), pubkey)
        # 把二进制转换成字符串格式
        miwen = base64.b64encode(byte_str).decode("utf-8")
        return miwen
    def signs(self, yaml_path):
        last_url = ""
        last_data = {}
        with open(os.getcwd() + yaml_path, encoding='utf-8') as f:
            yaml_value = yaml.load(f, Loader=yaml.FullLoader)
            for caseinfo in yaml_value:
                caseinfo_keys = caseinfo.keys()
                # 判断一级关键字是否包括有:name,request,valiedate
                if "request" in caseinfo_keys:
                    # 判断url
                    if "url" in caseinfo['request'].keys():
                        last_url = caseinfo['request']['url']
                    # 判断参数
                    req = caseinfo['request']
                    for key, value in req.items():
                        if key in ['params', 'data', 'json']:
                            for p_key, p_value in req[key].items():
                                last_data[p_key] = p_value
        last_url = last_url[last_url.index("?") + 1:len(last_url)]
        # 把last_url的字符串格式加到last_data字典
        lis = last_url.split("&")
        for a in lis:
            last_data[a[0:a.index("=")]] = a[a.index("=") + 1:len(a)]
        print(last_data)
        # 热加载替换
        last_data = RequestUtil(self,"base","base_sp_url").replace_value(last_data)
        print(last_data)
        # 字典根据key的asccii码排序
        new_dict = self.dict_asscii_sort(last_data)
        print(new_dict)
        # 第二步:(2)把参数名和参数的值用=连接成字符串,多个参数之间用&连接。a=2&b=1&c=3
        new_str = ""
        for key, value in new_dict.items():
            new_str = new_str + key + "=" + value + "&"
        print(new_str)
        # 第三到第五步
        appid = "wx8a9de038e93f77ab"
        appsecret = "8326fc915928dee3165720c910effb86"
        nonce = str(random.randint(1000000000, 9999999999))
        timestamp = str(time.time())
        all_str = appid + appsecret + new_str + nonce + timestamp
        # 第六步
        sign_str = self.md5(all_str)
        return sign_str

    def dict_asscii_sort(self, dict_str):
        dict_key = dict(dict_str).keys()
        l = list(dict_key)
        l.sort()
        new_dict = {}
        for key in l:
            new_dict[key] = dict_str[key]
        return new_dict

10、封装后的用例

class Test_produce:

    # 获取微信登录token
    @pytest.mark.run(order=1)
    @pytest.mark.parametrize("caseinfo", read_testcase_yaml("/testcase/test_produce/produce.yaml"))
    def test_get_produce_token(self, caseinfo):
        RequestUtil("base_sp_url", CanShu()).stander_yaml(caseinfo)

    # # 创建标签接口
    @pytest.mark.parametrize("caseinfo", read_testcase_yaml("/testcase/test_produce/add_tag.yaml"))
    def test_addflag(self, caseinfo):
        res = RequestUtil("base_sp_url", CanShu()).stander_yaml(caseinfo)