欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

QThread 线程终止

程序员文章站 2022-03-10 19:53:44
QThread使用PyQt5开发应用程序时,处理耗时较长的任务时为避免主程序卡死就要用到QThread开子线程,有时子线程是去执行For/While 循环任务,我们如果想临时终止线程,就可以增加一个属性用作判定是否要临时终止的标志位,每轮循环开始前做一次校验判定是否要退出。示例以下示例是做LED循环开关及调光测试的小程序,循环次数通常都设置在数万次,运行后如发现DUT表现异常操作员就要终止测试。代码主程序代码(片段)self.ui.dimmer_cycle_exec.clicked.conn...

QThread

使用PyQt5开发应用程序时,处理耗时较长的任务时为避免主程序卡死就要用到QThread开子线程,有时子线程是去执行For/While 循环任务,我们如果想临时终止线程,就可以增加一个属性用作判定是否要临时终止的标志位,每轮循环开始前做一次校验判定是否要退出。

示例

以下示例是做LED循环开关及调光测试的小程序,循环次数通常都设置在数万次,运行后如发现DUT表现异常操作员就要终止测试。
QThread 线程终止

代码

主程序代码(片段)

	self.ui.dimmer_cycle_exec.clicked.connect(self.dimmer_cycle)
	self.ui.dimmer_cycle_terminate.clicked.connect(self.cycle_stop)

 ##Cycle
    def dimmer_cycle(self):
        self.cycle = cyclingCtrl()
        self.cycle.setTerminationEnabled()
        self.cycle.dimer_targets = self.ui.dimer_targets.text()
        self.cycle.dimmer_hold_time = self.ui.dimmer_hold_time.text()
        self.cycle.dimmer_cycle_times = self.ui.dimmer_cycle_times.text()
        self.cycle.port = self.ui.cycle_ser_port.currentText()
        self.cycle.dimmer_cycle_exec_is_clicked = True
        self.cycle.flare.connect(self.sinShow)
        if self.ser:
            self.cycle.ser = self.ser
        self.cycle.start()

    def cycle_stop(self):
        # 点击”终止“按钮后将子线程 stop_flag属性置1,终止循环
        self.cycle.stop_flag = 1
        self.ui.textBrowser.append("\t循环终止!")

子线程代码

# -*- coding:utf-8 -*-
from PyQt5.QtCore import *
import serial
import time
import datetime
import logging
from math import ceil


class CyclingCtrl(QThread):
    
    """
   Cycling control devices to test the performace.
    """
    
    flare = pyqtSignal(str)

    def __init__(self):
        super(CyclingCtrl, self).__init__()
        self.dimer_targets = None
        self.dimmer_hold_time = None
        self.dimmer_cycle_times = None
        self.dimmer_cycle_exec_is_clicked = False

        self.led_onoff_cycle_targets = None
        self.led_onoff_transtime = None
        self.led_onoff_cycle_times = None
        self.led_onoff_cycle_exec_is_clicked = False

        self.turn_onoff_cycle_targets = None
        self.turn_onoff_hold_time = None
        self.turn_onoff_cycle_times = None
        self.turn_onoff_cycle_exec_is_clicked = False

        self.ser = None
        self.port = None

        self.CycleCtrl_log_file = None
        self.stop_flag = 0  # 线程终止信号,主线程通过修改子线程的该属性控制线程终止

    def run(self):
        if self.ser is None:
            self.ser = serial.Serial(port=self.port, baudrate=115200, timeout=3)
        elif self.ser.is_open:
            self.flare.emit("%s 连接成功" % self.port)
        else:
            self.flare.emit("%s 连接失败" % self.port)

        if self.dimmer_cycle_exec_is_clicked:
            self.flare.emit("开始执行LED dimmer Cycling 程序")
            self.led_dimmer_cycling()
        elif self.led_onoff_cycle_exec_is_clicked:
            self.flare.emit("开始执行LED OnOff Cycling 程序")
            self.led_onoff_cycling()
        elif self.turn_onoff_cycle_exec_is_clicked:
            self.flare.emit("开始执行Turn OnOff Cycling 程序")
            self.turn_onoff_cycling()
        else:
            self.flare.emit("循环控制指令发送失败")

    def led_dimmer_cycling(self):
        logDate = datetime.datetime.now().strftime("%Y%m%d_%Hh_%Mm_%Ss")
        self.CycleCtrl_log_file = "./logs/log%s_dimmerCycling_logs.log" % logDate
        logging.basicConfig(filename=self.CycleCtrl_log_file, level=logging.INFO, format="%(asctime)s :\n%(message)s")
        for cycle in range(0, int(self.dimmer_cycle_times)+1):
            for i in range(0, 21):
                if self.stop_flag:
                    break   # 用户点击 “Stop" 时主线程将self.stop_flag置1,子线程循环终止
                if i < 11:
                    # move level up from 0% to 100%
                    self.ser.write(("\r\n" + "zcl level-control o-mv-to-level {0} 0\r\n".format(
                        ceil(i * 25.5)) + "send {} 1 1\r\n".format(self.dimer_targets)).encode('utf-8'))
                    self.flare.emit("%s\tCycle-%d: o-mv-to-levle=%d percent" % (datetime.datetime.now().strftime("%Y%m%d_%Hh_%Mm_%Ss"), cycle, i*10))
                    logging.info("%s\tCycle-%d: o-mv-to-levle=%d percent" % (datetime.datetime.now().strftime("%Y%m%d_%Hh_%Mm_%Ss"), cycle, i*10))
                    time.sleep(int(self.dimmer_hold_time))
                else:
                    # move level down from 90% to 0%
                    self.ser.write(("\r\n" + "zcl level-control o-mv-to-level {0} 0\r\n".format(
                        ceil(510-ceil(i * 25.5))) + "send {} 1 1\r\n".format(self.dimer_targets)).encode('utf-8'))
                    self.flare.emit("%s\tCycle-%d:o-mv-to-level=%d percent" % (datetime.datetime.now().strftime("%Y%m%d_%Hh_%Mm_%Ss"), cycle, (20-i)*10))
                    logging.info("%s\tCycle-%d:o-mv-to-level=%d percent" % (datetime.datetime.now().strftime("%Y%m%d_%Hh_%Mm_%Ss"), cycle, (20-i)*10))
                    time.sleep(int(self.dimmer_hold_time))

    def cycling_pause(self):
        # TODO:暂停线程
        self.flare.emit("Dimmer循环暂停!")
        pass

    def cycling_continue(self):
        # TODO: 线程继续执行
        pass

TODO

线程挂起、继续还没找到比较靠谱的实现方法。

本文地址:https://blog.csdn.net/willyan2002/article/details/107653906

相关标签: PyQt5