
Tracking down and recording a problem with the open-source tool remoto

程序员文章站 2022-06-22 07:58:30

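This post records how I tracked down a development bug. I first ran into this problem quite a while ago and sidestepped it by using paramiko instead of the remoto module. Today I hit it again, and this time, for the sake of learning, I decided to study it properly. Reading the source showed that the problem was my own carelessness: I had overlooked a parameter. I am writing it up as a lesson to myself!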

1. Problem background

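I came across remoto, a handy tool for running commands on remote hosts, in the ceph-deploy tool. Later, when I used the module in my own code, I ran into a problem, which I describe in detail below.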

Environment:

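Three servers: R10-P01-DN-001.gd.cn, R10-P01-DN-002.gd.cn and R10-P01-DN-003.gd.cn. Node 01 is the primary node and has passwordless SSH access to nodes 01 through 03. Python 3 is installed on node 01, while nodes 02 and 03 only have Python 2.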

Reproducing the problem:

# Running under python3 and connecting to node 001: works

[store@R10-P01-DN-001 redis-agent]$ python3
Python 3.6.10 (default, Jun 19 2020, 10:51:42) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import remoto
>>> from remoto.process import check
>>> conn = remoto.Connection('R10-P01-DN-001.gd.cn')
>>> check(conn, ['hostname'])
INFO:R10-P01-DN-001.gd.cn:Running command: hostname
(['R10-P01-DN-001.gd.cn'], [], 0)

# Running under python3 and connecting to node 002 or 003: fails
>>> conn = remoto.Connection('R10-P01-DN-002.gd.cn')
bash: python3: command not found
ERROR:R10-P01-DN-001.gd.cn:Can't communicate with remote host, possibly because python3 is not installed there
Traceback (most recent call last):
  File "/opt/python3.6/lib/python3.6/site-packages/execnet/gateway_base.py", line 997, in _send
    message.to_io(self._io)
  File "/opt/python3.6/lib/python3.6/site-packages/execnet/gateway_base.py", line 443, in to_io
    io.write(header + self.data)
  File "/opt/python3.6/lib/python3.6/site-packages/execnet/gateway_base.py", line 410, in write
    self.outfile.flush()
BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/python3.6/lib/python3.6/site-packages/remoto/backends/__init__.py", line 35, in __init__
    self.gateway = self._make_gateway(hostname)
  File "/opt/python3.6/lib/python3.6/site-packages/remoto/backends/__init__.py", line 48, in _make_gateway
    gateway.reconfigure(py2str_as_py3str=False, py3str_as_py2str=False)
  File "/opt/python3.6/lib/python3.6/site-packages/execnet/gateway.py", line 72, in reconfigure
    self._send(Message.RECONFIGURE, data=data)
  File "/opt/python3.6/lib/python3.6/site-packages/execnet/gateway_base.py", line 1003, in _send
    raise IOError("cannot send (already closed?)")
OSError: cannot send (already closed?)
# Under python2 it works, because the remote host also has python2

[store@R10-P01-DN-001 redis-agent]$ python
Python 2.7.5 (default, Apr  2 2020, 01:29:16)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import remoto
>>> from remoto.process import check
>>> conn = remoto.Connection('R10-P01-DN-002.gd.cn')
>>> check(conn, ['hostname', '-s'])
INFO:R10-P01-DN-001.gd.cn:Running command: hostname -s
([u'R10-P01-DN-002'], [], 0)

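The cause seems fairly clear: when the caller runs under python3, remoto looks for python3 on the remote host and raises an error if it is not there. Let's dig into the source to find the logic that makes this decision and see how to change its behavior.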
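The shape of the traceback ("command not found" followed by BrokenPipeError and "cannot send (already closed?)") can be imitated locally without remoto or execnet. The sketch below is an assumption about the mechanism, not remoto's code: it starts a shell that tries to run a non-existent interpreter (the name `python3-missing-demo` is made up), waits for the shell to exit, and then writes to its stdin, which is roughly what execnet's `_send()` ends up doing when the remote `python3` never starts.

```python
import subprocess


def write_after_child_died(missing_cmd="python3-missing-demo"):
    """Start a shell running a command that does not exist, then write to its
    stdin. Returns (exit_code, stderr_text, error_type_name)."""
    # bufsize=0 gives an unbuffered stdin pipe, so write() goes straight to the OS
    proc = subprocess.Popen(
        ["sh", "-c", missing_cmd],
        stdin=subprocess.PIPE,
        stderr=subprocess.PIPE,
        bufsize=0,
    )
    # The shell reports "not found" and exits with status 127,
    # which closes the only read end of the stdin pipe.
    exit_code = proc.wait()
    stderr_text = proc.stderr.read().decode("utf-8", errors="replace")
    error = None
    try:
        # Similar in spirit to execnet sending RECONFIGURE to a peer that is gone
        proc.stdin.write(b"RECONFIGURE message\n")
    except BrokenPipeError as exc:
        error = type(exc).__name__
    finally:
        proc.stdin.close()
        proc.stderr.close()
    return exit_code, stderr_text, error


if __name__ == "__main__":
    code, err, exc_name = write_after_child_died()
    print(code)        # 127, the POSIX shell status for "command not found"
    print(err.strip())
    print(exc_name)    # BrokenPipeError
```

The point of the imitation is that the real failure happens on the remote side (the interpreter is missing); the local BrokenPipeError is only a symptom of writing to a process that has already exited.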

2. Source code analysis

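remoto is a wrapper around the execnet module, so to fully understand remoto's code we first need a good grasp of execnet. I started from the execnet documentation; the following examples are taken from the official docs: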

# Execute source code in subprocess, communicate through a channel
>>> import execnet
>>> gw = execnet.makegateway()
>>> channel = gw.remote_exec("channel.send(channel.receive()+1)")
>>> channel.send(1)
>>> channel.receive()
2

The next example is at the heart of how remoto works: it executes a function remotely and exchanges data with it:

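# Execute a function remotely

import execnet

def multiplier(channel, factor):
    while not channel.isclosed():
        # Receive a message from the channel
        param = channel.receive()
        # Send the result back through the channel
        channel.send(param * factor)

# Create the gateway
gw = execnet.makegateway()
# Run multiplier() remotely; the keyword arguments are passed to it.
# The returned channel is used to exchange data with the remote function.
channel = gw.remote_exec(multiplier, factor=10)

for i in range(5):
    # Send data through the channel
    channel.send(i)
    # Read the result from the channel
    result = channel.receive()
    assert result == i * 10

# Shut down the gateway
gw.exit()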
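To make the channel protocol concrete without a second machine or execnet itself, here is a stdlib-only imitation in which a thread plays the remote side and two `queue.Queue` objects stand in for the bidirectional channel. `FakeChannel` and `fake_remote_exec` are hypothetical names; the sketch only mimics the calling convention the execnet example relies on (the function receives the channel as its first argument, plus keyword arguments) and does not serialize anything or cross a process boundary.

```python
import queue
import threading


class FakeChannel:
    """Hypothetical stand-in for one end of an execnet channel."""

    _CLOSED = object()  # sentinel meaning "the other side closed the channel"

    def __init__(self, inbox, outbox):
        self._inbox = inbox
        self._outbox = outbox
        self._closed = False

    def send(self, item):
        self._outbox.put(item)

    def receive(self):
        item = self._inbox.get()
        if item is FakeChannel._CLOSED:
            self._closed = True
            raise EOFError("channel closed")
        return item

    def isclosed(self):
        return self._closed

    def close(self):
        self._outbox.put(FakeChannel._CLOSED)


def fake_remote_exec(func, **kwargs):
    """Run func(channel, **kwargs) in a thread; return the local end and the thread."""
    to_remote, to_local = queue.Queue(), queue.Queue()
    local_end = FakeChannel(inbox=to_local, outbox=to_remote)
    remote_end = FakeChannel(inbox=to_remote, outbox=to_local)
    worker = threading.Thread(target=func, args=(remote_end,), kwargs=kwargs, daemon=True)
    worker.start()
    return local_end, worker


def multiplier(channel, factor):
    # Same loop as the execnet example, plus EOFError handling for shutdown
    while not channel.isclosed():
        try:
            param = channel.receive()
        except EOFError:
            break
        channel.send(param * factor)


channel, worker = fake_remote_exec(multiplier, factor=10)
results = []
for i in range(5):
    channel.send(i)
    results.append(channel.receive())
channel.close()          # tells the "remote" loop to stop
worker.join(timeout=1)
print(results)           # [0, 10, 20, 30, 40]
```

In real execnet the function's source is shipped to the remote interpreter and run there, which is exactly why the remote interpreter version matters later in this post.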

Looking through the remoto source, its two core files are:

  • backends/__init__.py: defines the BaseConnection class;
  • process.py: defines the key run() and check() functions;

First, the connection class, which is essentially a wrapper around execnet:

# imports omitted
# ...

class BaseConnection(object):
    """
    Base class for Connection objects. Provides a generic interface to execnet
    for setting up the connection
    """
    executable = ''
    remote_import_system = 'legacy'

    def __init__(self, hostname, logger=None, sudo=False, threads=1, eager=True,
                 detect_sudo=False, interpreter=None, ssh_options=None):
        self.sudo = sudo
        self.hostname = hostname
        self.ssh_options = ssh_options
        self.logger = logger or basic_remote_logger()
        self.remote_module = None
        self.channel = None
        self.global_timeout = None  # wait for ever

        self.interpreter = interpreter or 'python%s' % sys.version_info[0]

        if eager:
            try:
                if detect_sudo:
                    self.sudo = self._detect_sudo()
                # the gateway is created here
                self.gateway = self._make_gateway(hostname)
            except OSError:
                self.logger.error(
                    "Can't communicate with remote host, possibly because "
                    "%s is not installed there" % self.interpreter
                )
                raise

    def _make_gateway(self, hostname):
        gateway = execnet.makegateway(
            self._make_connection_string(hostname)
        )
        # this is where the error is raised
        gateway.reconfigure(py2str_as_py3str=False, py3str_as_py2str=False)
        return gateway

    def _detect_sudo(self, _execnet=None):
        """
        ``sudo`` detection has to create a different connection to the remote
        host so that we can reliably ensure that ``getuser()`` will return the
        right information.

        After getting the user info it closes the connection and returns
        a boolean
        """
        exc = _execnet or execnet
        gw = exc.makegateway(
            self._make_connection_string(self.hostname, use_sudo=False)
        )

        channel = gw.remote_exec(
            'import getpass; channel.send(getpass.getuser())'
        )

        result = channel.receive()
        gw.exit()

        if result == 'root':
            return False
        self.logger.debug('connection detected need for sudo')
        return True

    ##############################################################################
    def _make_connection_string(self, hostname, _needs_ssh=None, use_sudo=None):
        _needs_ssh = _needs_ssh or needs_ssh
        interpreter = self.interpreter
        if use_sudo is not None:
            if use_sudo:
                interpreter = 'sudo ' + interpreter
        elif self.sudo:
            interpreter = 'sudo ' + interpreter
        if _needs_ssh(hostname):
            if self.ssh_options:
                return 'ssh=%s %s//python=%s' % (
                    self.ssh_options, hostname, interpreter
                )
            else:
                return 'ssh=%s//python=%s' % (hostname, interpreter)
        return 'popen//python=%s' % interpreter
    ##############################################################################

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.exit()
        return False

    def cmd(self, cmd):
        """
        In the base connection class, this method just returns the ``cmd``
        as-is. Other implementations will end up doing transformations to the
        command by prefixing it with other flags needed. See
        :class:`KubernetesConnection` for an example
        """
        return cmd

    ############################################################################
    def execute(self, function, **kw):
        return self.gateway.remote_exec(function, **kw)
    ###########################################################################

    def exit(self):
        self.gateway.exit()

    def import_module(self, module):
        """
        Allows remote execution of a local module. Depending on the
        ``remote_import_system`` attribute it may use execnet's implementation
        or remoto's own based on JSON.

        .. note:: It is not possible to use execnet's remote execution model on
                  connections that aren't SSH or Local.
        """
        if self.remote_import_system is not None:
            if self.remote_import_system == 'json':
                self.remote_module = JsonModuleExecute(self, module, self.logger)
            else:
                self.remote_module = LegacyModuleExecute(self.gateway, module, self.logger)
        else:
            self.remote_module = LegacyModuleExecute(self.gateway, module, self.logger)
        return self.remote_module
    
# ...

The key functions behind the problem above are _make_connection_string() and execute(); let's analyze them together with the code in process.py.
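Before moving on to process.py, the logic that decides which interpreter runs on the remote side can be restated as plain functions. This is a simplified sketch mirroring the quoted `__init__` default and `_make_connection_string()` (the `use_sudo` override is omitted). `needs_ssh_stub` is a hypothetical stand-in for remoto's `needs_ssh`, treating every host except `localhost` and the local hostname as remote, which may differ from remoto's real check.

```python
import socket
import sys


def default_interpreter(interpreter=None):
    # Mirrors: interpreter or 'python%s' % sys.version_info[0]
    return interpreter or "python%s" % sys.version_info[0]


def needs_ssh_stub(hostname):
    # Hypothetical stand-in for remoto's needs_ssh(): local names use popen
    return hostname not in {"localhost", socket.gethostname()}


def make_connection_string(hostname, interpreter, sudo=False, ssh_options=None):
    # Simplified version of BaseConnection._make_connection_string()
    if sudo:
        interpreter = "sudo " + interpreter
    if needs_ssh_stub(hostname):
        if ssh_options:
            return "ssh=%s %s//python=%s" % (ssh_options, hostname, interpreter)
        return "ssh=%s//python=%s" % (hostname, interpreter)
    return "popen//python=%s" % interpreter


host = "R10-P01-DN-002.gd.cn"
# No explicit interpreter: the remote side must have the caller's major version
print(make_connection_string(host, default_interpreter()))
# Explicit interpreter, as in the fix in section 3
print(make_connection_string(host, default_interpreter("/bin/python")))
# ssh=R10-P01-DN-002.gd.cn//python=/bin/python
```

Run under Python 3, the first line prints `ssh=R10-P01-DN-002.gd.cn//python=python3`, which is the string execnet receives and the reason the remote shell goes looking for `python3`.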

# Source location: remoto/process.py

# ...


def _remote_check(channel, cmd, **kw):
    import subprocess
    stdin = kw.pop('stdin', None)
    process = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE, **kw
    )

    if stdin:
        if not isinstance(stdin, bytes):
            # note: the result of encode() is not assigned, so stdin is left unchanged here
            stdin.encode('utf-8', errors='ignore')
        stdout_stream, stderr_stream = process.communicate(stdin)
    else:
        stdout_stream = process.stdout.read()
        stderr_stream = process.stderr.read()

    try:
        stdout_stream = stdout_stream.decode('utf-8')
        stderr_stream = stderr_stream.decode('utf-8')
    except AttributeError:
        pass

    stdout = stdout_stream.splitlines()
    stderr = stderr_stream.splitlines()
    channel.send((stdout, stderr, process.wait()))


def check(conn, command, exit=False, timeout=None, **kw):
    """
    Execute a remote command with ``subprocess.Popen`` but report back the
    results in a tuple with three items: stdout, stderr, and exit status.

    This helper function *does not* provide any logging as it is the caller's
    responsibility to do so.
    """
    command = conn.cmd(command)

    stop_on_error = kw.pop('stop_on_error', True)
    timeout = timeout or conn.global_timeout
    if not kw.get('env'):
        # get the remote environment's env so we can explicitly add
        # the path without wiping out everything
        kw = extend_env(conn, kw)

    conn.logger.info('Running command: %s' % ' '.join(admin_command(conn.sudo, command)))
    result = conn.execute(_remote_check, cmd=command, **kw)
    response = None
    try:
        response = result.receive(timeout)
    except Exception as err:
        # the things we need to do here :(
        # because execnet magic, we cannot catch this as
        # `except TimeoutError`
        if err.__class__.__name__ == 'TimeoutError':
            msg = 'No data was received after %s seconds, disconnecting...' % timeout
            conn.logger.warning(msg)
            # there is no stdout, stderr, or exit code but make the exit code
            # an error condition (non-zero) regardless
            return [], [], -1
        else:
            remote_trace = traceback.format_exc()
            remote_error = RemoteError(remote_trace)
            if remote_error.exception_name == 'RuntimeError':
                conn.logger.error(remote_error.exception_line)
            else:
                for tb_line in remote_trace.split('\n'):
                    conn.logger.error(tb_line)
            if stop_on_error:
                raise RuntimeError(
                    'Failed to execute command: %s' % ' '.join(command)
                )
    if exit:
        conn.exit()
    return response

As you can see, the core statement of check() is:

conn.execute(_remote_check, cmd=command, **kw)

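It uses execnet's remote function call to run _remote_check() on the remote host; that function executes the shell command and sends back its results. The function is run by the Python interpreter that was chosen when the connection was created.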
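To see what the remote side sends back, `_remote_check()` can be exercised locally by handing it an object that only records what `send()` receives. This is a Python 3 sketch assuming a hypothetical `RecordingChannel` in place of an execnet channel; the body follows the quoted source, except that the encoded `stdin` is assigned back and the Python 2 `AttributeError` fallback is dropped.

```python
import subprocess
import sys


class RecordingChannel:
    """Hypothetical stand-in for an execnet channel: stores every sent item."""

    def __init__(self):
        self.sent = []

    def send(self, item):
        self.sent.append(item)


def remote_check(channel, cmd, **kw):
    stdin = kw.pop("stdin", None)
    process = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE, **kw
    )
    if stdin:
        if not isinstance(stdin, bytes):
            stdin = stdin.encode("utf-8", errors="ignore")
        stdout_stream, stderr_stream = process.communicate(stdin)
    else:
        stdout_stream = process.stdout.read()
        stderr_stream = process.stderr.read()
    stdout = stdout_stream.decode("utf-8").splitlines()
    stderr = stderr_stream.decode("utf-8").splitlines()
    # (stdout lines, stderr lines, exit status): the tuple check() returns
    channel.send((stdout, stderr, process.wait()))


chan = RecordingChannel()
remote_check(chan, [sys.executable, "-c", "print('hello')"])
print(chan.sent[0])  # (['hello'], [], 0)
```

This matches the `(['R10-P01-DN-001.gd.cn'], [], 0)` shape seen in the REPL sessions above; over a real connection the tuple travels back through the channel and is returned by `result.receive(timeout)` in check().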

Thinking back, though, the error was raised while the connection itself was being created:

>>> conn = remoto.Connection('R10-P01-DN-002.gd.cn')
# exception output omitted

Let's look at the initialization of BaseConnection again:

# ...

class BaseConnection(object):
    # ...
    
    def __init__(self, hostname, logger=None, sudo=False, threads=1, eager=True,
                 detect_sudo=False, interpreter=None, ssh_options=None):
        self.sudo = sudo
        self.hostname = hostname
        self.ssh_options = ssh_options
        self.logger = logger or basic_remote_logger()
        self.remote_module = None
        self.channel = None
        self.global_timeout = None  # wait for ever

        self.interpreter = interpreter or 'python%s' % sys.version_info[0]

        if eager:
            try:
                if detect_sudo:
                    self.sudo = self._detect_sudo()
                # the gateway is created here
                self.gateway = self._make_gateway(hostname)
            except OSError:
                self.logger.error(
                    "Can't communicate with remote host, possibly because "
                    "%s is not installed there" % self.interpreter
                )
                raise

    def _make_gateway(self, hostname):
        gateway = execnet.makegateway(
            self._make_connection_string(hostname)
        )
        # this is where the error is raised
        gateway.reconfigure(py2str_as_py3str=False, py3str_as_py2str=False)
        return gateway
    
    # ...
    
    def _make_connection_string(self, hostname, _needs_ssh=None, use_sudo=None):
        _needs_ssh = _needs_ssh or needs_ssh
        interpreter = self.interpreter
        if use_sudo is not None:
            if use_sudo:
                interpreter = 'sudo ' + interpreter
        elif self.sudo:
            interpreter = 'sudo ' + interpreter
        if _needs_ssh(hostname):
            if self.ssh_options:
                return 'ssh=%s %s//python=%s' % (
                    self.ssh_options, hostname, interpreter
                )
            else:
                return 'ssh=%s//python=%s' % (hostname, interpreter)
        return 'popen//python=%s' % interpreter

So during Connection initialization, remoto sets the Python interpreter that execnet will launch. If interpreter is not passed in, self.interpreter is assigned like this:

self.interpreter = interpreter or 'python%s' % sys.version_info[0]

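This is exactly the behavior we saw earlier: by default remoto asks for the same major Python version as the local interpreter (python3 in our case). So all we need to do is specify a Python 2 interpreter when connecting to the remote host: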

3. Locating and fixing the problem

[store@R10-P01-DN-001 redis-agent]$ python3
Python 3.6.10 (default, Jun 19 2020, 10:51:42) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import remoto
>>> conn = remoto.Connection('R10-P01-DN-002.gd.cn', interpreter='/bin/python')
>>> 
>>> from remoto.process import check
>>> check(conn, ['hostname'])
INFO:R10-P01-DN-001.gd.cn:Running command: hostname
(['R10-P01-DN-002.gd.cn'], [], 0)
>>> 

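As you can see, the earlier error no longer occurs. Note, however, that once the interpreter is set this way, any code executed on the remote host must be able to run under Python 2!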
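Hardcoding `/bin/python` works for this cluster, but with a mix of hosts it can help to find out which interpreter exists before building the connection. The helper below is hypothetical and not part of remoto: it runs `ssh <host> command -v <name>` for each candidate through an injectable `run` function (so it can be tested without a network) and returns the first name that resolves. It assumes passwordless SSH, as in the environment described above; the result would then be passed as `interpreter=` to `remoto.Connection`.

```python
import subprocess


def pick_remote_interpreter(host, candidates=("python3", "python2", "python"), run=subprocess.run):
    """Hypothetical helper: return the first candidate that `command -v`
    finds on host, or None if none is found (or ssh itself fails)."""
    for name in candidates:
        # BatchMode=yes makes ssh fail instead of prompting for a password
        result = run(
            ["ssh", "-o", "BatchMode=yes", host, "command -v %s" % name],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if result.returncode == 0:
            return name
    return None


# Usage against the hosts in this post (requires SSH access, so shown commented out):
# interpreter = pick_remote_interpreter('R10-P01-DN-002.gd.cn')
# conn = remoto.Connection('R10-P01-DN-002.gd.cn', interpreter=interpreter)
```

Preferring python3 first keeps remote code on Python 3 wherever it is available, so the Python 2 restriction above only applies to hosts that really lack it.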

4. Summary

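The two hours spent reading the source were well worth it: I now understand the mechanism behind remoto's two key functions, found the cause of the earlier failure along the way, and can happily use remoto again!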

Original article: https://blog.csdn.net/qq_40085317/article/details/107666454