欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

接口平台介绍

程序员文章站 2022-03-16 14:40:09
...

平台功能

1、组件管理
组件是编写case的最基本单位,一个组件的调用是case中的一个步骤
组件和程序语言中的函数类似,可以有入参,也可以有出参
组件分系统组件和自定义组件
2、用例管理
3、任务管理
4、域名管理

特点

1、变量
定义变量:
用来存储运行过程中产生的数据,以便后续步骤中调用
接口响应中变量的值通过jsonpath定位
使用变量:
使用{value}的方式使用
2、项目准入
和jira编号关联,关联项目提测时会执行项目准入case,用于统计新增代码覆盖率
3、工程准入
指服务提测时必须执行通过的case
4、动态参数,生成时间
当前时间戳:${T}
当前时间戳,s级别 : T,s{T, s} 指定当前时间格式:{T, %Y-%m-%d %H%M%S}
当前时间往前推25小时 : KaTeX parse error: Expected '}', got 'EOF' at end of input: … %H%M%S} 当天20点:{T:20:00:00}
指定时间:T:2019061121:05:51,{T:2019-06-11 21:05:51,%Y-%m-%d %H%M%S } 用变量指定时间:{T:{date_time}, %Y-%m-%d %H%M%S
}
5、动态参数,支持长度计算
新增一种长度计算的动态参数,用于计算变量的长度。
这个参数可用于:1、数据量对比,2、校验是否存在。
格式定义:L:varname[vartype]{L:{var_name},[var_type]},如:{L:{insert_data},list},${L:{insert_data},dict}等。
L表示长度动态参数,{var_name}是现有的变量定义格式,var_type为可选参数,指定{var_name}变量的类型。
返回值:整数(正常计算)或报错(空/指定变量类型不符)。
6、动态参数,支持随机数
7、定时任务
8、支持无人机
9、复制case
10、支持冒烟

项目框架

前端vue,后端django、使用pymysql操作数据库

后端代码结构

/utils 存放公共方法,request封装、连接操作数据库的方法、加载配置文件、加解密方法、通过jsonpath获取响应值中key对应的value、、获取token、日志、公共时间方法封装、断言方法等等
/components 存放各种组件
/services 存放组装case的方法、运行case的方法、运行任务的方法等等
/config 存放配置文件
/app 存放视图函数、路由配置等等

有哪些可以改进的地方?

1、缓存账号topic
2、

连接数据库

class DB(object):
    _pool = None

    @staticmethod
    def init():
        db = DB()
        return db

    def __init__(self, env=''):
        if env == '':
            db_info = load_config(ENV_CONFIG_PATH)['db']
        else:
            db_info = load_config(ENV_CONFIG_PATH)[env]
        # 数据库初始化
        self.host = db_info['host']
        self.port = int(db_info['port'])
        self.user = db_info['user']
        self.passwd = db_info['passwd']
        self.database = db_info['database']
        self.con = None
        self.active = False

    def _start(self):
        if not self.active:
            self.connect()

    def connect(self):
        # 连接数据库
        try:
            connection = pymysql.connect(
                host=self.host,
                port=int(self.port),
                user=self.user,
                passwd=self.passwd,
                database=self.database,
                cursorclass=pymysql.cursors.DictCursor,
                charset='utf8',
                use_unicode=True)
            log.info('{0} :database is connecting successful'.format(
                self.host))
            self.con = connection
        except Exception as e:
            log.error('{0} :database is connecting failed : {1}'.format(
                self.host, e))

    def query_one(self, sql):
        # 查询一个
        self._start()
        cursors = self.con.cursor()
        log.info('execute query one sql: {0}'.format(sql))

        try:
            cursors.execute(sql)
            result = cursors.fetchone()
            return result

        except Exception as e:
            log.error('query is error: {0}'.format(e))
            return None

    def query_all(self, sql):
        # 查询所有
        self._start()
        cursors = self.con.cursor()
        log.info('execute query all sql: {0}'.format(sql))
        try:
            cursors.execute(sql)
            result = cursors.fetchall()
            log.info('query all result: {0}'.format(result))
            return result
        except Exception as e:
            log.error('query is error: {0}'.format(e))
            return None

    def change_datas(self, sql):
        # 增,删,改
        self._start()
        cursors = self.con.cursor()
        log.info('inert into sql: {0}'.format(sql))
        try:
            result = cursors.execute(sql)
            cursors.close()
            self.con.commit()
            log.info('inert into reault: {0}'.format(result))
            return result
        except Exception as e:
            log.error('inert into error: {0}'.format(e))
            return None

    def batch_insert(self, sql, values):
        try:
            self._start()
            cursors = self.con.cursor()
            result = cursors.executemany(sql, values)
            cursors.close()
            self.con.commit()
            return result
        except Exception as err:
            log.error('import failed with error: {0}'.format(err))
            return None

    def closes(self):
        # 关闭数据库连接
        self._start()
        try:
            self.active = False
            self.con.close()
            log.info('database is closed: {0}'.format(self.host))
        except Exception as e:
            self.active = False
            log.error('database is closed error: {0}'.format(e))
            raise 'server is error {0}'.format(e)

连接日志

#!/usr/bin/env python
# coding:utf-8

import os
import os.path
import socket
import logging
import logging.handlers
import time
from config.env.pathconfig import LOG_PATH

logging.basicConfig()

def singleton(cls, *args, **kw):
    instances = {}

    def _singleton():
        if cls not in instances:
            instances[cls] = cls(*args, **kw)
        return instances[cls]

    return _singleton


@singleton
class CreateLog(object):
    level_relations = {
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "warning": logging.WARNING,
        "error": logging.ERROR,
        "crit": logging.CRITICAL
    }
    logger = logging.getLogger()

    def __init__(self, level='info'):
        host_name = socket.gethostname()
        ip = socket.gethostbyname(host_name)
        logging_msg_format = '[%(asctime)s]--[%(levelname)s]--[' + ip + ']--[%(module)s.py - line:%(lineno)d]-- %(message)s'
        logging_date_format = '%Y-%m-%d %H:%M:%S'
        logging.basicConfig(
            format=logging_msg_format, datefmt=logging_date_format)
        self.logger.setLevel(self.level_relations.get(level))
        if not os.path.exists(LOG_PATH):
            os.mkdir(LOG_PATH)
        log_file = os.path.join(
            LOG_PATH,
            time.strftime('%Y-%m-%d-%H', time.localtime(time.time())) + '.log')
        file_handler = logging.handlers.TimedRotatingFileHandler(
            log_file, 'midnight', 1)
        file_handler.setFormatter(logging.Formatter(logging_msg_format))
        self.logger.addHandler(file_handler)

    def getloger(self):
        return self.logger


log = CreateLog().getloger()

request方法

#!/usr/bin/env python
# coding:utf-8
import requests
import json
import traceback
from config.env.pathconfig import API_YML_PATH
from utils.common import load_yml
from utils.logger import log
from utils.DB import DB

db = DB()
sql_pool = load_yml(API_YML_PATH)


class HttpRequests(object):
    response_object = None

    headers = {
        'Content-Type': "application/json",
        'Authorization': None,
        'x-vipkid-ssoToken': '',
        'env': None,
        'Cookie': None,
        'l5d-ctx-dtab': None
    }

    def joint_url(self, domain, env='stage', route_rule=None):
        if env == 'stage' or env == 'qa3':
            _domain = db.query_one(sql_pool['get_test_domain'].format(
                domain=domain))['test_domain']
            db.closes()
            _domain = _domain.replace("stage", env)
            self.headers['l5d-ctx-dtab'] = route_rule
            if _domain[:4] == 'test':
                self.headers['env'] = env
        elif env == 'pre':
            _domain = db.query_one(
                sql_pool['get_pre_domain'].format(domain=domain))['pre_domain']
            db.closes()
        elif env == 'ip':
            _domain = db.query_one(sql_pool['get_ip_domain'].format(domain=domain))['stage_domain']
            db.closes()
        else:
            _domain = domain
        return 'http://%s' % (_domain)

    def http_requests(self,
                      domain,
                      path,
                      method,
                      payload=None,
                      token=None,
                      sso_token=None,
                      env='stage',
                      route_rule=None):
        """
        http request model
        """
        try:
            self.headers['Authorization'] = token
            self.headers['Cookie'] = 'ssoToken=' + str(sso_token)
            self.headers['x-vipkid-appkey'] = 'sso'

            # aws迁移ali测试用
            domain_list = ['lp-im.vipkid.com.cn', 'internal.clt-management.vipkid.com.cn',
                           'internal.lp-im.vipkid.com.cn', 'internal.lp-im-chat.vipkid.com.cn',
                           'internal.lp-app.vipkid.com.cn']
            if env == 'ip' and domain not in domain_list:
                a = path.split('/')
                path = '/'.join(a[2:])
            if method == 'GET':
                response = requests.request(
                    method=method,
                    url=self.joint_url(domain, env, route_rule) + path,
                    params=self.get_payload(payload),
                    headers=self.headers)
            else:
                response = requests.request(
                    method=method,
                    url=self.joint_url(domain, env, route_rule) + path,
                    json=self.get_payload(payload),
                    headers=self.headers)
            log.info('Headers: {}'.format(self.headers))
            log.info('{0}--{1}--{2}--{3}'.format(response.url, method, payload,
                                                 response.text))
            self.response_object = response
            if 'Content-disposition' in dict(response.headers):  # 下载类接口
                return {"status_code": response.status_code}
            else:
                try:
                    return response.json()
                except Exception as e:
                    log.info('response 转换json失败 {0}'.format(e))
                    return response.text
        except Exception as e:
            log.error('requests error {0} ,excepetion{1}'.format(
                traceback.format_exc(), e))

    def get_payload(self, payload):
        if payload == '':
            return {}
        elif isinstance(payload, (dict, list)):
            return json.loads(json.dumps(payload), encoding='utf-8')
        elif isinstance(payload, str):
            return json.loads(payload, encoding='utf-8')

requester = HttpRequests()
相关标签: 技术类