欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

案例一:利于Python调用JSON对象来实现对XENA流量测试仪的灵活发包测试,能够适应Pair,Rotate,1-to-Many等多种拓扑模型

程序员文章站 2022-05-18 21:23:35
硬件:XENA Valkyrie 或 Vantage主机,测试板卡不限,本方法适用于其100M~400G所有速率端口 环境配置:Python 3 实现功能: 1.控制流量仪进行流量测试,预定配置的流量发送,报文统计,丢包率,延迟等信息的统计 2.单个Python脚本配合不同的JSON对象文件,来实现 ......

硬件:xena valkyrie 或 vantage主机,测试板卡不限,本方法适用于其100m~400g所有速率端口

环境配置:python 3

实现功能:

1.控制流量仪进行流量测试,预定配置的流量发送,报文统计,丢包率,延迟等信息的统计

2.单个python脚本配合不同的json对象文件,来实现不同端口数量,不同拓扑模型下的流量测试

3.在发包测试前及运行过程中都对发包端口的链路状态进行监控100m/1g/2.5g/5g/10g

4.嵌入arp探测,获取dut的mac信息,并记录在测试日志中

5.统计端口流量的tx,rx信息,丢包率,延迟信息

设计思路:main文件控制主流程,嵌入statistics模块进行数据统计,createclass模块进行命令预定义

运行效果:

案例一:利于Python调用JSON对象来实现对XENA流量测试仪的灵活发包测试,能够适应Pair,Rotate,1-to-Many等多种拓扑模型

 

 案例一:利于Python调用JSON对象来实现对XENA流量测试仪的灵活发包测试,能够适应Pair,Rotate,1-to-Many等多种拓扑模型

 

 

主要实现代码:

  1 import threading, time, json, random, queue, types, sys, socket
  2 from binascii import hexlify
  3 from testutilsl23 import xenascripttools
  4 from createclass import streamcreate
  5 from statistics import collecttestresult
  6 
  7 def builddict(ports, configvalue, watching, watching_2):
  8     #generate ports dictionary
  9     a = 1 
 10     for port in ports:
 11         ip_address_b = configvalue['ip'][0:10]
 12         ip_address_c = ip_address_b + str(a)
 13         mac_a = configvalue['mac'][0:10]
 14         if a > 9:
 15             mac_address = mac_a + str(a)
 16         else:
 17             mac_address = mac_a + '0' + str(a)
 18         ip_address_a = hexlify(socket.inet_aton(ip_address_c)).decode()
 19         mac_ip = [str(mac_address), str(ip_address_a)]
 20         watching[port] = mac_ip
 21         a += 1
 22 
 23     watching_2['type_eth'] = 'ffff',
 24     watching_2['type_vlan'] = '8100',
 25     watching_2['type_ip'] = '08004500006a000000007f11ba2d',
 26     watching_2['type_udp'] = '00000000',
 27     watching_2['type_tcp'] = '000000000000000050000000e5a40000',
 28     watching_2['tcpid'] = [hex(int(configvalue['tcpport'][0]))[2:].zfill(4) + hex(int(configvalue['tcpport'][1]))[2:].zfill(4)]
 29 
 30 
 31 def printmes(ports, configvalue, ip_address):
 32     localtime = time.asctime( time.localtime(time.time()) )
 33     print("\n##############################################")
 34     print("                   测试配置                   ")
 35     print("##############################################\n")
 36     print("测试时间     :  " + str(localtime))
 37     print("设备ip地址   :  " + ip_address)
 38     print("测试端口     :  " + str(ports))
 39     print("测试模式     :  " + configvalue['stype'])
 40     print("持续时间     :  " + configvalue['testtime'])
 41     print("报文类型     :  " + configvalue['header_type'])
 42     print("报文长度     :  " + configvalue['packet'])
 43     if configvalue['stype'] == 'aggregation':
 44         print("上行vlan     :  " + str(configvalue['uvlan']))
 45         print("下行vlan     :  " + str(configvalue['uvlan']))
 46         print("wan速率      :  " + str(configvalue['wanrate']))
 47         print("lan速率      :  " + str(configvalue['lanrate']))
 48     else:
 49         print("vlan         :  " + str(configvalue['uvlan']))
 50         print("速率设置     :  " + str(configvalue['wanrate']))
 51     print("端口速率     :  " + str(configvalue['portrate']))
 52     print("学习时间     :  " + configvalue['learntime'])
 53     print("丢包率设置   :  " + configvalue['threshold'])
 54     if configvalue['snlearnenable'] == '1':
 55         print("自动学习sn   :  enable")
 56     else:
 57         print("自动学习sn   :  disable")
 58     if configvalue['portspeedcheck'] == '1':
 59         print("端口速率检测 :  enable" )
 60     else:
 61         print("端口速率检测 :  disable" )
 62 
 63 
 64 def maclearning(xm, ports, configvalue):
 65     if configvalue['snlearnenable'] == '1':
 66         xm.sendexpectok(ports [1] + " ps_create [0]")
 67         xm.sendexpectok(ports [1] + " ps_tpldid [0] 0")
 68         xm.sendexpectok(ports [1] + " ps_packetlength [0] fixed 70 1518")
 69         xm.sendexpectok(ports [1] + " ps_headerprotocol [0] ethernet arp")
 70         learnip_hex = hexlify(socket.inet_aton(configvalue['learnip'])).decode()
 71         xm.sendexpectok(ports [1] + " ps_packetheader [0] 0xffffffffffff00000000000108060001080006040001000000000001c0a80164ffffffffffff" + learnip_hex)
 72         xm.sendexpectok(ports [1] + " ps_ratepps [0] 2")
 73         xm.sendexpectok(ports [1] + " ps_enable [0] on")
 74         xm.sendexpectok(ports [1] + " p_traffic on")
 75         xm.sendexpectok(ports [1] + " p_capture on")
 76         time.sleep(1)
 77         xm.sendexpectok(ports [1] + " p_capture off")
 78         xm.sendexpectok(ports [1] + " p_traffic off")
 79         sn = xm.send(ports[1] + " pc_packet [0] ?")
 80         sn = sn.split('  ')[-1]
 81         xm.sendexpectok(ports [1] + " ps_delete [0]")
 82         serial = str(sn)
 83         if sn == '<badindex>':
 84             print ("\ntest result :failed;[arp learning failed!!]")
 85             sys.exit()
 86         print ("dut sn       :  " + str(serial[16:28]))
 87 
 88 
 89 def portspeed(xm, ports, psc, configvalue):
 90     if configvalue['prcheck'] == '1':
 91         xm.portsynccheck(ports)
 92     i = 0
 93     for port in ports:
 94         ps = xm.send(port + ' p_speed ?')
 95         ps = ps.split('  ')[-1]
 96         psc.append(str(ps))
 97         if configvalue['portspeedcheck'] == '1':
 98             if str(ps) != configvalue['portrate'][i]:
 99                 print ('test result:failed;[' + port + ' 端口速率不匹配,请检查配置...]')
100                 sys.exit()
101         i += 1
102 
103 
104 def runtest(xm, ports, configvalue):
105     xm.porttrafficstart(ports)
106     time.sleep(float(configvalue['learntime']))
107     xm.porttrafficstop(ports)
108     time.sleep(0.5)
109     xm.statisticsclear(ports)
110     time.sleep(0.5)
111     xm.porttimelimit(ports, configvalue['testtime'])
112     time.sleep(0.2)
113     xm.porttrafficstart(ports)
114     count = 0
115     print ("\n开始打流,请耐心等候, 测试时间约为" + configvalue['testtime'] + "秒......\n")
116     while true:
117         print (">>>>>>>>>", end='')
118         xm.send(ports[0] + " p_traffic ?")
119         sys.stdout.flush()
120         count += 1
121         time.sleep(1)
122         if count > int(configvalue['testtime']):
123             print ('\n\n测试结束,正在收集测试数据......')
124             break
125     print()
126     xm.porttrafficstop(ports)
127     time.sleep(1)
128 
129 
130 def main(argv):
131     a = sys.argv
132     configvalue = {}
133     filename = str(a[1])
134     with open(filename,'rb') as f:
135         configs=json.loads(f.read())
136 
137         ip_address = configs.get('ip_address')
138         ports = configs.get('ports', '')
139         configvalue['uvlan'] = configs.get('uvlan', '')
140         configvalue['dvlan'] = configs.get('dvlan', '')
141         configvalue['tcpport'] = configs.get('tcpudpport', '')
142         configvalue['packet'] = configs.get('packet', '')
143         configvalue['testtime'] = configs.get('testtime', '')
144         configvalue['threshold'] = configs.get('threshold', '')
145         configvalue['header_type'] = configs.get('headertype', '')
146         configvalue['learntime'] = configs.get('learntime', '')
147         configvalue['payload'] = configs.get('payload', '')
148         configvalue['wanrate'] = configs.get('wanrate', '')
149         configvalue['lanrate'] = configs.get('lanrate', '')
150         configvalue['portrate'] = configs.get('portrate', '')
151         configvalue['prcheck'] = configs.get('prcheck', '')
152         configvalue['ip'] = configs.get('ip', '')
153         configvalue['mac'] = configs.get('mac', '')
154         configvalue['snlearnenable'] = configs.get('snlernenable', '')
155         configvalue['learnip'] = configs.get('learnip', '')
156         configvalue['stype'] = configs.get('stype', '')
157         configvalue['portspeedcheck'] = configs.get('pscheck', '')
158 
159     psc = []
160     fcs = []
161     watching = {}
162     watching_2 = {}
163     testresult = {}
164 
165     printmes(ports, configvalue, ip_address)
166 
167     xm = xenascripttools(ip_address)
168     xm.logonsetowner("xena", "python_test_1")
169     xm.portrelinquish(ports)
170     xm.portreserve(ports)
171     xm.porttrafficstop(ports)
172     xm.streamsdelete(ports)
173     sc = streamcreate(xm, ports, watching, watching_2, configvalue)
174     cr = collecttestresult(xm, ports, psc, configvalue, testresult, fcs)
175 
176     
177     builddict(ports, configvalue, watching, watching_2)
178     portspeed(xm, ports, psc, configvalue)
179     if configvalue['snlearnenable'] == '1':
180         maclearning(xm, ports, configvalue)
181     if configvalue['stype'] == 'loopback':
182         sc.loopback(xm, ports, watching, watching_2, configvalue)
183         runtest(xm, ports, configvalue)
184         cr.loopbackstatistics(xm, ports, testresult)
185     if configvalue['stype'] == 'eachother':
186         sc.eachother(xm, ports, watching, watching_2, configvalue)
187         runtest(xm, ports, configvalue)
188         cr.eachotherstatistics(xm, ports, testresult)
189     if configvalue['stype'] == 'aggregation':
190         sc.aggregation(xm, ports, watching, watching_2, configvalue)
191         runtest(xm, ports, configvalue)
192         cr.aggregationstatistics(xm, ports, testresult)
193     cr.fcsgetvalue(xm, ports, fcs)
194     cr.outputstatistics(ports, psc, configvalue, testresult, fcs)
195     xm.portrelease(ports)
196 
197 if __name__ == '__main__':
198     sys.exit(main(sys.argv))

 

json对象配置文件

{
"##常用配置修改":"",

"ports": ["0/0","0/1","0/2","0/3","0/4","0/5"],
"packet": "fixed 64 1518",
"testtime": "10",

"##注意:dvlan和lanrate只有在aggregation模式生效,其他模式仅需uvlan和wanrate":"",
"##其中vlan设置为-1则表示不添加vlan":"",
"uvlan": "-1",
"dvlan": "-1",
"wanrate": ["100", "100", "100","100", "100", "100"],
"lanrate": ["21", "22", "23","24"],

"portrate": ["1000","1000","1000","1000","1000","1000"],

"##stype为测试模式,分别可以配置为:loopback, eachother, aggregation":"",
"##其中loopback为环回测试,eachother为两两互打测试,aggregation为汇聚测试":"",
"stype":"eachother",

"##headertype为报文类型,分别可以配置为:tcp, udp, ip, ethernet":"",
"headertype": "ethernet",

"##threshold为丢包率设置":"",
"threshold": "0",




"##不常用配置修改":"",

"##发送学习报文的时间":"",
"learntime": "3",

"##payload类型分别有: pattern, random":"",
"payload": "pattern",

"##prcheck为使能端口状态检测,1为开启,0为关闭;使能后端口为连接的时候会报错":"",
"prcheck": "1",

"##pscheck为检测端口速率是否匹配,1为开启,0为关闭":"",
"pscheck":"1",

"tcpudpport": ["1024", "2048"],
"ip": "192.168.163.1",
"mac": "000000033333",

"##学习dut的mac地址":"",
"snlernenable": "0",
"learnip": "192.168.2.1",

"##测试仪ip地址":"",
"ip_address": "192.168.1.200"
}