欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

程序中断就要重新开始?手把手教你给你的程序加上进度记录。

程序员文章站 2022-04-01 20:32:47
...

摘要

我们编写的程序,有很多时候运行是需要等待很长时间的,如果出现一些问题,我们可能不希望所有的处理从头再来,编写一个简单的记录程序可以完成这项任务。

介绍

我们编程所完成的任务,有可能只是一些很快就完成的程序,但是在深度学习领域,很多时候需要对很多文件进行处理。这样的时候,如果想中途优化下程序,那么就只能从头再来,或者手动去移动要处理的文件,非常麻烦又浪费时间。我在用moviepy处理视频文件的时候,也会有一些过了很久才会出的bug,因为没有关掉subprocess而导致的,而这种情况显然也没必要从头开始。另外的问题就是你在想修改程序的时候直接中断,也会导致处理了一半的文件仍然存在,比如说我就因为打开了我处理了一半的视频文件而使得windows media player和kmplayer直接崩溃,怎么重装也修复不了。这样的时候,就需要你知道你上次“做完”的进度,这样下次就可以overwrite掉下一个可能已损坏的文件。

代码讲解

我们选用简单的txt文件来记录进度。首先,定义一个txt的位置。

PROGRESSFILE = r"progress.txt"

然后,我们设定存储的形式为一行一种进度记录。记录的方式为section:(记录)#(注释)。然后编写读取这种记录的代码为:

from os.path import exists as os_path_exists
def read_txts(section, file_path=PROGRESSFILE):
    if not os_path_exists(file_path):
        return None
    else:
        with open(file_path, 'r', encoding='gbk') as f:
            for line in f.readlines():  # 逐行读取数据
                line = line.strip()  # 去掉每行头尾空白
                if not len(line) or line.startswith('#'):  # 判断是否是空行或注释行
                    continue
                if line.split(":",1)[0] == section:
                    progress = line.split("#")[0].split(":",1)[1]
                    return progress
            else:
                return None

本函数的思路为,先判断有没有此文件,如果没有则返回空;打开文件并逐行阅读,去掉空行或注释行,剩下的行中,以:和#进行分离,分离出想要的section,如果跟所给的section一致,则返回进度记录progress。接下来写写入文档的函数:

from copy import deepcopy as copy_deepcopy
def write_txts(section, content, annotation=None, file_path=PROGRESSFILE):
    if not os_path_exists(file_path):
        with open(file_path, 'w', encoding='gbk') as f:
            f.writelines('%s:%s#%s\n' % (section, content, annotation))
    else:
        if read_txts(section,file_path) is None:
            with open(file_path, 'a', encoding='gbk') as f:
                f.writelines('%s:%s#%s\n' % (section, content, annotation))
        else:
            with open(file_path, 'r', encoding='gbk') as f:
                lines = f.readlines()
                lines_new = copy_deepcopy(lines)
                for line in lines:
                    if line.split(":")[0] == section:
                        annotation_old = line.split('#')[1].split("\n")[0]
                        lines_new.remove(line)
                if not annotation == None:
                    lines_new.append('%s:%s#%s\n' % (section, content, annotation))
                else:
                    lines_new.append('%s:%s#%s\n' % (section, content, annotation_old))

            with open(file_path, 'w', encoding='gbk') as f:
                for line in lines_new:
                    f.writelines(line)

写记录文档的时候,首先看文档是否存在,如果不存在就需要写入你的section一行即可;如果存在,但是不存在你的section,则需要附加你的section这一句,使用‘a’参数。如果也存在你的section,那就要更新你的section。我的方法是将所有有效的记录读取,去掉你的section的部分,然后加入你给的新的记录,最后重写这个文档。对于注释的处理是如果你写了新的注释那么用你新的注释,如果你用的是None也就是没有注释,那么保留以前的注释。

这个程序要注意的一点就是,你如果自己去写入记录文档,必须保证最后一行是空行,不然就会出错。由于这个程序我又用来存储地址用,所以改成了read_txt和write_txt。那么完整的代码如下:

# -*- coding: utf-8 -*-
from os.path import exists as os_path_exists
from copy import deepcopy as copy_deepcopy
PROGRESSFILE = r"progress.txt"

def read_txts(section, file_path=PROGRESSFILE):
    if not os_path_exists(file_path):
        return None
    else:
        with open(file_path, 'r', encoding='gbk') as f:
            for line in f.readlines():  # 逐行读取数据
                line = line.strip()  # 去掉每行头尾空白
                if not len(line) or line.startswith('#'):  # 判断是否是空行或注释行
                    continue
                if line.split(":",1)[0] == section:
                    progress = line.split("#")[0].split(":",1)[1]
                    return progress
            else:
                return None


def read_progress(section):
    return read_txts(section)


def write_txts(section, content, annotation=None, file_path=PROGRESSFILE):
    if not os_path_exists(file_path):
        with open(file_path, 'w', encoding='gbk') as f:
            f.writelines('%s:%s#%s\n' % (section, content, annotation))
    else:
        if read_txts(section,file_path) is None:
            with open(file_path, 'a', encoding='gbk') as f:
                f.writelines('%s:%s#%s\n' % (section, content, annotation))
        else:
            with open(file_path, 'r', encoding='gbk') as f:
                lines = f.readlines()
                lines_new = copy_deepcopy(lines)
                for line in lines:
                    if line.split(":")[0] == section:
                        annotation_old = line.split('#')[1].split("\n")[0]
                        lines_new.remove(line)
                if not annotation == None:
                    lines_new.append('%s:%s#%s\n' % (section, content, annotation))
                else:
                    lines_new.append('%s:%s#%s\n' % (section, content, annotation_old))

            with open(file_path, 'w', encoding='gbk') as f:
                for line in lines_new:
                    f.writelines(line)


def write_progress(section, progress, annotation=None):
    write_txts(section, progress, annotation)

使用方法就是,每次在做某一件事的时候,先读取progress,然后进入循环,每一次循环重新写progress。这样在直接中断的时候,下一次重新运行程序就可以从上一次最后完成的部分继续进行。比如我们做一个测试样例,循环10**100次去计数:

x=read_progress("counting")
if not x:
    x=0
else:
    x=int(x)
for i in range(10**100):
   x+=1
   write_progress("counting",str(x))

中途停止的话,progress.txt的内容是:

counting:3052#None
 

再一次运行程序的话,将从上一次的3052开始计数。

 结语

如果感兴趣可以订阅我的文章;如果有想讨论的可以在评论区评论,谢谢大家!