欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

python3检测ossfs可用性+钉钉通知

程序员文章站 2023-11-02 15:40:10
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2019-12-02 15:16 # @Author : Anthony # @Email : ianghont7@163.com # @File : check-ossfs.py # 检 ......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @time    : 2019-12-02 15:16
# @author  : anthony
# @email   : ianghont7@163.com
# @file    : check-ossfs.py

# 检测ossfs进程是否存在
# 检测/xxxx/file是否挂载成功
# ossfs可用性检测

import psutil
import requests
import json
import subprocess
import threading

class dingtalk_base:
    def __init__(self):
        self.__headers = {'content-type': 'application/json;charset=utf-8'}
        self.url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxx'
    def send_msg(self,text):
        json_text = {
            "msgtype": "text",
            "text": {
                "content": text
            },
            "at": {
                "atmobiles": [
                    ""
                ],
                "isatall": false
            }
        }
        return requests.post(self.url, json.dumps(json_text), headers=self.__headers).content
class dingtalk_disaster(dingtalk_base):
    def __init__(self):
        super().__init__()
        # 填写机器人的url
        self.url = "https://oapi.dingtalk.com/robot/send?access_token=xxxxx"

def check_oss_pid():
    pids = psutil.pids()
    for pid in pids:
        p = psutil.process(pid)
        if "ossfs" == p.name().strip():
            return true
    else:
        return ding.send_msg("""ossfs报警通知:
机器:机器名称
消息内容:ossfs进程不存在,请及时处理...""")



def get_ossfs_name():
    try:
        args = "ls -lh /alidata/|grep file|awk '{print $3}'"
        running_shell = subprocess.popen(args,
                                         shell=true,
                                         stdin=subprocess.pipe,
                                         stdout=subprocess.pipe,
                                         stderr=subprocess.pipe,)
        out,err = running_shell.communicate()
        for line in out.splitlines():
            s1 = bytes.decode(line).strip()
            if s1.strip() != "root":
                return ding.send_msg("""ossfs报警通知:
机器:语文课堂
消息内容:/xxxx/file/挂载失败,请及时处理...""")
    except exception as e:
        print(e)

if __name__ == "__main__":
    ding = dingtalk_disaster()
    threads = [threading.thread(target=get_ossfs_name),
               threading.thread(target=check_oss_pid)]
    for t in threads:
        t.start()