python3检测ossfs可用性+钉钉通知
程序员文章站
2022-06-11 16:49:44
#!/usr/bin/env python # -*- coding: utf-8 -*- # @Time : 2019-12-02 15:16 # @Author : Anthony # @Email : ianghont7@163.com # @File : check-ossfs.py # 检 ......
#!/usr/bin/env python # -*- coding: utf-8 -*- # @time : 2019-12-02 15:16 # @author : anthony # @email : ianghont7@163.com # @file : check-ossfs.py # 检测ossfs进程是否存在 # 检测/xxxx/file是否挂载成功 # ossfs可用性检测 import psutil import requests import json import subprocess import threading class dingtalk_base: def __init__(self): self.__headers = {'content-type': 'application/json;charset=utf-8'} self.url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxx' def send_msg(self,text): json_text = { "msgtype": "text", "text": { "content": text }, "at": { "atmobiles": [ "" ], "isatall": false } } return requests.post(self.url, json.dumps(json_text), headers=self.__headers).content class dingtalk_disaster(dingtalk_base): def __init__(self): super().__init__() # 填写机器人的url self.url = "https://oapi.dingtalk.com/robot/send?access_token=xxxxx" def check_oss_pid(): pids = psutil.pids() for pid in pids: p = psutil.process(pid) if "ossfs" == p.name().strip(): return true else: return ding.send_msg("""ossfs报警通知: 机器:机器名称 消息内容:ossfs进程不存在,请及时处理...""") def get_ossfs_name(): try: args = "ls -lh /alidata/|grep file|awk '{print $3}'" running_shell = subprocess.popen(args, shell=true, stdin=subprocess.pipe, stdout=subprocess.pipe, stderr=subprocess.pipe,) out,err = running_shell.communicate() for line in out.splitlines(): s1 = bytes.decode(line).strip() if s1.strip() != "root": return ding.send_msg("""ossfs报警通知: 机器:语文课堂 消息内容:/xxxx/file/挂载失败,请及时处理...""") except exception as e: print(e) if __name__ == "__main__": ding = dingtalk_disaster() threads = [threading.thread(target=get_ossfs_name), threading.thread(target=check_oss_pid)] for t in threads: t.start()