欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Python接口自动化:Python+Django+Unittest+BeautifulReport报告模版

程序员文章站 2022-06-22 23:14:32
目录base目录:find_element.pyparametrizedTestCase.pyrequest_method.pybusiness目录:config目录:PublicConfig.inihandle目录:db_handle.pydb_redis.pycontrol_handle.pyutil目录:public_method.pyread_ini.pygenerate_str.pywholeProcessPython.pycas...

目录

base目录:

find_element.py

parametrizedTestCase.py

request_method.py

business目录:

config目录:

PublicConfig.ini

handle目录:

db_handle.py

db_redis.py

control_handle.py

util目录:

public_method.py

read_ini.py

generate_str.py

wholeProcessPython.py

case目录:

case_steps.py

ziyingCase_steps.py

case用例


Python接口自动化:Python+Django+Unittest+BeautifulReport报告模版

base目录:

base类一般用于存放特别常用的方法,使用时一般都是继承于此类

find_element.py

用于存放元素封装文件

def get_elementByWait(self,key,node,file_name='ElementConfig'):
    read_ini = ReadIni(node,file_name)
    data = read_ini.get_value(key)
    by = data.split('>')[0]
    value = data.split('>')[1]
    try:
        if by == 'id':
            return WebDriverWait(self.driver, 10,0.5).until(EC.visibility_of_element_located((By.ID, value)))
        elif by == 'name':
            return WebDriverWait(self.driver, 10,0.5).until(EC.visibility_of_element_located((By.NAME, value)))
        elif by == 'className':
            return WebDriverWait(self.driver, 10,0.5).until(EC.visibility_of_element_located((By.CLASS_NAME, value)))
        else:
            return WebDriverWait(self.driver, 10,0.5).until(EC.visibility_of_element_located((By.XPATH, value)))
    except:
        # self.driver.save_screenshot("/Users/dabing/PycharmProjects/pub_httprunner/ApiJmeter/Pre_operation_jmeter/report/errorImg/3.jpg")
        return None

def get_configVal(self,key,node,file_name='PublicConfig'):
    read_ini = ReadIni(node,file_name)
    data = read_ini.get_value(key)
    return data

parametrizedTestCase.py

定义类ParametrizedTestCase,使之继承unittest.TestCase并重写其__init__(),增加param这个参数。

class ParametrizedTestCase(unittest.TestCase):
    def __init__(self, methodName='runTest', param=None):
        super(ParametrizedTestCase, self).__init__(methodName)
        self.param = param

    @staticmethod
    def parametrize(testcase_klass, param=None):
        testloader = unittest.TestLoader()
        #获取所有case列表
        testnames = testloader.getTestCaseNames(testcase_klass)
        testnames = list(testnames)
        stop_node = param["hsr_stop_node"]
        start_node = param["hsr_start_node"]
        terminal_logger.info("ParametrizedTestCase类终止节点名称:%s"%stop_node)
        print("ParametrizedTestCase类开始节点名称:%s" % start_node)
        print("ParametrizedTestCase类终止节点名称:%s"%stop_node)
        # #重新生成case列表
        testnames = ParametrizedTestCase.generateNewTestCase(testnames,start_node,stop_node)
        # testnames = ParametrizedTestCase.generateNewTestCase(testnames, stop_node)
        terminal_logger.info("正在执行的case列表名称:%s"%testnames)
        print("正在执行的case列表名称:%s"%testnames)
        suite = unittest.TestSuite()
        for name in testnames:
            suite.addTest(testcase_klass(name, param=param))
        return suite

request_method.py

封装requests请求

class RequestMethod:
   def post_main(self,url,data,cookies=None,headers=None):
      if cookies !=None:
         res = requests.post(url=url,data=data,cookies=cookies,verify=False)
         # print("请求的url:%s"%url+"请求的参数:%s"%str(data))
      elif headers != None:
         res = requests.post(url=url, data=data, headers=headers,verify=False)
      # print("请求的url:%s"%url+"请求的参数:%s"%str(data))
      else:
         res = requests.post(url=url,data=data,verify=False)
         # print("请求的url:%s" % url + "请求的参数:%s" % str(data))
      return res

   def get_main(self,url,data=None,cookies=None):
      if cookies !=None:
         res = requests.get(url=url,data=data,cookies=cookies,verify=False)
         #print("请求的url:%s" % url + "请求的参数:%s" % str(data))
      else:
         res = requests.get(url=url,data=data,verify=False)
         #print("请求的url:%s" % url + "请求的参数:%s" % str(data))
      return res

   def json_main(self,url,json=None,cookies=None,headers=None):
      if cookies !=None:
         res = requests.post(url=url,json=json,cookies=cookies,verify=False)
         # print("请求的url:%s" % url + "请求的参数:%s" % str(json))
      elif headers != None:
         res = requests.post(url=url, json=json, headers=headers,verify=False)
      # print("请求的url:%s"%url+"请求的参数:%s"%str(data))
      else:
         res = requests.post(url=url,json=json,verify=False)
         # print("请求的url:%s" % url + "请求的参数:%s" % str(json))
      return res

   def upload_main(self,url,data=None,files=None,cookies=None):
      if cookies !=None:
         res = requests.post(url=url,data=data,files=files,cookies=cookies,verify=False)
      else:
         res = requests.post(url=url,data=data,files=files,verify=False)
      return res

   def run_main(self,method,url,type,data=None,cookies=None,headers=None):
      requests.packages.urllib3.disable_warnings()#防止警告报错
      if type == "json":
         if method == 'post':
            if headers != None:
               res = (self.post_main(url,data,headers=headers)).json()
            else:
               res = (self.post_main(url, data, cookies=cookies)).json()
         elif method == 'post_json':
            if headers != None:
               res = (self.json_main(url, data, headers=headers)).json()
            else:
               res = (self.json_main(url, data, cookies=cookies)).json()
         else:
            res = (self.get_main(url,data, cookies)).json()
         return json.dumps(res,ensure_ascii=False)
      elif type == "html":
         if method == 'post':
            res = (self.post_main(url, data, cookies)).text
         else:
            res = (self.get_main(url, data, cookies)).text
         return res

business目录:

处理业务逻辑,一般一个类文件是一块业务,每个类文件的构造方法传入类似于cookies,driver等初始化值

 

config目录:

一般存放的是定位元素或配置值

ElementConfig.ini

#公共元素
[PublicElement]
#车务系统登录相关信息
login_account_name=id>loginId
login_account_password=id>password
login_button=className>login-submit
login_success_check=xpath>//*[@id="hsNavBarUser"]/a
#jenkins登录相关信息
login_account_name2=id>j_username
login_account_password2=name>j_password
login_button2=name>Submit
login_success_check2=//*[@id="header"]/div[2]/span/a[1]/b

PublicConfig.ini

#公共参数配置
[PublicParamConfig]
#门店经理角色账户信息
login_name1=adminauto
login_password1=a123456


#接口地址配置1
[ApiAddressConfig]
#登录接口地址
hsr_login_addr=http://test-hsr.huashenghaoche.com
#订单接口地址
hsr_cforder_addr=http://test-cforder.huashenghaoche.com

#数据库连接配置
[DBConnectConfig]
#风控数据库配置
db_antifraud={"host":"test.mysql.proxysql.rw.huashenghaoche.net","port":3306,"user":"antifraud_app","passwd":"W6cpO58l#","db":"antifraud"}
#供应链数据库配置
db_mdm={"host":"test.mysql.proxysql.rw.huashenghaoche.net","port":3306,"user":"wh_app","passwd":"uK0QYUbj#","db":"wh_prod"}

handle目录:

存放数据库操作如mysql、redis和UI 里的行为封装或导航操作

db_handle.py

import pymysql
class DbHandle():
    def __init__(self,host,port,user,passwd,db,isCommit=None):
        if isCommit == None:
            self.cursor = self.getConnect(host, port, user, passwd, db)
        else:
            self.host =host
            self.port = port
            self.user = user
            self.passwd = passwd
            self.db =db

    def getConnect(self,host,port,user,passwd,db):
        conn = pymysql.connect(
            host=host,
            port=port,
            user=user,
            passwd=passwd,
            db=db,
            charset='utf8'
        )
        return conn

    #查询手机号是否存在
    def selByPhone_isExist_sql(self,phone):
        cursor = self.cursor.cursor()
        cursor.execute("select * from user_base where mobile =%s"%phone)
        result = cursor.rowcount
        return result
        # print(qishu_result[0][1])

db_redis.py

class DbRedis():
    def __init__(self, host, port,passwd):
        self.redisObject = self.getConnect(host, port,passwd)
        self.redisObject2 = self.getConnect(host, port, passwd)

    def getConnect(self, host, port,passwd):
        conn = redis.Redis(
            host=host,
            port=port,
            password=passwd,
            decode_responses=True
        )
        return conn
        #天猫写入token
def setToken_sql(self,orderNo,ranLen):
    self.getRandomSet(ranLen)
    self.val = "30000:tmall_token_expire_"
    r = self.redisObject.set(self.val + orderNo + str(self.value_set), orderNo)
    if r:
        ranResult = orderNo + str(self.value_set)
        # print(ranResult)
        return ranResult
    else:
        print("【三方】模拟h5页面补全信审信息写入token信息报错%s" % r)
        return False

control_handle.py

#控件组合的相关操作
class ControlHandle():

    def __init__(self,driver):
        self.driver = driver
        self.fd = FindElement(driver)

    def controlHandle(self,*config_key,select_text=None,default_config_val=None):
        # print(config_key.__len__())
        #input本文控件  实现输入文本操作  入参 1、定位元素的key 2、定位元素的key所在的node节点 3、输入内容的key 4、输入内容的key所在的node节点
        if config_key[0] == "input":
            element_Key = self.fd.get_elementByWait(config_key[1], config_key[2])
            if default_config_val == None:
                config_val = self.fd.get_configVal(config_key[3], config_key[4])
            else:
                config_val = default_config_val
            element_Key.send_keys(config_val)
        # select下拉框  实现下拉框单击操作(控件不是select)  入参 1、定位元素的key 2、定位元素的key所在的node节点 3、定位被选内容元素的key 4、定位被选内容元素的key所在的node节点
        elif config_key[0] == "select_click":
            element_Key = self.fd.get_element(config_key[1], config_key[2])
            element_Key.click()
            element_BySel = self.fd.get_element(config_key[3],config_key[4])
            element_BySel.click()
        #select下拉框  实现下拉框单击操作(通过Select属性选择) 入参 1、定位元素的key 2、定位元素的key所在的node节点 3、select_text为选择的文本内容
        elif config_key[0] == "select_control":
            Select(self.fd.get_elementByWait(config_key[1], config_key[2])).select_by_visible_text(select_text)
        #button 实现提交操作 入参 1、定位元素的key 2、定位元素的key所在的node节点
        elif config_key[0] == "submit":
            element_Key = self.fd.get_element(config_key[1], config_key[2])
            element_Key.click()
        #下拉框 实现鼠标触摸联动被选内容并单击操作
        elif config_key[0] == "move_click":
            element_Key = self.fd.get_element(config_key[1], config_key[2])
            element_Key.click()
            #索引前4个参数分别是开始单击和最后单击选择
            count = 5
            while count < config_key.__len__():
                # print(count)
                move = self.fd.get_element(config_key[count], config_key[count+1])
                count = count+2
                ActionChains(self.driver).move_to_element(move).perform()
                time.sleep(1)
            element_BySel = self.fd.get_element(config_key[3], config_key[4])
            element_BySel.click()

        # 导航相关操作
    def navigationHandle(self, *config_key):
        count = 1
        # 获取参数长度
        paramLen = config_key.__len__()
        # 导航都是点击操作
        if config_key[0] == "continue_click":
            while count < paramLen - 1:
                # print(count)
                if config_key[count + 2] == "iframe_control":
                    time.sleep(6)
                    frame_id = self.fd.get_configVal("nav_createOrderFrameId", "NavigationElement", "ElementConfig")
                    self.driver.switch_to_frame(frame_id)
                    self.fd_frame = FindElement(self.driver)
                    # element_Key = self.fd_frame.get_elementByWait(config_key[count], config_key[count + 1])
                    # element_Key.click()
                    # self.driver.find_element_b
                    # y_id("btn_CREATE").click()
                else:
                    element_Key = self.fd.get_elementByWait(config_key[count], config_key[count + 1])
                    element_Key.click()
                count = count + 2

util目录:

一般存放工具类

public_method.py

一般存放公共方法,如登录和数据库轮训查找功能

class GetLoginCookie():
    def __init__(self):
        self.req = RequestMethod()
        self.fd = FindElement()
        self.hsr_login_addr = self.fd.get_configVal("hsr_login_addr", "ApiAddressConfig")
        self.hsh_cplatformweb_login_addr =self.fd.get_configVal("hsh_cplatformweb_login_addr","ApiAddressConfig")
        self.hsh_jenkins = self.fd.get_configVal("hsh_jenkins","ApiAddressConfig")
        self.hsr_online_login_addr=self.fd.get_configVal("hsr_online_login_addr","ApiAddressConfig")

    def getLoginCookie(self,uname,pwd):
        url = self.hsr_login_addr + "/hshcmdm/toLogin"
        request_data ={
            "loginId":uname,
            "password":pwd
        }
        cookie = self.req.post_main(url, request_data).cookies
        #print("车务登录cookie%s" % cookie.values())
        return cookie
#生产环境_花生业务系统登录
    def loginOnLine(self,uname,pwd):
        url = self.hsr_online_login_addr+'/hshcmdm/toLogin'
        request_data={
            "loginId":uname,
            "password":pwd
        }
        cookie = self.req.post_main(url, request_data).cookies
        # print('生产环境登录cookie',cookie)
        return cookie

#架构系统登录cookie
    def getDingshiLoginCookie(self,uname,pwd):
        url = self.hsh_cplatformweb_login_addr + "/hshcplatformweb/login"
        request_data = {
            "username": uname,
            "password": pwd,
            "validateCode": "PASS",
            "rememberMe": "true"
        }
        cookie = self.req.post_main(url, request_data).cookies
        print("架构登录cookie%s"%cookie.values())
        return cookie
        #循环验证
'''
sqlExec:执行的sql
count:校验次数
msg:描述信息
step:每次循环等待的时间
'''
    def loopVerification(self,**param):
        orderno = param.get("orderno")
        stag = param.get("stag")
        jq_code = param.get("jq_code")
        sy_code = param.get("sy_code")
        sqlFunName = param.get("sqlFunName")
        conn = param.get("con")
        num = param.get("num")
        msg = param.get("msg")
        step = param.get("step")
        phone = param.get("phone")
        #查看sql类里是否有此方法名字
        if hasattr(DbHandle,sqlFunName):
            func = getattr(DbHandle,sqlFunName)
            if sqlFunName == "antifraud_record_isExist_sql" or sqlFunName == "orderCreditAudit_isExist_sql":
                isHas = func(conn,orderno,stag)
                # print(isHas)
                count = 1
                while (isHas != 1 and count <= num):
                    time.sleep(step)
                    print("【轮循执行%s次】%s" % (count,msg))
                    isHas = func(conn,orderno,stag)
                    count = count + 1
                if isHas == 1:
                    # print("【%s】成功!"%msg)
                    return True
                else:
                    print("%s失败!" % msg)
                    return False
            elif sqlFunName == "insurance_record_isExist_sql":
                isHas = func(conn, sy_code, jq_code)
                # print(isHas)
                count = 1
                while (isHas != 1 and count <= num):
                    time.sleep(step)
                    # print("【轮循执行%s次】%s" % (count, msg))
                    isHas = func(conn, sy_code, jq_code)
                    count = count + 1
                if isHas == 1:
                    # print("【%s】成功!"%msg)
                    return True
                else:
                    print("%s失败!" % msg)
                    return False

read_ini.py

读取配置文件

class ReadIni():

    def __init__(self,node=None,file_name=None):
        if file_name == None:
            # file_name = os.path.abspath('../')+"/config/ElementConfig.ini"
            file_name = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + "/Pre_operation_jmeter/config/ElementConfig.ini"
        else:
            # file_name = os.path.abspath('../') + "/config/"+ file_name +".ini"
            file_name = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + "/Pre_operation_jmeter/config/"+ file_name +".ini"
        # print("-------->%s"%os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
        # print("filenam=%s"%file_name)
        if node == None:
            self.node = "CreateClueElement"
        else:
            self.node = node
        self.cf = self.load_ini(file_name)

    #加载文件
    def load_ini(self,file_name):
        cf = configparser.ConfigParser()
        cf.read(file_name,encoding="utf-8")
        return cf

    #获取value的值
    def get_value(self,key):
        data = self.cf.get(self.node, key)
        return data

generate_str.py

字符串处理,生成随机数、身份证号、日期格式化等

 

wholeProcessPython.py

文件 与base、business、case、config、handle、unil目录同级

用于处理前端传来的值(view类里传来的值),并生成报告目录结构、移除报告到历史文件、统计接口执行时间等,调用case目录下测试用例,case用例执行完后在传回此类,将结果在返回给view类里

部分摘要代码:

# 初始化的任务
def init_task(self,**pltToJmeterByParam):
    ########前台返回的操作数据########
    #获取渠道名称
    self.hsr_sel_project = pltToJmeterByParam["hsr_sel_project"]
    #选择开始节点
    self.hsr_start_node = pltToJmeterByParam["hsr_start_node"]
    #选择终止节点
    self.hsr_stop_node = pltToJmeterByParam["hsr_stop_node"]
    #期数
    self.term = pltToJmeterByParam["term"]
    #手机号
    self.phone = pltToJmeterByParam["phone"]
    #订单号
    self.orderNo = pltToJmeterByParam["orderNo"]
    #天猫skuid
    self.tmall_skuid = pltToJmeterByParam["tmall_skuid"]
    #操作人
    self.operator = pltToJmeterByParam["operator"]
    #渠道  -花生APP使用
    self.hsAppChannel = pltToJmeterByParam["hsAppChannel"]
    ########前台返回的操作数据########

    ########初始化配置######## 【配置选择方案、获取手动执行时间】
    if self.hsr_sel_project == "直租-自营":
        ###用于配置选择方案配置###
        self.channel = "自营"
        self.scheme_code = "先用后买-通用方案"
        self.caseObject = ZiyingCase_steps
        # 首付金额
        self.theoreticalDownPaymentAmt = pltToJmeterByParam["theoreticalDownPaymentAmt"]
        ###用于配置选择方案配置###
        ###获取手工执行时间###
        manual_exec_time = self.manual_exec_time_config(self.hsr_sel_project,self.hsr_stop_node)
    elif self.hsr_sel_project == "直租-民生金租":
        ###用于配置选择方案配置###
        self.channel = "民生金租_线上"
        self.scheme_code = "民生金租_线上-通用方案"
        self.caseObject = MsjzCase_steps
        # 首付金额
        self.theoreticalDownPaymentAmt = pltToJmeterByParam["theoreticalDownPaymentAmt"]
        ###用于配置选择方案配置###
        ###获取手工执行时间###
        manual_exec_time = self.manual_exec_time_config(self.hsr_sel_project,self.hsr_stop_node)
        ########报告地址初始化########
    # 新增报告存放的根目录地址
    self.report_base_path = "%s/static/python_report/report/" % self.current_dir_path
    
    # 历史报告存放的根目录地址
    self.history_base_path = "%s/static/python_report/history/" % self.current_dir_path
    
    # 删除report所在目录的所有报告信息 初始化
    self.del_allReportDir(self.report_base_path)
    
    # 创建报告目录
    dateStr = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
    self.terminal_logger.info("\033[1;31;42m{}\033[0m".format('创建正在生成报告的文件目录%s') % dateStr)
    os.mkdir("%s%s" % (self.report_base_path, dateStr))
    self.terminal_logger.info("\033[1;31;42m{}\033[0m".format('开始运行====>全流程脚本'))
    
    # 报告存放地址
    self.report_path = "%s/static/python_report/report/%s" % (self.current_dir_path, dateStr)
    
    self.reportName = "AutomatedReport%s.html" % dateStr
    # 获取读取带报告名称的相对路径地址
    self.re_Testreport_path = "/static/python_report/history/%s/%s" % (dateStr,self.reportName)
    ########报告地址初始化########

case目录:

用于存放case用例

case_steps.py

用于接收wholeProcessPython.py类里传来的值,通过此类来驱动执行指定的case类,然后执行报告

import sys
import os

curPath = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
rootPath = os.path.split(curPath)[0]
PathProject = os.path.split(rootPath)[0]
sys.path.append(rootPath)
sys.path.append(PathProject)

from ApiJmeter.Pre_operation_jmeter.util import HTMLTestReportCN
# from ApiJmeter.Pre_operation_jmeter.util.BeautifulReport import BeautifulReport
from BeautifulReport import BeautifulReport
# from BeautifulReport_bak import BeautifulReport
import logging,threading,random,sys,unittest
from ApiJmeter.Pre_operation_jmeter.util.decorator_function import *
from ApiJmeter.Pre_operation_jmeter.base.parametrizedTestCase import ParametrizedTestCase
from ApiJmeter.Pre_operation_jmeter.case.ziyingCase_steps import ZiyingCase_steps
from ApiJmeter.Pre_operation_jmeter.case.tmallCase_steps import TmallCass_steps
from ApiJmeter.Pre_operation_jmeter.case.twoHandCarCase_steps import TwoCarCase_steps
from ApiJmeter.Pre_operation_jmeter.case.yixinCase_steps import YixinCase_steps
from ApiJmeter.Pre_operation_jmeter.case.rbhnCase_steps import RbhnCase_steps
from ApiJmeter.Pre_operation_jmeter.case.msjzCase_steps import MsjzCase_steps
from ApiJmeter.Pre_operation_jmeter.case.hsAppCase_steps import HsAppCase_steps
from ApiJmeter.Pre_operation_jmeter.util.generate_str import Generate_str
terminal_logger = logging.getLogger('django')


class CaseSteps(ParametrizedTestCase,unittest.TestCase):


    def main(self,**param):
        try:
            # 接收命令行传入的参数
            global tmpVal
            tmpVal = sys.argv[1]
            print("命令参数输出%s"%tmpVal)
            param = eval(tmpVal)
        except Exception as e:
            print(e)


        # 自营、易鑫、天猫渠道
        paramDict = {
            "hsr_sel_project": param["hsr_sel_project"],
            "channel": param["channel"],
            "scheme_code":param["scheme_code"],
            "firstPayment":param["firstPayment"],
            "term":param["term"],
            "tmpFile_path":param["tmpFile_path"],
            "tmpFile_path2": param["tmpFile_path2"],
            "reportName":param["reportName"],
            "report_path":param["report_path"],
            # "caseObject": param["caseObject"],
            "tmall_skuid": param["tmall_skuid"],
            "hsAppChannel": param["hsAppChannel"],
            "operator": param["operator"],
            ######正式######
            "phone": param["phone"],
            "hsr_start_node": param["hsr_start_node"],
            "hsr_stop_node":param["hsr_stop_node"],
            "orderNo": param["orderNo"],
            ######正式######
            ######调试######
            # "phone": "13077159278",
            # "hsr_start_node": "test07_salesSettlementAPI",
            # "hsr_stop_node": "test26_orderCreditAuditAPI",
            # "orderNo": "SO200417100029"
            ######调试######
        }
        terminal_logger.info("所有参数%s" % param)
        terminal_logger.info("报告地址输出%s" % param["report_path"])

        suite = unittest.TestSuite()
        if paramDict["hsr_sel_project"] == "直租-自营":
            suite.addTest(ParametrizedTestCase.parametrize(ZiyingCase_steps, param=paramDict))
        elif paramDict["hsr_sel_project"] == "直租-太保华能":
            suite.addTest(ParametrizedTestCase.parametrize(RbhnCase_steps, param=paramDict))
        elif paramDict["hsr_sel_project"] == "直租-天猫":
            suite.addTest(ParametrizedTestCase.parametrize(TmallCass_steps, param=paramDict))
        elif paramDict["hsr_sel_project"] == "直租-民生金租":
            suite.addTest(ParametrizedTestCase.parametrize(MsjzCase_steps, param=paramDict))
        elif paramDict["hsr_sel_project"] == "二手车-自营":
            suite.addTest(ParametrizedTestCase.parametrize(TwoCarCase_steps, param=paramDict))
        elif paramDict["hsr_sel_project"] == "app-花生好车":
            suite.addTest(ParametrizedTestCase.parametrize(HsAppCase_steps, param=paramDict))
        else:#回租-易鑫
            suite.addTest(ParametrizedTestCase.parametrize(YixinCase_steps, param=paramDict))

        ######新报告生成######
        test_suite = suite
        result = BeautifulReport(test_suite)
        result.report(filename=paramDict["reportName"],description=paramDict["hsr_sel_project"]+"渠道自动化全流程", report_dir=paramDict["report_path"])
        # result.report(filename=paramDict["reportName"], description=paramDict["hsr_sel_project"] + "渠道自动化全流程",
        #               log_path=paramDict["report_path"])
        ######新报告生成######

        ######老报告生成######
        # report_path = paramDict["report_path"]+ "/%s" % paramDict["reportName"]
        # f = open(report_path, 'wb')
        # runner = HTMLTestReportCN.HTMLTestRunner(stream=f, title="自动化报告", description=paramDict["hsr_sel_project"]+"自动化报告")
        # runner.run(suite)
        ######老报告生成######

ziyingCase_steps.py

是case类

初始化driver、cookie、business业务类的实例话,此类需要继承重写的unittest构造方法类ParametrizedTestCase,这样才能将前端收集来的值传递给unittest类里的case

class ZiyingCase_steps(ParametrizedTestCase,unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        global g_orderNo
        global g_pNo
        global g_vin
        global g_fNo
        global g_reportFlag
        g_orderNo = "None"
        g_pNo = "None"
        g_fNo = "None"
        g_vin = "None"
        g_reportFlag = "False"

        terminal_logger.info("进入自营case")
        try:
            # 浏览器启动
            # cls.driver = webdriver.Chrome()
            # 隐藏浏览器启动
            chrome_options = Options()
            chrome_options.add_argument('--no-sandbox')  # 解决DevToolsActivePort文件不存在的报错
            chrome_options.add_argument('window-size=1920x3000')  # 指定浏览器分辨率
            chrome_options.add_argument('--disable-gpu')  # 谷歌文档提到需要加上这个属性来规避bug
            chrome_options.add_argument('--hide-scrollbars')  # 隐藏滚动条, 应对一些特殊页面
            chrome_options.add_argument('blink-settings=imagesEnabled=false')  # 不加载图片, 提升速度
            chrome_options.add_argument('--headless')  # 浏览器不提供可视化页面. linux下如果系统不支持可视化不加这条会启动失败
            cls.driver = webdriver.Chrome(chrome_options=chrome_options)
            cls.driver.set_window_size(1200, 900)

            #初始化车务登录信息
            cls.name = "login_name1"
            cls.pwd = "login_password1"
            cls.fd = FindElement()
            cls.driver.get(cls.fd.get_configVal("hsr_login_addr", "ApiAddressConfig") + "/hshcmdm/login")
            # cls.phone = Generate_str().createPhone()
            cls.idNumber = Generate_str().createIdNumber()
            cls.cookie = GetLoginCookie().getLoginCookie(cls.fd.get_configVal(cls.name, "PublicParamConfig"),cls.fd.get_configVal(cls.pwd, "PublicParamConfig"))
            # 流程节点实例化
            cls.login_process = LoginUIBusiness(cls.driver)
            cls.createClue_process = CreateClueAPIBusiness(cls.cookie,cls.idNumber)
            cls.createOrder_process = CreateOrderAPIBusiness(cls.cookie,cls.idNumber)

用于执行UI中报错页面的截图,通过BeautifulReport包的add_test_img方法反射查找所在case类是否有save_img,所以需要写save_img方法

def save_img(self, img_name):  # 错误截图方法,这个必须先定义好
    """
        传入一个img_name, 并存储到默认的文件路径下
    :param img_name:
    :return:
    """
    #存图片的路径
    self.driver.get_screenshot_as_file("%s/%s.png"%(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) + "/static/python_report/errorImg",img_name))

case用例

1、self.param["phone"]就是继承ParametrizedTestCase类,收集前端的值传来的

2、由于是跑的业务流程的场景,则需要在有产出的case流程节点来存个全局变量以便后面case来使用,如下图test02_createOrderAPI订单case产出物是g_orderNo,设置为全局,可为后面case使用

3、Generate_str().writeFile,由于解决case用例将值反显给前端问题,每个流程的case用例需要写入特定的值到文件里(报告的状态、订单号、车架号需要展示的信息),来随时保存值的状态

# 【线索】新建线索流程
def test01_createClueAPI(self):
    """【线索】新建线索"""
    global g_reportFlag
    g_reportFlag = self.createClue_process.newleadsCreate(self.param["phone"],self.param["operator"])
    # 将订单信息写入文件
    Generate_str().writeFile(self.param["tmpFile_path"], phone=self.param["phone"], reportFlag=str(g_reportFlag))
    # print("11111111111111a%s"%g_reportFlag)
    self.assertTrue(bool(g_reportFlag), "【订单】新建线索流程失败!")

# 【订单】新建订单流程
def test02_createOrderAPI(self):
    """【订单】新建订单"""
    global g_reportFlag
    if g_reportFlag:
        global g_orderNo
        rela, g_reportFlag = self.createOrder_process.saveAndSubmit(self.param["phone"])
        g_orderNo = rela["order_no"]
        # print("获取订单号:%s"%g_orderNo)
    # 将订单信息写入文件
    Generate_str().writeFile(self.param["tmpFile_path"], sNo=g_orderNo, phone=self.param["phone"],reportFlag=str(g_reportFlag))
    self.assertTrue(bool(g_reportFlag), "【订单】新建订单流程失败!")

# 【订单】登录流程
@BeautifulReport.add_test_img('loginUIError%s' % time.strftime("%Y%m%d%H%M%S", time.localtime()))
def test03_loginUI(self):
    """【订单】订单登录"""
    global g_reportFlag
    global g_orderNo
    if g_reportFlag:
        g_reportFlag = self.login_process.loginUI(self.name, self.pwd)
        # if g_reportFlag:
        #     self.save_img('测试报告')
        if self.param["orderNo"] != "":
            g_orderNo = self.param["orderNo"]
    # 将订单信息写入文件
    Generate_str().writeFile(self.param["tmpFile_path"], sNo=g_orderNo, phone=self.param["phone"],reportFlag=str(g_reportFlag))
    self.assertTrue(bool(g_reportFlag), "【订单】登录流程失败!")

 

本文地址:https://blog.csdn.net/u011417723/article/details/107889705

相关标签: python 自动化