荐 JY901串口数据接收与处理(Python)
程序员文章站
2022-06-15 12:30:51
最近在用JY901做一些实验,关于JY901网上有很多资料了,也有上位机软件,可以方便的查看输出数据。我想做的是对输出的角速度进行积分,对比积分后的结果与输出的角度,如果数据都比较准确地话,那么他们应该相差不大。这篇文章里,要完成的事情就是通过串口接收他输出的角速度和角度,然后对角速度进行积分,并实时显示数据结果。下面我首先对各个部分进行分块解释,完整的代码放在最后。1. 串口通信python实现串口通信可以用 pySerial 库。我们首先选择串口对应的设备端口:# 获取串口设备对应的端口def...
最近在用JY901做一些实验,关于JY901网上有很多资料了,也有上位机软件,可以方便的查看输出数据。我想做的是对输出的角速度进行积分,对比积分后的结果与输出的角度,如果数据都比较准确地话,那么他们应该相差不大。
这篇文章里,要完成的事情就是通过串口接收他输出的角速度和角度,然后对角速度进行积分,并实时显示数据结果。下面我首先对各个部分进行分块解释,完整的代码放在最后。
1. 串口通信
python实现串口通信可以用 pySerial
库。我们首先选择串口对应的设备端口:
# 获取串口设备对应的端口
def get_serial_port():
ports = list(serial.tools.list_ports.comports())
for port in ports:
print(ports.index(port), port)
selected = -1
while selected < 0 or selected >= len(ports):
print("please input serial to use [start from 0]:")
selected = int(input())
print("selected: ", selected)
port = ports[selected]
print("use ", port)
port = list(port)
return port[0]
然后打开串口并持续接收数据,在收到的数据中提取IMU数据:
def serial_readers(self, port = None, size = 22):
# port : 串口端口
# size : 默认一次读取数据长度(bytes)
if not port:
port = get_serial_port()
total = bytearray()
ser = None
while True:
try:
if not ser:
ser = serial.Serial(port, 115200, timeout=60)
if not ser.is_open:
ser.open()
tmp = ser.read(size)
total.extend(tmp)
total, ext_omega, ext_angle = self.extract_raw_data(total) #提取IMU数据
except Exception as e:
print("exception ", e)
if ser:
ser.close()
time.sleep(5)
return self.record_omega, self.record_angle
2. IMU数据提取
首先我们来看JY901串口数据的帧结构:
加速度:0x55 0x51 AxL AxH AyL AyH AzL AzH TL TH SUM
角速度:0x55 0x52 wxL wxH wyL wyH wzL wzH TL TH SUM
角度: 0x55 0x53 RollL RollH PitchL PitchH YawL YawH TL TH SUM
这三种数据帧长度都是 11 Bytes,最后一位为校验位,我们从串口数据中提取IMU数据也是根据这个帧结构,先检测帧头,然后读取整个帧(代码里偷了懒,并没有核对校验位)
def extract_raw_data(self, data):
# 根据报文格式读取角速度数据
# 0x55 0x52 + data + CRC(8bits)
length = 9
ext_omega = None
ext_angle = None
try:
st = data.index(b'\x55') # 寻找报文引导字
if st >= 0:
reserved = data[st+1] # 数据类型
if reserved == 82: # 0x52,提取角速度消息
if st+length+1 < len(data):
if self.count_omega==0:
self.time_now = self.time_last = time.time()
else:
self.time_last = self.time_now
self.time_now = time.time()
ext_omega = data[st:st+length+2]
temp = int.from_bytes(ext_omega[6:8], byteorder='little', signed=True)
self.omega = temp/32768*2000
self.record_omega.append(self.omega) #deg/s
data = data[st+length+2:]
self.count_omega += 1
self.omega_inte += self.omega*(self.time_now - self.time_last)
self.omega_inte = (self.omega_inte+180)%360 - 180
self.record_inte.append(self.omega_inte)
elif reserved == 83: # 0x53,提取角度消息
if st+length+1 < len(data):
ext_angle = data[st:st+length+2]
temp = int.from_bytes(ext_angle[6:8], byteorder='little', signed=True)
self.yaw = temp/32768*180
if self.count_yaw==0: #记录初始相位
self.yaw_init = self.yaw
self.record_angle.append(self.yaw - self.yaw_init) #deg
data = data[st+length+2:]
self.count_yaw += 1
elif reserved == 81: # 0x51, 丢弃加速度消息
if st+length+1 < len(data):
ext_data = None
data = data[st+length+2:]
else:
data = data[st+2:]
except:
return data, ext_omega, ext_angle
if self.count_yaw%100 == 0 and self.count_yaw>0:
self.save_data()
print(len(self.record_omega),' extract omega ',ext_omega,self.omega_inte)
print(len(self.record_angle),' extract angle ',ext_angle,self.yaw - self.yaw_init)
print('*'*30)
return data, ext_omega, ext_angle
3. 绘图显示
我把历史接收数据都存在一个 list
里面了,每过 0.1s 就重新绘图,看起来就像是在实时更新一样。
def plot(self):
plt.ion()
plt.figure()
while True:
try:
if len(self.record_angle)>0 and len(self.record_inte)>0 :
plt.clf()
plt.plot(list(range(len(self.record_angle))),self.record_angle,'r')
plt.plot(list(range(len(self.record_inte))),self.record_inte,'b')
plt.xlim(max(0,self.count_yaw-3000),self.count_yaw+200)
plt.legend(labels=['angle','integration'])
plt.pause(0.1)
plt.ioff()
except Exception as e:
print("exception ", e)
4. 运行效果
5. 完整代码
为了便于共享数据,定义了一个类 JY901
,将上面的各个函数放在这个类里面,输出的数据也都记录在 JY901
的成员变量里。
import serial
import serial.tools.list_ports
import time
import struct
import matplotlib.pyplot as plt
import numpy as np
import threading
class JY901():
def __init__(self):
self.time_last = 0
self.time_now = 0
self.record_angle = []
self.record_omega = []
self.record_inte = []
self.count_yaw = 0
self.count_omega = 0
self.omega_inte = 0 #角速度积分
self.yaw_init = 0
self.yaw = 0
self.omega = 0
def extract_raw_data(self, data):
# 根据报文格式读取角速度数据
# 0x55 0x52 + data + CRC(8bits)
length = 9
ext_omega = None
ext_angle = None
try:
st = data.index(b'\x55') # 寻找报文引导字
if st >= 0:
reserved = data[st+1] # 数据类型
if reserved == 82: # 0x52,提取角速度消息
if st+length+1 < len(data):
if self.count_omega==0:
self.time_now = self.time_last = time.time()
else:
self.time_last = self.time_now
self.time_now = time.time()
ext_omega = data[st:st+length+2]
temp = int.from_bytes(ext_omega[6:8], byteorder='little', signed=True)
self.omega = temp/32768*2000
self.record_omega.append(self.omega) #deg/s
data = data[st+length+2:]
self.count_omega += 1
self.omega_inte += self.omega*(self.time_now - self.time_last)
self.omega_inte = (self.omega_inte+180)%360 - 180
self.record_inte.append(self.omega_inte)
elif reserved == 83: # 0x53,提取角度消息
if st+length+1 < len(data):
ext_angle = data[st:st+length+2]
temp = int.from_bytes(ext_angle[6:8], byteorder='little', signed=True)
self.yaw = temp/32768*180
if self.count_yaw==0: #记录初始相位
self.yaw_init = self.yaw
self.record_angle.append(self.yaw - self.yaw_init) #deg
data = data[st+length+2:]
self.count_yaw += 1
elif reserved == 81: # 0x51, 丢弃加速度消息
if st+length+1 < len(data):
ext_data = None
data = data[st+length+2:]
else:
data = data[st+2:]
except:
return data, ext_omega, ext_angle
if self.count_yaw%100 == 0 and self.count_yaw>0:
self.save_data()
print(len(self.record_omega),' extract omega ',ext_omega,self.omega_inte)
print(len(self.record_angle),' extract angle ',ext_angle,self.yaw - self.yaw_init)
print('*'*30)
return data, ext_omega, ext_angle
def serial_readers(self, port = None, size = 22):
# port : 串口端口
# size : 默认一次读取数据长度(bytes)
if not port:
port = get_serial_port()
total = bytearray()
ser = None
while True:
try:
if not ser:
ser = serial.Serial(port, 115200, timeout=60)
if not ser.is_open:
ser.open()
tmp = ser.read(size)
total.extend(tmp)
total, ext_omega, ext_angle = self.extract_raw_data(total)
except Exception as e:
print("exception ", e)
if ser:
ser.close()
time.sleep(5)
return self.record_omega, self.record_angle
def save_data(self):
np.savez('record.npz',record_omega=self.record_omega, record_angle=self.record_angle)
def plot(self):
plt.ion()
plt.figure()
while True:
try:
if len(self.record_angle)>0 and len(self.record_inte)>0 :
plt.clf()
plt.plot(list(range(len(self.record_angle))),self.record_angle,'r')
plt.plot(list(range(len(self.record_inte))),self.record_inte,'b')
plt.xlim(max(0,self.count_yaw-3000),self.count_yaw+200)
plt.legend(labels=['angle','integration'])
plt.pause(0.1)
plt.ioff()
except Exception as e:
print("exception ", e)
def get_serial_port():
ports = list(serial.tools.list_ports.comports())
for port in ports:
print(ports.index(port), port)
selected = -1
while selected < 0 or selected >= len(ports):
print("please input serial to use [start from 0]:")
selected = int(input())
print("selected: ", selected)
port = ports[selected]
print("use ", port)
port = list(port)
return port[0]
if __name__ == '__main__':
imu = JY901()
thread_rcv = threading.Thread(target=imu.serial_readers)
thread_plt = threading.Thread(target=imu.plot)
thread_plt.start()
thread_rcv.start()
本文地址:https://blog.csdn.net/weixin_41024483/article/details/107219643
上一篇: 从零开始的Docker详解(三)