欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

python subprocess.Popen系列问题

程序员文章站 2022-03-11 11:51:40
...

最近在项目中遇到一个需求,前端发来一个命令,这个命令是去执行传递过来的一个脚本(shell 或者python),并返回脚本的标准输出和标准出错,如果执行超过设定时间还没结束就超时,然后终止脚本的执行。实现这个功能,自然而然先想到的是subprocess这个库了。

因此,在后端的一个脚本中调用python的subprocess去执行传递过来的脚本,通常情况下subprocess都能运行的很好,完成脚本的执行并返回。最初的实现代码如下:
run_cmd.py

#!/usr/bin/python
# -*- coding: utf-8 -*-
import subprocess
from threading import Timer
import os

class test(object):
    def __init__(self):
        self.stdout = []
        self.stderr = []
        self.timeout = 10
        self.is_timeout = False
        pass

    def timeout_callback(self, p):
        print 'exe time out call back'
        print p.pid
        try:
            p.kill()
        except Exception as error:
            print error

    def run(self):
        cmd = ['bash', '/home/XXXX/test.sh']
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        my_timer = Timer(self.timeout, self.timeout_callback, [p])
        my_timer.start()
        try:
            print "start to count timeout; timeout set to be %d \n" % (self.timeout,)
            stdout, stderr = p.communicate()
            exit_code = p.returncode
            print exit_code
            print type(stdout), type(stderr)
            print stdout
            print stderr
        finally:
            my_timer.cancel()

但是偶然间测试一个shell脚本,这个shell脚本中有一行ping www.baidu.com &,shell脚本如下:
test.sh

#!/bin/bash
ping   www.baidu.com (&) #加不加&都没区别
echo $$

python(父进程)用subprocess.Popen新建一个进程(子进程)去开启一个shell, shell新开一个子进程(孙进程)去执行ping www.baidu.com的命令。由于孙进程ping www.baidu.com一直在执行,就类似于一个daemon程序,一直在运行。在超时时间后,父进程杀掉了shell子进程,但是父进程阻塞在了p.communicate函数了,是阻塞在了调用wait()函数之前,感兴趣的朋友可以看一下源码_communicate函数,linux系统重点看_communicate_with_poll和_communicate_with_select函数,你会发现是阻塞在了while循环里面,因为父进程一直在获取输出,而孙进程一直像一个daemon程序一样,一直在往子进程的输出写东西,而子进程的文件句柄继承自父进程。虽然shell子进程被杀掉了,但是父进程里面的逻辑并没有因为子进程被意外的干掉而终止,(因为孙进程一直有输出到子进程的stdout,导致子进程的stdout一直有输出,也就是父进程的stdout也有输出),所以while循环一直成立,就导致了阻塞,进而导致wait()没有被调用,所以子进程没有被回收,就成了僵尸进程。

要完美的解决这个问题就是即要能获取到subprocess.Popen的进程的输出,在超时又要能杀掉子进程,让主进程不被阻塞。

一开始比较急,也对subprocess.Popen没有深入的去用过,尝试了一个low B的办法,就是不用subprocess.Popen.communicate()去获取输出,而是直接去读文件,然后超时后不去读文件。代码如下:
run_cmd.py第一个改版

#!/usr/bin/python
# -*- coding: utf-8 -*-
import subprocess
from threading import Timer
import os

class test(object):
    def __init__(self):
        self.stdout = []
        self.stderr = []
        self.timeout = 10
        self.is_timeout = False
        pass

    def timeout_callback(self, p):
        self.is_timeout = True
        print "time out"

    def run(self):
        cmd = ['bash', '/home/zhangxin/work/baofabu/while_test.sh']
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        my_timer = Timer(self.timeout, self.timeout_callback, [p])
        my_timer.start()
        try:
            print "start to count timeout; timeout set to be %d \n" % (self.timeout,)
            for line in iter(p.stdout.readline, b''):
                print line
                if self.is_timeout:
                    break
            for line in iter(p.stderr.readline, b''):
                print line
                if self.is_timeout:
                    break
        finally:
            my_timer.cancel()
            p.stdout.close()
            p.stderr.close()
            p.kill()
            p.wait()

这样虽然能获取输出,在超时后也不再阻塞,写完过后返回来再看时发现,其实在最开始的那一版代码中,只要在超时的回调函数中加上p.stdout.close()和p.stderr.clode(), p.communicate就不再阻塞了,其实问题也就解决了。 但是还会存在一个潜在的问题,父进程结束了,没有其他进程去读取PIPE,daemon孙进程一直往PIPE写,最后导致PIPE填满,孙进程也被阻塞。

所以这样处理其实没任何意义,因为孙进程没有被终止掉,只是简单的关闭了管道。 所以在假期,我仔细的在网上找了找,看了看subprocess,发现subprocess.Popen有一个参数preexec_fn,调用subprocess.Popen时传递preexec_fn=os.setsid或者preexec_fn=os.setpgrp,然后在超时的时候执行os.killpg(p.pid, signal.SIGKILL)就可以杀掉子进程以及在同一个会话的所有进程。所以将run函数的subprocess.Popen改为

p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, preexec_fn=os.setsid)

同时将timeout_callback函数改成如下就可以了:
def timeout_callback(self, p):
    self.is_timeout = True
    print 'exe time out call back'
    print p.pid
    try:
        os.killpg(p.pid, signal.SIGKILL)
    except Exception as error:
        print error

关于preexec_fn=os.setsid的作用,以下摘自https://blog.tonyseek.com/post/kill-the-descendants-of-subprocess/

运行程序 fork 以后,产生的子进程都享有独立的进程空间和 pid,也就是它超出了我们触碰的范围。好在 subprocess.Popen 有个 preexec_fn 参数,它接受一个回调函数,并在 fork 之后 exec 之前的间隙中执行它。我们可以利用这个特性对被运行的子进程做出一些修改,比如执行 setsid() 成立一个独立的进程组。

Linux 的进程组是一个进程的集合,任何进程用系统调用 setsid 可以创建一个新的进程组,并让自己成为首领进程。首领进程的子子孙孙只要没有再调用 setsid 成立自己的独立进程组,那么它都将成为这个进程组的成员。 之后进程组内只要还有一个存活的进程,那么这个进程组就还是存在的,即使首领进程已经死亡也不例外。 而这个存在的意义在于,我们只要知道了首领进程的 pid (同时也是进程组的 pgid), 那么可以给整个进程组发送 signal,组内的所有进程都会收到。

因此利用这个特性,就可以通过 preexec_fn 参数让 Popen 成立自己的进程组, 然后再向进程组发送 SIGTERM 或 SIGKILL,中止 subprocess.Popen 所启动进程的子子孙孙。当然,前提是这些子子孙孙中没有进程再调用 setsid 分裂自立门户。

至于setsid和setpgrp有什么区别,看了各自的man page,还不是很明白,如果有大兄弟知道,并且不吝留言分享告知,感激涕零!

subprocess.Popen只能运行命令或者脚本,而不能像threading的thread库一样运行函数,那么如何在一个只有.py文件的情况下像thread一样运行subprocess.Popen呢? 在调用subprocess.Popen的py我们可以把要执行的脚本内容写到一个临时文件,也即是类似于thread的target函数,然后用subprocess.Popen执行这个临时脚本,这样就可以不用预先存在多个脚本。。如下面的例子:

import os
import signal
import subprocess
import tempfile
import time
import sys


def show_setting_prgrp():
    print('Calling os.setpgrp() from {}'.format(os.getpid()))
    os.setpgrp()
    print('Process group is now {}'.format(
        os.getpid(), os.getpgrp()))
    sys.stdout.flush()

# 这次的重点关注是这里
script = '''#!/bin/sh
echo "Shell script in process $$"
set -x
python3 signal_child.py
'''
script_file = tempfile.NamedTemporaryFile('wt')
script_file.write(script)
script_file.flush()

proc = subprocess.Popen(
    ['sh', script_file.name],
    preexec_fn=show_setting_prgrp,
)
print('PARENT      : Pausing before signaling {}...'.format(
    proc.pid))
sys.stdout.flush()
time.sleep(1)
print('PARENT      : Signaling process group {}'.format(
    proc.pid))
sys.stdout.flush()
os.killpg(proc.pid, signal.SIGUSR1)
time.sleep(3)

当然也可以在shell脚本里面用exec来运行命令,那么就只有父进程和子进程,没有孙进程的概念了。

其实关于阻塞问题,也可以将subprocess.Popen的输出重定向到文件。

#!/usr/bin/python
# -*- coding: utf-8 -*-
import subprocess
from threading import Timer
import os
import time
import signal

class test(object):
    def __init__(self):
        self.stdout = []
        self.stderr = []
        self.timeout = 6
        self.is_timeout = False
        pass

    def timeout_callback(self, p):
        print 'exe time out call back'
        try:
            p.kill()
            # os.killpg(p.pid, signal.SIGKILL)
        except Exception as error:
            print error

    def run(self):
        stdout = open('/tmp/subprocess_stdout', 'wb')
        stderr = open('/tmp/subprocess_stderr', 'wb')
        cmd = ['bash', '/home/xxx/while_test.sh']
        p = subprocess.Popen(cmd, stdout=stdout.fileno(), stderr=stderr.fileno())
        my_timer = Timer(self.timeout, self.timeout_callback, [p])
        my_timer.start()
        print p.pid
        try:
            print "start to count timeout; timeout set to be %d \n" % (self.timeout,)
            p.wait()
        finally:
            my_timer.cancel()
            stdout.flush()
            stderr.flush()
            stdout.close()
            stderr.close()

写在最后,关于p = subprocess.Popen,最好用p.communicate.而不是直接用p.wait(), 因为p.wait()有可能因为子进程往PIPE写的时候写满了,但是子进程还没有结束,导致子进程阻塞,而父进程一直在wait(),导致父进程阻塞。而且p.wait()和p.communicate不能一起用,因为p.communicate里面也会去调用wait()。
在linux平台下,p.wait()其实最后调用的是os.waitpid(), 我们自己用的时候,也尽量用waitpid,而不是wait(),因为多次调用waitpid去wait同一个进程不会导致阻塞,但是程序中多次调用wait就很有可能会被阻塞,详见wait函数的作用。

其实阻塞的根本原因还是因为PIPE满了,所以用PIPE的时候,最好和select或者poll模型一起使用,防止读、写阻塞。 PIPE管道是系统调用,os.pipe产生的一个文件,只不过他有两个fd,一个用于读,一个用于写,当读写端都被关闭后,内核会自动回收。你可以理解内核在内存中开辟了一个队列,一端读,一端写。

管道在进程间通信(IPC)使用很广泛,shell命令就使用的很广泛。比如:
ps –aux | grep mysqld
上述命令表示获取mysqld进程相关的信息。这里ps和grep两个命令通信就采用了管道。管道有几个特点:

  1.  管道是半双工的,数据只能单向流动,ps命令的输出是grep的输出
    
  2.  只能用于父子进程或兄弟进程通信,这里可以认为ps和grep命令都是shell(bash/pdksh/ash/dash)命令的子进程,两者是兄弟关系。
    
  3.  管道相对于管道两端的进程而言就是一个文件,并且只存在于内存中。
    
  4.  写入端不断往管道写,并且每次写到管道末尾;读取端则不断从管道读,每次从头部读取。
    
    到这里大家可能会有一个疑问,管道两端的进程,写入进程不断的写,读取进程不断的读,那么什么时候结束呢?比如我们刚刚这个命令很快就结束了,它的原理是怎么样的呢?对于管道,这里有两个基本原则:
    1.当读一个写端已经关闭的管道时,在所有数据被读取后,read返回0,以指示达到文件结束处。
    2.当写一个读端已经关闭的管道时,会产生sigpipe信息。
    结合这个例子,当ps写管道结束后,就会自动关闭,此时grep进程read就会返回0,然后自动结束。
    具体pipe可以参见http://man7.org/linux/man-pages/man7/pipe.7.html

最近有发现了一个有趣的shell命令timeout,结合python 2.7的subprocess.Popen(python3的subprocess.Popen自带timeout参数),可以做到超时后终止进程。

cmd = ['timeout', 'bash', 'xxxxx']
subprocess.Popen(cmd)

参考来源:
https://pymotw.com/3/subprocess/#process-groups-sessions
http://www.cnblogs.com/cchust/p/3899832.html