欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Python如何实现Paramiko的二次封装

程序员文章站 2022-11-17 12:54:58
paramiko是一个用于执行ssh命令的python第三方库,使用该库可实现自动化运维的所有任务,如下是一些常用代码的封装方式,多数代码为半成品,只是敲代码时的备份副本防止丢失,仅供参考。目前本人巡...

paramiko是一个用于执行ssh命令的python第三方库,使用该库可实现自动化运维的所有任务,如下是一些常用代码的封装方式,多数代码为半成品,只是敲代码时的备份副本防止丢失,仅供参考。

目前本人巡检百台设备完全无压力,如果要巡检过千台则需要多线程的支持,过万台则需要加入智能判断等。

实现命令执行: 直接使用过程化封装,执行cmd命令.

import paramiko

ssh = paramiko.sshclient()
ssh.set_missing_host_key_policy(paramiko.autoaddpolicy())

def batchcmd(address,username,password,port,command):
  try:
    ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2)
    stdin , stdout , stderr = ssh.exec_command(command)
    result = stdout.read()
    if len(result) != 0:
      result = str(result).replace("\\n", "\n")
      result = result.replace("b'", "").replace("'", "")
      return result
    else:
      return none
  except exception:
    return none

 实现磁盘巡检: 获取磁盘空间并返回字典格式

def getalldiskspace(address,username,password,port):
  ref_dict = {}
  cmd_dict = {"linux\n" : "df | grep -v 'filesystem' | awk '{print $5 \":\" $6}'",
        "aix\n" : "df | grep -v 'filesystem' | awk '{print $4 \":\" $7}'"
        }
  # 首先检测系统版本
  os_version = batchcmd(address,username,password,port,"uname")
  for version,run_cmd in cmd_dict.items():
    if(version == os_version):
      # 根据不同版本选择不同的命令
      os_ref = batchcmd(address,username,password,port,run_cmd)
      ref_list= os_ref.split("\n")
      # 循环将其转换为字典
      for each in ref_list:
        # 判断最后是否为空,过滤最后一项
        if each != "":
          ref_dict[str(each.split(":")[1])] = str(each.split(":")[0])
  return ref_dict

# 磁盘巡检总函数
def diskmain():
  with open("db.json", "r", encoding="utf-8") as read_fp:
    load_json = read_fp.read()
    js = json.loads(load_json)
    base = js.get("base")
    count = len(base)

    for each in range(0,count):
      print("\033[37m-\033[0m" * 80)
      print("\033[35m 检测地址: {0:10} \t 用户名: {1:10} \t 密码: {2:10} \t 端口: {3:4}\033[0m".
        format(base[each][1],base[each][2],base[each][3],base[each][4]))
      print("\033[37m-\033[0m" * 80)

      ref = getalldiskspace(base[each][1],base[each][2],base[each][3],base[each][4])
      for k,v in ref.items():
        # 判断是否存在空盘
        if( v.split("%")[0] != "-"):
          # 将占用百分比转换为整数
          space_ret = int(v.split("%")[0])
          if space_ret >= 70:
            print("\033[31m 磁盘分区: {0:30} \t 磁盘占用: {1:5} \033[0m".format(k,v))
            continue
          if space_ret >= 50:
            print("\033[33m 磁盘分区: {0:30} \t 磁盘占用: {1:5} \033[0m".format(k, v))
            continue
          else:
            print("\033[34m 磁盘分区: {0:30} \t 磁盘占用: {1:5} \033[0m".format(k, v))
            continue
      print()

# 组内传递用户名密码时调用此方法
def groupdiskmain(address,username,password,port):
  ref = getalldiskspace(address,username,password,port)
  for k, v in ref.items():
    if (v.split("%")[0] != "-"):
      space_ret = int(v.split("%")[0])
      if space_ret >= 70:
        print("磁盘分区: {0:30} \t 磁盘占用: {1:5} -> [警告]".format(k, v))
        continue
      if space_ret >= 50:
        print("磁盘分区: {0:30} \t 磁盘占用: {1:5} -> [警惕]".format(k, v))
        continue
      else:
        print("磁盘分区: {0:30} \t 磁盘占用: {1:5} -> [正常]".format(k, v))
        continue
  print()

获取系统内存利用率: 获取系统内存利用率

def getallmemspace(address,username,password,port):
  cmd_dict = {"linux\n" : "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 \":\" $2}'",
        "aix\n" : "df | grep -v 'filesystem' | awk '{print $4 \":\" $7}'"
        }
  # 首先检测系统版本
  os_version = batchcmd(address,username,password,port,"uname")
  for version,run_cmd in cmd_dict.items():
    if(version == os_version):
      # 根据不同版本选择不同的命令
      os_ref = batchcmd(address,username,password,port,run_cmd)

      # 首先现将kb转化为mb
      mem_total = math.ceil( int(os_ref.split(":")[0].replace("\n","")) / 1024)
      mem_free = math.ceil(int(os_ref.split(":")[1].replace("\n","")) / 1024)
      mem_used = str( int(mem_total) - int(mem_free))

      # 计算占用空间百分比
      percentage = 100 - int(mem_free / int(mem_total / 100))

      print("内存总计空间: {}".format(str(mem_total) + " mb"))
      print("内存剩余空间: {}".format(str(mem_free) + " mb"))
      print("内存已用空间: {}".format(str(mem_used) + " mb"))
      print("计算百分比: {}".format(str(percentage) + " %"))

获取系统进程信息: 获取系统进程信息,并返回字典格式

def getallprocessspace(address,username,password,port):
  ref_dict = {}
  cmd_dict = {"linux\n" : "ps aux | grep -v 'user' | awk '{print $2 \":\" $11}' | uniq",
        "aix\n" : "ps aux | grep -v 'user' | awk '{print $2 \":\" $12}' | uniq"
        }
  os_version = batchcmd(address,username,password,port,"uname")
  for version,run_cmd in cmd_dict.items():
    if(version == os_version):
      os_ref = batchcmd(address, username, password, port, run_cmd)
      ref_list = os_ref.split("\n")
      for each in ref_list:
        if each != "":
          ref_dict[str(each.split(":")[0])] = str(each.split(":")[1])
  return ref_dict

# 巡检进程是否存在
def processmain():
  with open("db.json", "r", encoding="utf-8") as read_fp:
    load_json = read_fp.read()
    js = json.loads(load_json)

    process = js.get("process")
    process_count = len(process)
    for x in range(0,process_count):
      # 根据process中的值查询base中的账号密码
      base = js.get("base")
      if( list(process[x].keys())[0] == base[x][0] ):
        # 拿到账号密码之后再提取出他们的进程id于进程名
        print("\033[37m-\033[0m" * 80)
        print("\033[35m 检测地址: {0:10} \t 用户名: {1:10} \t 密码: {2:10} \t 端口: {3:4}\033[0m".
           format(base[x][1], base[x][2], base[x][3], base[x][4]))
        print("\033[37m-\033[0m" * 80)

        ref_dic = getallprocessspace(base[x][1],base[x][2],base[x][3],base[x][4])
        # ref_val = 全部进程列表 proc_val = 需要检测的进程列表
        ref_val = list(ref_dic.values())
        proc_val = list(process[x].values())[0]
        # 循环比较是否在列表中
        for each in proc_val:
          flag = each in ref_val
          if(flag == true):
            print("\033[34m 进程: {0:50} 状态: √ \033[0m".format(each))
          else:
            print("\033[31m 进程: {0:50} 状态: × \033[0m".format(each))

实现剧本运行功能: 针对特定一台主机运行剧本功能,随便写的一个版本,仅供参考

def runrule(address,username,password,port,playbook):
  os_version = batchcmd(address,username,password,port,"uname")
  if(os_version == list(playbook.keys())[0]):
    play = list(playbook.values())[0]
    print()
    print("\033[37m-\033[0m" * 130)
    print("\033[35m 系统类型: {0:4} \t 地址: {1:10} \t 用户名: {2:10} \t 密码: {3:15} \t 端口: {4:4}\033[0m"
       .format(os_version.replace("\n",""),address,username,password,port))
    print("\033[37m-\033[0m" * 130)

    for each in range(0,len(play)):
      runcmd = play[each] + " > /dev/null 2>&1 && echo $?"
      print("\033[30m [>] 派发命令: {0:100} \t 状态: {1:5} \033[0m".format(
        runcmd.replace(" > /dev/null 2>&1 && echo $?", ""),"正在派发"))

      os_ref = batchcmd(address, username, password, port, runcmd)
      if(os_ref == "0\n"):
        print("\033[34m [√] 运行命令: {0:100} \t 状态: {1:5} \033[0m".format(
          runcmd.replace(" > /dev/null 2>&1 && echo $?",""),"派发完成"))
      else:
        print("\033[31m [×] 运行命令: {0:100} \t 状态: {1:5} \033[0m".format(
          runcmd.replace(" > /dev/null 2>&1 && echo $?",""),"派发失败"))
        # 既然失败了,就把剩下的也打出来吧,按照失败处理
        for x in range(each+1,len(play)):
          print("\033[31m [×] 运行命令: {0:100} \t 状态: {1:5} \033[0m".format(
            play[x].replace(" > /dev/null 2>&1 && echo $?", ""), "终止执行"))
        break
  else:
    return 0

# 批量: 传入主机组不同主机执行不同剧本
def runplaybook(hostlist,playbook):
  count = len(hostlist)
  error = []
  success = []
  for each in range(0,count):
    ref = runrule(hostlist[each][0],hostlist[each][1],hostlist[each][2],hostlist[each][3],playbook)
    if ref == 0:
      error.append(hostlist[each][0])
    else:
      success.append(hostlist[each][0])
  print("\n\n")
  print("-" * 130)
  print("执行清单")
  print("-" * 130)
  for each in success:
    print("成功主机: {}".format(each))
  for each in error:
    print("失败主机: {}".format(each))

# 运行测试
def playbookrun():
  playbook = \
    {
      "linux\n":
        [
          "ifconfig",
          "vmstat",
          "ls",
          "netstat -an",
          "ifconfis",
          "cat /etc/passwd | grep 'root' | awk '{print $2}'"
        ]
    }

  addr_list = \
    [
      ["192.168.1.127", "root", "1233", "22"],
      ["192.168.1.126", "root", "1203", "22"]
    ]

  # 指定addr_list这几台机器执行playbook剧本
  runplaybook(addr_list,playbook)

过程化实现文件上传下载: 文件传输功能 put上传 get下载

def batchsftp(address,username,password,port,soruce,target,flag):
  transport = paramiko.transport((address, int(port)))
  transport.connect(username=username, password=password)
  sftp = paramiko.sftpclient.from_transport(transport)
  if flag == "put":
    try:
      ret = sftp.put(soruce, target)
      if ret !="":
        transport.close()
        return 1
      else:
        transport.close()
        return 0
      transport.close()
    except exception:
      transport.close()
      return 0
  elif flag == "get":
    try:
      target = str(address + "_" + target)
      os.chdir("./recv_file")
      ret = sftp.get(soruce, target)
      if ret != "":
        transport.close()
        return 1
      else:
        transport.close()
        return 0
      transport.close()
    except exception:
      transport.close()
      return 0

# 批量将本地文件 source 上传到目标 target 中
def putremotefile(source,target):
  with open("db.json", "r", encoding="utf-8") as fp:
    load_json = fp.read()
    js = json.loads(load_json)
    base = js.get("base")

    print("-" * 130)
    print("接收主机 \t\t 登录用户 \t 登录密码 \t 登录端口 \t 本地文件 \t\t 传输到 \t\t\t 传输状态")
    print("-" * 130)

    for each in range(0,len(base)):
      # 先判断主机是否可通信
      ref = batchcmd(base[each][1], base[each][2], base[each][3], base[each][4],"uname")
      if ref == none:
        print("\033[31m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 未连通\033[0m".format(
          base[each][1],base[each][2],base[each][3],base[each][4],source,target))
        continue

      ref = batchsftp(base[each][1],base[each][2],base[each][3],base[each][4],source,target,"put")
      if(ref == 1):
        print("\033[34m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 传输成功\033[0m".format(
          base[each][1],base[each][2],base[each][3],base[each][4],source,target))
      else:
        print("\033[31m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 传输失败\033[0m".format(
          base[each][1], base[each][2], base[each][3], base[each][4], source, target))

# 批量将目标文件拉取到本地特定目录(存在缺陷)
def getremotefile(source,target):
  with open("db.json", "r", encoding="utf-8") as fp:
    load_json = fp.read()
    js = json.loads(load_json)
    base = js.get("base")

    print("-" * 130)
    print("发送主机 \t\t 登录用户 \t 登录密码 \t 登录端口 \t\t 远程文件 \t\t 拉取到 \t\t\t 传输状态")
    print("-" * 130)

    for each in range(0,len(base)):
      ref = batchcmd(base[each][1], base[each][2], base[each][3], base[each][4], "uname")
      if ref == none:
        print("\033[31m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 未连通\033[0m".format(
          base[each][1], base[each][2], base[each][3], base[each][4], source, target))
        continue

      ref = batchsftp(base[each][1],base[each][2],base[each][3],base[each][4],source,target,"get")
      if(ref == 1):
        print("\033[34m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 传输成功\033[0m".format(
          base[each][1],base[each][2],base[each][3],base[each][4],source,target))
      else:
        print("\033[31m{0:15} \t {1:6} \t {2:10} \t {3:3} \t {4:10} \t {5:10} \t 传输失败\033[0m".format(
          base[each][1], base[each][2], base[each][3], base[each][4], source, target))

另一种命令执行方法:

import paramiko

ssh = paramiko.sshclient()
ssh.set_missing_host_key_policy(paramiko.autoaddpolicy())
def batchcmd(address,username,password,port,command):
  try:
    ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2)
    stdin , stdout , stderr = ssh.exec_command(command)
    result = stdout.read()
    if len(result) != 0:
      return result
    else:
      return -1
  except exception:
    return -1

# 通过获取主机ping状态
def getping():
  fp = open("unix_base.db", "r", encoding="utf-8")
  count = len(open("unix_base.db", "r", encoding="utf-8").readlines())
  print("-" * 100)
  print("{0:20} \t {1:10} \t {2:13} \t {3:5} \t {4:9} \t {5:40}".format("ip地址","机器系统","设备sn","机房位置","存活状态","主机作用"))
  print("-" * 100)
  for each in range(count):
    ref = eval(fp.readline())
    ret = batchcmd(ref[0],ref[5],ref[6],22,"pwd | echo $?")
    if(int(ret)==0):
      print("{0:20} \t {1:10} \t {2:11} \t {3:5} \t {4:9} \t {5:40}".
         format(ref[0],ref[1],ref[2],ref[3],"正常",ref[4]))
    else:
      print("{0:20} \t {1:10} \t {2:13} \t {3:5} \t {4:9} \t {5:40}".
         format(ref[0],ref[1],ref[2],ref[3],"异常",ref[4]))
  fp.close()

# ps aux | grep "usbcfgdev" | grep -v "grep" | awk {'print $2'}
def getprocessstatus():
  fp = open("unix_process.db", "r", encoding="utf-8")
  count = len(open("unix_process.db", "r", encoding="utf-8").readlines())
  for each in range(count):
    proc = eval(fp.readline())
    proc_len = len(proc)
    print("-" * 70)
    print("---> 巡检地址: {0:10} \t 登录用户: {1:7} \t 登录密码: {2:10}".format(proc[0],proc[1],proc[2]))
    print("-" * 70)
    for process in range(3, proc_len):
      command = "ps aux | grep \'{}\' | grep -v \'grep\' | awk '{}' | head -1".format(proc[process],"{print $2}")
      try:
        ref = batchcmd(proc[0],proc[1],proc[2],22,command)
        if(int(ref)!=-1):
          print("进程: {0:18} \t pid: {1:10} \t 状态: {2}".format(proc[process], int(ref),"√"))
        else:
          print("进程: {0:18} \t pid:{1:10} \t 状态: {2}".format(proc[process], 0,"×"))
      except exception:
        print("进程: {0:18} \t pid:{1:10} \t 状态: {2}".format(proc[process], 0,"×"))
    print()
  fp.close()


def getdiskstatus():
  fp = open("unix_disk.db", "r", encoding="utf-8")
  count = len(open("unix_disk.db", "r", encoding="utf-8").readlines())
  for each in range(count):
    proc = eval(fp.readline())
    proc_len = len(proc)
    print("-" * 100)
    print("---> 巡检地址: {0:10} \t 登录系统: {1:7} \t 登录账号: {2:10} 登录密码: {3:10}".
       format(proc[0],proc[1],proc[2],proc[3]))
    print("-" * 100)
    try:
      ref = batchcmd(proc[0], proc[2], proc[3], 22, "df | grep -v 'filesystem'")
      st = str(ref).replace("\\n", "\n")
      print(st.replace("b'", "").replace("'", ""))
    except exception:
      pass
    print()
  fp.close()

# 运行命令
def runcmd(command,system):
  fp = open("unix_disk.db", "r", encoding="utf-8")
  count = len(open("unix_disk.db", "r", encoding="utf-8").readlines())
  for each in range(count):
    proc = eval(fp.readline())
    proc_len = len(proc)

    if proc[1] == system:
      print("-" * 100)
      print("---> 巡检地址: {0:10} \t 登录系统: {1:7} \t 登录账号: {2:10} 登录密码: {3:10}".
         format(proc[0],proc[1],proc[2],proc[3]))
      print("-" * 100)
      try:
        ref = batchcmd(proc[0], proc[2], proc[3], 22, command)
        st = str(ref).replace("\\n", "\n")
        print(st.replace("b'", "").replace("'", ""))
      except exception:
        pass
  fp.close()

面向对象的封装方法: 使用面向对象封装,可极大的提高复用性。

import paramiko

class myssh:
  def __init__(self,address,username,password,default_port = 22):
    self.address = address
    self.default_port = default_port
    self.username = username
    self.password = password

    self.obj = paramiko.sshclient()
    self.obj.set_missing_host_key_policy(paramiko.autoaddpolicy())
    self.obj.connect(self.address,self.default_port,self.username,self.password)
    self.objsftp = self.obj.open_sftp()

  def batchcmd(self,command):
    stdin , stdout , stderr = self.obj.exec_command(command)
    result = stdout.read()
    if len(result) != 0:
      result = str(result).replace("\\n", "\n")
      result = result.replace("b'", "").replace("'", "")
      return result
    else:
      return none

  def getremotefile(self,remotepath,localpath):
    self.objsftp.get(remotepath,localpath)

  def putlocalfile(self,localpath,remotepath):
    self.objsftp.put(localpath,remotepath)


  def getfilesize(self,file_path):
    ref = self.batchcmd("du -s " + file_path + " | awk '{print $1}'")
    return ref

  def closessh(self):
    self.objsftp.close()
    self.obj.close()

if __name__ == '__main__':
  ssh = myssh('192.168.191.3','root','1233',22)

  ref = ssh.batchcmd("ifconfig")
  print(ref)

  sz = ssh.getfilesize("/etc/passwd")
  print(sz)
  ssh.closessh()
第二次封装完善。

import paramiko,os,json,re

class myssh:
  def __init__(self,address,username,password,default_port = 22):
    self.address = address
    self.default_port = default_port
    self.username = username
    self.password = password
    try:
      self.obj = paramiko.sshclient()
      self.obj.set_missing_host_key_policy(paramiko.autoaddpolicy())

      self.obj.connect(self.address,self.default_port,self.username,self.password,timeout=3,allow_agent=false,look_for_keys=false)
      self.objsftp = self.obj.open_sftp()
    except exception:
      pass

  def batchcmd(self,command):
    try:
      stdin , stdout , stderr = self.obj.exec_command(command,timeout=3)
      result = stdout.read()
      if len(result) != 0:
        result = str(result).replace("\\n", "\n")
        result = result.replace("b'", "").replace("'", "")
        return result
      else:
        return none
    except exception:
      return none

  def getremotefile(self,remote_path,local_path):
    try:
      self.objsftp.get(remote_path,local_path)
      return true
    except exception:
      return false

  def putlocalfile(self,localpath,remotepath):
    try:
      self.objsftp.put(localpath,remotepath)
      return true
    except exception:
      return false

  def closessh(self):
    self.objsftp.close()
    self.obj.close()

  # 获取文件大小
  def getfilesize(self,file_path):
    ref = self.batchcmd("du -s " + file_path + " | awk '{print $1}'")
    return ref.replace("\n","")
  # 判断文件是否存在
  def isfile(self,file_path):
    return self.batchcmd("[ -e {} ] && echo 'true' || echo 'false'".format(file_path))

通过eval函数解析执行: 自定义语法规则与函数,通过eval函数实现解析执行. 没写完,仅供参考。

import json,os,sys,math
import argparse,time,re
import paramiko

ssh = paramiko.sshclient()
ssh.set_missing_host_key_policy(paramiko.autoaddpolicy())

def batchcmd(address,username,password,port,command):
  try:
    ssh.connect(hostname=address,username=username,password=password,port=port,timeout=2)
    stdin , stdout , stderr = ssh.exec_command(command)
    result = stdout.read()
    if len(result) != 0:
      result = str(result).replace("\\n", "\n")
      result = result.replace("b'", "").replace("'", "")
      return result
    else:
      return none
  except exception:
    return none


# ------------------------------------------------------------------------
# 内置解析方法

def getdisk(x):
  return str(x)

def getcpuload():
  return str(10)

# 句式解析器,解析句子并执行
def judge(string):
  # 如果匹配到if则执行判断条件解析
  if re.findall(r'if{ (.*?) }', string, re.m) != []:
    # 则继续提取出他的表达式
    ptr = re.compile(r'if[{] (.*?) [}]',re.s)
    subject = re.findall(ptr,string)[0]
    subject_list = subject.split(" ")
    # 拼接语句并执行
    sentence = eval(subject_list[0]) + subject_list[1] + subject_list[2]
    # 组合后执行,返回结果
    if eval(sentence):
      return "if"
    else:
      return false

  # 如果匹配到put则执行上传解析
  elif re.findall(r'put{ (.*?) }', string, re.m) != []:
    print("put")
  return false

# 获取特定目录下所有的剧本
def getallrule():
  rootdir = os.getcwd() + "\\rule\\"
  all_files = [f for f in os.listdir(rootdir)]
  print("-" * 90)
  print("{0:15} \t {1:10} \t {2:10} \t {3:5} \t {4:5}".format("剧本名称","应用平台","应用端口","执行主机数","命令条数"))
  print("-" * 90)
  for switch in all_files:
    # 首先判断文件结尾是否为json
    if( switch.endswith(".json") == true):
      all_switch_dir = rootdir + switch
      try:
        # 判断文件内部是否符合json规范
        with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:
          # 判断是否存在指定字段来识别规范
          load = json.loads(read_file.read())
          if load.get("framework") != none and load.get("task_sequence") != none:
            run_addr_count = len(load.get("address_list"))
            command_count = len(load.get("task_sequence"))
            print("{0:15} \t {1:10} {2:10} \t\t {3:5} \t\t {4:5}".
               format(switch,load.get("framework"),load.get("default_port"),run_addr_count,command_count))
      except valueerror:
        pass

# 指定一个剧本并运行
def runplaybook(rule_name):
  rootdir = os.getcwd() + "\\rule\\"
  all_files = [f for f in os.listdir(rootdir)]
  for switch in all_files:
    if( switch.endswith(".json") == true):
      all_switch_dir = rootdir + switch
      # 寻找到需要加载的剧本地址
      if( switch == rule_name):
        with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:
          data = json.loads(read_file.read())
          address_list = data.get("address_list")
          # 循环每个主机任务
          for each in address_list:
            # 得到剧本内容
            task_sequence = data.get("task_sequence")
            default_port = data.get("default_port")
            print("-" * 90)
            print("地址: {0:15} 用户名: {1:10} 密码: {2:10}".format(each[0],each[1],each[2]))
            print("-" * 90)
            for task in task_sequence:
              flag = judge(task[0])
              if flag == "if":
                ref = batchcmd(each[0],each[1],each[2],default_port,task[1])
                print(ref)
              elif flag == false:
                ref = batchcmd(each[0],each[1],each[2],default_port,task[0])
                print(ref)

if __name__ == "__main__":
  runplaybook("get_log.json")

定义剧本规范如下。

{
 "framework": "centos",
 "version": "7.0",
 "address_list":
 [
  ["192.168.191.3","root","1233"]
 ],
 "default_port": "22",
 "task_sequence":
 [
  ["ifconfig"],
  ["if{ getlastcmdflag() == true }","uname"]
 ]
}

词法分析: 词法分析解析剧本内容。

# 获取特定目录下所有的剧本
def getallrule():
  rootdir = os.getcwd() + "\\rule\\"
  all_files = [f for f in os.listdir(rootdir)]
  print("-" * 90)
  print("{0:15} \t {1:10} \t {2:10} \t {3:5} \t {4:5}".format("剧本名称","应用平台","应用端口","执行主机数","命令条数"))
  print("-" * 90)
  for switch in all_files:
    # 首先判断文件结尾是否为json
    if( switch.endswith(".json") == true):
      all_switch_dir = rootdir + switch
      try:
        # 判断文件内部是否符合json规范
        with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:
          # 判断是否存在指定字段来识别规范
          load = json.loads(read_file.read())
          if load.get("framework") != none and load.get("task_sequence") != none:
            run_addr_count = len(load.get("address_list"))
            command_count = len(load.get("task_sequence"))
            print("{0:15} \t {1:10} {2:10} \t\t {3:5} \t\t {4:5}".
               format(switch,load.get("framework"),load.get("default_port"),run_addr_count,command_count))
      except valueerror:
        pass

# 句式解析器,解析句子并执行
def judge(string):
  # 如果匹配到if则执行判断条件解析
  if re.findall(r'if{ (.*?) }', string, re.m) != []:
    # 则继续提取出他的表达式
    ptr = re.compile(r'if[{] (.*?) [}]',re.s)
    subject = re.findall(ptr,string)[0]
    subject_list = subject.split(" ")

    # 公开接口,执行命令
    ssh = myssh("192.168.191.3","root","1233","22")

    # 组合命令并执行
    sentence = str(eval(subject_list[0]) + subject_list[1] + subject_list[2])
    if eval(sentence):
      return "if",ssh
    else:
      return false

  # 如果匹配到put则执行上传解析
  elif re.findall(r'put{ (.*?) }', string, re.m) != []:
    print("put")
  return false

# 指定一个剧本并运行
def runplaybook(rule_name):
  rootdir = os.getcwd() + "\\rule\\"
  all_files = [f for f in os.listdir(rootdir)]
  for switch in all_files:
    if( switch.endswith(".json") == true):
      all_switch_dir = rootdir + switch
      # 寻找到需要加载的剧本地址
      if( switch == rule_name):
        with open(all_switch_dir , "r" ,encoding="utf-8") as read_file:
          data = json.loads(read_file.read())
          address_list = data.get("address_list")
          # 循环每个主机任务
          for each in address_list:
            # 得到剧本内容
            task_sequence = data.get("task_sequence")
            default_port = data.get("default_port")
            print("-" * 90)
            print("地址: {0:15} 用户名: {1:10} 密码: {2:10}".format(each[0],each[1],each[2]))
            print("-" * 90)
            for task in task_sequence:

              flag,obj = judge(task[0])

              if flag == "if":
                ret = obj.batchcmd(task[1])
                print(ret)
if __name__ == '__main__':
  ret = judge("if{ ssh.getfilesize('/etc/passwd') >= 4 }")
  print(ret)

myssh类最终封装: 通过面向对象对其进行封装,实现了查询cpu,负载,内存利用率,磁盘容量,等通用数据的获取。

import paramiko, math,json

class myssh:
  def __init__(self, address, username, password, default_port):
    self.address = address
    self.default_port = default_port
    self.username = username
    self.password = password
  # 初始化,远程模块
  def init(self):
    try:
      self.ssh_obj = paramiko.sshclient()
      self.ssh_obj.set_missing_host_key_policy(paramiko.autoaddpolicy())
      self.ssh_obj.connect(self.address, self.default_port, self.username, self.password, timeout=3,
                 allow_agent=false, look_for_keys=false)
      self.sftp_obj = self.ssh_obj.open_sftp()
    except exception:
      return false
  # 执行非交互命令
  def batchcmd(self, command):
    try:
      stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3)
      result = stdout.read()
      if len(result) != 0:
        result = str(result).replace("\\n", "\n")
        result = result.replace("b'", "").replace("'", "")
        return result
      else:
        return none
    except exception:
      return none
  # 将远程文件下载到本地
  def getremotefile(self, remote_path, local_path):
    try:
      self.sftp_obj.get(remote_path, local_path)
      return true
    except exception:
      return false
  # 将本地文件上传到远程
  def putlocalfile(self, localpath, remotepath):
    try:
      self.sftp_obj.put(localpath, remotepath)
      return true
    except exception:
      return false
  # 关闭接口
  def closessh(self):
    try:
      self.sftp_obj.close()
      self.ssh_obj.close()
    except exception:
      pass
  # 获取文件大小
  def getfilesize(self, file_path):
    ref = self.batchcmd("du -s " + file_path + " | awk '{print $1}'")
    return ref.replace("\n", "")
  # 判断文件是否存在
  def isfile(self, file_path):
    return self.batchcmd("[ -e {} ] && echo 'true' || echo 'false'".format(file_path))
  # 获取系统型号
  def getsystemversion(self):
    return self.batchcmd("uname")
  # 检测目标主机存活状态
  def getping(self):
    try:
      if self.getsystemversion() != none:
        return true
      else:
        return false
    except exception:
      return false
  # 获取文件列表,并得到大小
  def getfilelist(self, path):
    try:
      ref_list = []
      self.sftp_obj.chdir(path)
      file_list = self.sftp_obj.listdir("./")
      for sub_path in file_list:
        dict = {}
        file_size = self.getfilesize(path + sub_path)
        dict[path + sub_path] = file_size
        ref_list.append(dict)
      return ref_list
    except exception:
      return false
  # 将远程文件全部打包后拉取到本地
  def gettarpackageall(self, path):
    try:
      file_list = self.sftp_obj.listdir(path)
      self.sftp_obj.chdir(path)
      for packagename in file_list:
        self.ssh_obj.exec_command("tar -czf /tmp/{0}.tar.gz {0}".format(packagename))
        self.sftp_obj.get("/tmp/{}.tar.gz".format(packagename), "./file/{}.tar.gz".format(packagename))
        self.sftp_obj.remove("/tmp/{}.tar.gz".format(packagename))
        return true
    except exception:
      return true
  # 获取磁盘空间并返回字典
  def getalldiskspace(self):
    ref_dict = {}
    cmd_dict = {"linux\n": "df | grep -v 'filesystem' | awk '{print $5 \":\" $6}'",
          "aix\n": "df | grep -v 'filesystem' | awk '{print $4 \":\" $7}'"
          }
    try:
      os_version = self.getsystemversion()
      for version, run_cmd in cmd_dict.items():
        if (version == os_version):
          # 根据不同版本选择不同的命令
          os_ref = self.batchcmd(run_cmd)
          ref_list = os_ref.split("\n")
          # 循环将其转换为字典
          for each in ref_list:
            # 判断最后是否为空,过滤最后一项
            if each != "":
              ref_dict[str(each.split(":")[1])] = str(each.split(":")[0])
      return ref_dict
    except exception:
      return false
  # 获取系统内存利用率
  def getallmemspace(self):
    cmd_dict = {"linux\n": "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 \":\" $2}'",
          "aix\n": "svmon -g | grep -v 'virtual' | head -n 1 | awk '{print $2 \":\" $4}'"
          }
    try:
      os_version = self.getsystemversion()
      for version, run_cmd in cmd_dict.items():
        if (version == os_version):
          # 根据不同版本选择不同的命令
          os_ref = self.batchcmd(run_cmd)
          # 首先现将kb转化为mb
          mem_total = math.ceil(int(os_ref.split(":")[0].replace("\n", "")) / 1024)
          mem_free = math.ceil(int(os_ref.split(":")[1].replace("\n", "")) / 1024)

          # 计算占用空间百分比
          percentage = 100 - int(mem_free / int(mem_total / 100))
          # 拼接字典数据
          return {"total": str(mem_total), "free": str(mem_free), "percentage": str(percentage)}
    except exception:
      return false
  # 获取系统进程信息,并返回字典格式
  def getallprocessspace(self):
    ref_dict = {}
    cmd_dict = {"linux\n": "ps aux | grep -v 'user' | awk '{print $2 \":\" $11}' | uniq",
          "aix\n": "ps aux | grep -v 'user' | awk '{print $2 \":\" $12}' | uniq"
          }
    try:
      os_version = self.getsystemversion()
      for version, run_cmd in cmd_dict.items():
        if (version == os_version):
          os_ref = self.batchcmd(run_cmd)
          ref_list = os_ref.split("\n")
          for each in ref_list:
            if each != "":
              ref_dict[str(each.split(":")[0])] = str(each.split(":")[1])
      return ref_dict
    except exception:
      return false
  # 获取cpu利用率
  def getcpupercentage(self):
    ref_dict = {}
    cmd_dict = {"linux\n": "vmstat | tail -n 1 | awk '{print $13 \":\" $14 \":\" $15}'",
          "aix\n": "vmstat | tail -n 1 | awk '{print $14 \":\" $15 \":\" $16}'"
          }
    try:
      os_version = self.getsystemversion()
      for version, run_cmd in cmd_dict.items():
        if (version == os_version):
          os_ref = self.batchcmd(run_cmd)
          ref_list = os_ref.split("\n")
          for each in ref_list:
            if each != "":
              each = each.split(":")
              ref_dict = {"us": each[0],"sys":each[1],"idea":each[2]}
      return ref_dict
    except exception:
      return false
  # 获取机器的负载情况
  def getloadavg(self):
    ref_dict = {}
    cmd_dict = {"linux\n": "uptime | awk '{print $10 \":\" $11 \":\" $12}'",
          "aix\n": "uptime | awk '{print $10 \":\" $11 \":\" $12}'"
          }
    try:
      os_version = self.getsystemversion()
      for version, run_cmd in cmd_dict.items():
        if (version == os_version):
          os_ref = self.batchcmd(run_cmd)
          ref_list = os_ref.split("\n")
          for each in ref_list:
            if each != "":
              each = each.replace(",","").split(":")
              ref_dict = {"1avg": each[0],"5avg": each[1],"15avg": each[2]}
              return ref_dict
      return false
    except exception:
      return false
  # 修改当前用户密码
  def setpasswd(self,username,password):
    try:
      os_id = self.batchcmd("id | awk {'print $1'}")
      print(os_id)
      if(os_id == "uid=0(root)\n"):
        self.batchcmd("echo '{}' | passwd --stdin '{}' > /dev/null".format(password,username))
        return true
    except exception:
      return false

# 定义超类,集成基类myssh
class superssh(myssh):
  def __init__(self,address, username, password, default_port):
    super(superssh, self).__init__(address, username, password, default_port)

我们继续为上面的代码加上命令行,让其可以直接使用,这里需要遵循一定的格式规范,我们使用json解析数据,json格式如下.

{
 "aix":
 [
  ["192.168.1.1","root","123123"],
  ["192.168.1.1","root","2019"],
 ],
 "suse":
 [
  ["192.168.1.1","root","123123"],
 ],
 "centos":
  [
  ["192.168.1.1","root","123123"],
  ]
}

接着是主程序代码,如下所示.

# -*- coding: utf-8 -*-
from myssh import myssh
import json,os,sys,argparse

class initjson():
  def __init__(self,db):
    self.db_name = db
  def getplatform(self,plat):
    with open(self.db_name, "r", encoding="utf-8") as read_pointer:
      load_json = json.loads(read_pointer.read())
      for k,v in load_json.items():
        try:
          if k == plat:
            return v
        except exception:
          return none
    return none

if __name__ == "__main__":
  ptr = initjson("database.json")
  parser = argparse.argumentparser()

  parser.add_argument("-g","--group",dest="group",help="指定主机组")
  parser.add_argument("-c","--cmd",dest="cmd",help="指定cmd命令")
  parser.add_argument("--get",dest="get",help="指定获取数据类型<ping>")
  parser.add_argument("--dst", dest="dst_file",help="目标位置")
  parser.add_argument("--src",dest="src_file",help="原文件路径")
  args = parser.parse_args()

  # 批量cmd --group=aix --cmd=ls
  if args.group and args.cmd:
    platform = ptr.getplatform(args.group)
    success,error = [],[]
    for each in platform:
      ssh = myssh(each[0], each[1], each[2], 22)
      if ssh.init() != false:
        print("-" * 140)
        print("主机: {0:15} \t 账号: {1:10} \t 密码: {2:10} \t 命令: {3:30}".
           format(each[0], each[1], each[2], args.cmd))
        print("-" * 140)
        print(ssh.batchcmd(args.cmd))
        ssh.closessh()
        success.append(each[0])
      else:
        error.append(each[0])
        ssh.closessh()
    print("\n\n","-" * 140,
       "\n 执行报告 \n",
       "-" * 140,
       "\n失败主机: {}\n".format(error),
       "-" * 140)

  # 批量获取主机其他数据 --group=centos --get=ping
  if args.group and args.get:
      platform = ptr.getplatform(args.group)
      success, error = [], []
      for each in platform:
        ssh = myssh(each[0], each[1], each[2], 22)
        # 判断是否为ping
        if ssh.init() != false:
          if args.get == "ping":
            ret = ssh.getping()
            if ret == true:
              print("[*] 主机: {} 存活中.".format(each[0]))

          # 收集磁盘数据
          elif args.get == "disk":
            print("-" * 140)
            print("主机: {0:15} \t 账号: {1:10} \t 密码: {2:10}".
               format(each[0], each[1], each[2]))
            print("-" * 140)

            ret = ssh.getalldiskspace()
            for k, v in ret.items():
              if (v.split("%")[0] != "-"):
                space_ret = int(v.split("%")[0])
                if space_ret >= 70:
                  print("磁盘分区: {0:30} \t 磁盘占用: {1:5} -> [警告]".format(k, v))
                  continue
                if space_ret >= 50:
                  print("磁盘分区: {0:30} \t 磁盘占用: {1:5} -> [警惕]".format(k, v))
                  continue
                else:
                  print("磁盘分区: {0:30} \t 磁盘占用: {1:5}".format(k, v))
                  continue
            print()
        else:
          error.append(each[0])
          ssh.closessh()
      print("\n\n", "-" * 140,
         "\n 执行报告 \n",
         "-" * 140,
         "\n失败主机: {}\n".format(error),
         "-" * 140)

  # 实现文件上传过程 --group=centos --src=./a.txt --dst=/tmp/test.txt
  if args.group and args.src_file and args.dst_file:
    platform = ptr.getplatform(args.group)
    success, error = [], []
    for each in platform:
      ssh = myssh(each[0], each[1], each[2], 22)
      if ssh.init() != false:
        ret = ssh.putlocalfile(args.src_file,args.dst_file)
        if ret == true:
          print("主机: {} \t 本地文件: {} \t ---> 传到: {}".format(each[0], args.src_file,args.dst_file))
      else:
        error.append(each[0])
        ssh.closessh()
    print("\n\n", "-" * 140,
       "\n 执行报告 \n",
       "-" * 140,
       "\n失败主机: {}\n".format(error),
       "-" * 140)

简单的使用命令:

远程cmd: python main.py --group=centos --cmd="free -h | grep -v 'total'"

Python如何实现Paramiko的二次封装

判断存活: python main.py --group=centos --get="ping"

Python如何实现Paramiko的二次封装

拉取磁盘:python main.py --group=suse --get="disk"

Python如何实现Paramiko的二次封装

批量上传文件: python main.py --group=suse --src="./aaa" --dst="/tmp/bbb.txt"

Python如何实现Paramiko的二次封装

由于我的设备少,所以没开多线程,担心开多线程对目标造成过大压力,也没啥必要。

番外: 另外我研究了一个主机分组的小工具,加上命令执行代码量才800行,实现了一个分组数据库,在这里记下使用方法。

默认运行进入一个交互式shell环境。

init = 初始化json文件,showhostlist=显示所有主机,showgroup=显示所有组,showallgroup=显示所有主机包括组。

Python如何实现Paramiko的二次封装

添加修改与删除记录,命令如下。

Python如何实现Paramiko的二次封装

添加删除主机组。

Python如何实现Paramiko的二次封装

通过uuid向主机组中添加或删除主机记录。

Python如何实现Paramiko的二次封装

测试主机组连通性。

Python如何实现Paramiko的二次封装

以上就是python如何实现paramiko的二次封装的详细内容,更多关于python实现paramiko的二次封装的资料请关注其它相关文章!