欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

荐 JY901串口数据接收与处理(Python)

程序员文章站 2022-06-15 12:30:51
最近在用JY901做一些实验,关于JY901网上有很多资料了,也有上位机软件,可以方便的查看输出数据。我想做的是对输出的角速度进行积分,对比积分后的结果与输出的角度,如果数据都比较准确地话,那么他们应该相差不大。这篇文章里,要完成的事情就是通过串口接收他输出的角速度和角度,然后对角速度进行积分,并实时显示数据结果。下面我首先对各个部分进行分块解释,完整的代码放在最后。1. 串口通信python实现串口通信可以用 pySerial 库。我们首先选择串口对应的设备端口:# 获取串口设备对应的端口def...

最近在用JY901做一些实验,关于JY901网上有很多资料了,也有上位机软件,可以方便的查看输出数据。我想做的是对输出的角速度进行积分,对比积分后的结果与输出的角度,如果数据都比较准确地话,那么他们应该相差不大。

这篇文章里,要完成的事情就是通过串口接收他输出的角速度和角度,然后对角速度进行积分,并实时显示数据结果。下面我首先对各个部分进行分块解释,完整的代码放在最后。

1. 串口通信

python实现串口通信可以用 pySerial 库。我们首先选择串口对应的设备端口:

# 获取串口设备对应的端口
def get_serial_port():
    ports = list(serial.tools.list_ports.comports())
    for port in ports:
        print(ports.index(port), port)
    selected = -1
    while selected < 0 or selected >= len(ports):
        print("please input serial to use [start from 0]:")
        selected = int(input())
        print("selected: ", selected)

    port = ports[selected]
    print("use ", port)
    port = list(port)
    return port[0]

然后打开串口并持续接收数据,在收到的数据中提取IMU数据:

def serial_readers(self, port = None, size = 22):
    # port   : 串口端口
    # size   : 默认一次读取数据长度(bytes)
    if not port:
        port = get_serial_port()
    total = bytearray()
    ser = None
    while True:
        try:
            if not ser:
                ser = serial.Serial(port, 115200, timeout=60)
            if not ser.is_open:
                ser.open()
            tmp = ser.read(size)
            total.extend(tmp)
            total, ext_omega, ext_angle = self.extract_raw_data(total)   #提取IMU数据
        except Exception as e:
            print("exception ", e)
            if ser:
                ser.close()
            time.sleep(5)

    return self.record_omega, self.record_angle

2. IMU数据提取

首先我们来看JY901串口数据的帧结构:

加速度:0x55 0x51 AxL AxH AyL AyH AzL AzH TL TH SUM 
角速度:0x55 0x52 wxL wxH wyL wyH wzL wzH TL TH SUM 
角度:  0x55 0x53 RollL RollH PitchL PitchH YawL YawH TL TH SUM 

这三种数据帧长度都是 11 Bytes,最后一位为校验位,我们从串口数据中提取IMU数据也是根据这个帧结构,先检测帧头,然后读取整个帧(代码里偷了懒,并没有核对校验位)

def extract_raw_data(self, data):
    # 根据报文格式读取角速度数据
    # 0x55 0x52 + data + CRC(8bits)
    length = 9
    ext_omega = None
    ext_angle = None
    try:
        st = data.index(b'\x55')                # 寻找报文引导字
        if st >= 0:
            reserved = data[st+1]               # 数据类型
            if reserved == 82:                  # 0x52,提取角速度消息
                if st+length+1 < len(data):
                    if self.count_omega==0:
                        self.time_now = self.time_last = time.time()
                    else:
                        self.time_last = self.time_now
                        self.time_now = time.time()

                    ext_omega = data[st:st+length+2]
                    temp = int.from_bytes(ext_omega[6:8], byteorder='little', signed=True)
                    self.omega = temp/32768*2000
                    self.record_omega.append(self.omega)   #deg/s
                    data = data[st+length+2:]
                    self.count_omega += 1

                    self.omega_inte += self.omega*(self.time_now - self.time_last)
                    self.omega_inte = (self.omega_inte+180)%360 - 180
                    self.record_inte.append(self.omega_inte)

            elif reserved == 83:                # 0x53,提取角度消息
                if st+length+1 < len(data):
                    ext_angle = data[st:st+length+2]
                    temp = int.from_bytes(ext_angle[6:8], byteorder='little', signed=True)
                    self.yaw = temp/32768*180
                    if self.count_yaw==0:                   #记录初始相位
                        self.yaw_init = self.yaw
                    self.record_angle.append(self.yaw - self.yaw_init)      #deg
                    data = data[st+length+2:]
                    self.count_yaw += 1

            elif reserved == 81:                # 0x51, 丢弃加速度消息
                if st+length+1 < len(data):
                    ext_data = None
                    data = data[st+length+2:]
            else:
                data = data[st+2:]
    except:
        return data, ext_omega, ext_angle
    if self.count_yaw%100 == 0 and self.count_yaw>0:
        self.save_data()
        print(len(self.record_omega),' extract omega  ',ext_omega,self.omega_inte)
        print(len(self.record_angle),' extract angle  ',ext_angle,self.yaw - self.yaw_init)
        print('*'*30)
    return data, ext_omega, ext_angle

3. 绘图显示

我把历史接收数据都存在一个 list 里面了,每过 0.1s 就重新绘图,看起来就像是在实时更新一样。

def plot(self):
    plt.ion()
    plt.figure()
    while True:
        try:
            if len(self.record_angle)>0 and len(self.record_inte)>0 :
                plt.clf()
                plt.plot(list(range(len(self.record_angle))),self.record_angle,'r')
                plt.plot(list(range(len(self.record_inte))),self.record_inte,'b')
                plt.xlim(max(0,self.count_yaw-3000),self.count_yaw+200)
                plt.legend(labels=['angle','integration'])
                plt.pause(0.1)
                plt.ioff()
        except Exception as e:
            print("exception ", e)

4. 运行效果

5. 完整代码

为了便于共享数据,定义了一个类 JY901,将上面的各个函数放在这个类里面,输出的数据也都记录在 JY901 的成员变量里。

import serial
import serial.tools.list_ports
import time
import struct
import matplotlib.pyplot as plt
import numpy as np
import threading

class JY901():
    def __init__(self):
        self.time_last = 0
        self.time_now = 0
        self.record_angle = []
        self.record_omega = []
        self.record_inte = []
        self.count_yaw = 0
        self.count_omega = 0
        self.omega_inte = 0         #角速度积分
        self.yaw_init = 0
        self.yaw = 0
        self.omega = 0

    def extract_raw_data(self, data):
        # 根据报文格式读取角速度数据
        # 0x55 0x52 + data + CRC(8bits)
        length = 9
        ext_omega = None
        ext_angle = None
        try:
            st = data.index(b'\x55')                # 寻找报文引导字
            if st >= 0:
                reserved = data[st+1]               # 数据类型
                if reserved == 82:                  # 0x52,提取角速度消息
                    if st+length+1 < len(data):
                        if self.count_omega==0:
                            self.time_now = self.time_last = time.time()
                        else:
                            self.time_last = self.time_now
                            self.time_now = time.time()

                        ext_omega = data[st:st+length+2]
                        temp = int.from_bytes(ext_omega[6:8], byteorder='little', signed=True)
                        self.omega = temp/32768*2000
                        self.record_omega.append(self.omega)   #deg/s
                        data = data[st+length+2:]
                        self.count_omega += 1

                        self.omega_inte += self.omega*(self.time_now - self.time_last)
                        self.omega_inte = (self.omega_inte+180)%360 - 180
                        self.record_inte.append(self.omega_inte)

                elif reserved == 83:                # 0x53,提取角度消息
                    if st+length+1 < len(data):
                        ext_angle = data[st:st+length+2]
                        temp = int.from_bytes(ext_angle[6:8], byteorder='little', signed=True)
                        self.yaw = temp/32768*180
                        if self.count_yaw==0:                   #记录初始相位
                            self.yaw_init = self.yaw
                        self.record_angle.append(self.yaw - self.yaw_init)      #deg
                        data = data[st+length+2:]
                        self.count_yaw += 1

                elif reserved == 81:                # 0x51, 丢弃加速度消息
                    if st+length+1 < len(data):
                        ext_data = None
                        data = data[st+length+2:]
                else:
                    data = data[st+2:]
        except:
            return data, ext_omega, ext_angle
        if self.count_yaw%100 == 0 and self.count_yaw>0:
            self.save_data()
            print(len(self.record_omega),' extract omega  ',ext_omega,self.omega_inte)
            print(len(self.record_angle),' extract angle  ',ext_angle,self.yaw - self.yaw_init)
            print('*'*30)
        return data, ext_omega, ext_angle

    def serial_readers(self, port = None, size = 22):
        # port   : 串口端口
        # size   : 默认一次读取数据长度(bytes)
        if not port:
            port = get_serial_port()
        total = bytearray()
        ser = None
        while True:
            try:
                if not ser:
                    ser = serial.Serial(port, 115200, timeout=60)
                if not ser.is_open:
                    ser.open()
                tmp = ser.read(size)
                total.extend(tmp)
                total, ext_omega, ext_angle = self.extract_raw_data(total)
            except Exception as e:
                print("exception ", e)
                if ser:
                    ser.close()
                time.sleep(5)

        return self.record_omega, self.record_angle

    def save_data(self):
        np.savez('record.npz',record_omega=self.record_omega, record_angle=self.record_angle)

    def plot(self):
        plt.ion()
        plt.figure()
        while True:
            try:
                if len(self.record_angle)>0 and len(self.record_inte)>0 :
                    plt.clf()
                    plt.plot(list(range(len(self.record_angle))),self.record_angle,'r')
                    plt.plot(list(range(len(self.record_inte))),self.record_inte,'b')
                    plt.xlim(max(0,self.count_yaw-3000),self.count_yaw+200)
                    plt.legend(labels=['angle','integration'])
                    plt.pause(0.1)
                    plt.ioff()
            except Exception as e:
                print("exception ", e)


def get_serial_port():
    ports = list(serial.tools.list_ports.comports())
    for port in ports:
        print(ports.index(port), port)
    selected = -1
    while selected < 0 or selected >= len(ports):
        print("please input serial to use [start from 0]:")
        selected = int(input())
        print("selected: ", selected)

    port = ports[selected]
    print("use ", port)
    port = list(port)
    return port[0]


if __name__ == '__main__':
    imu = JY901()
    thread_rcv = threading.Thread(target=imu.serial_readers)
    thread_plt = threading.Thread(target=imu.plot)
    thread_plt.start()
    thread_rcv.start()

本文地址:https://blog.csdn.net/weixin_41024483/article/details/107219643