欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

python实现数据库主从状态监控

程序员文章站 2022-07-08 15:56:40
背景想要监控两台主机的主从状态,并且不属于商业业务,不需要上监控系统,那就只需要用个小小脚本监控就好啦。一、思路1.登陆数据库2.获取主从状态3.判断此时状态是否健康4.若健康则退出程序,若不健康则发出报警二、实现mon_MasterSlave_status.py(主监控脚本)'''1.使用前请修改email_dididi配置2.使用前请修改wechat_dididi配置3.使用前请修改sqllist'''import datetimeimport pymysqlfrom...

背景

想要监控两台主机的主从状态,并且不属于商业业务,不需要上监控系统,那就只需要用个小小脚本监控就好啦。

一、思路

1.登陆数据库
2.获取主从状态
3.判断此时状态是否健康
4.若健康则退出程序,若不健康则发出报警

二、实现

mon_MasterSlave_status.py(主监控脚本)

'''
1.使用前请修改email_dididi配置
2.使用前请修改wechat_dididi配置
3.使用前请修改sqllist
'''

import datetime
import pymysql
from master_slave_status import email_dididi   ##发送邮箱消息
from master_slave_status import wechat_dididi  ##发送企业微信消息

Master_Host = ''
Slave_IO_Running= ''
Slave_SQL_Running= ''
Last_IO_Errno= ''
Last_IO_Error= ''
Last_SQL_Errno= ''
Last_SQL_Error= ''
log= ''
sqllist = [
    ['ipadd1',3306,'user','passwd1'],    #主库
    ['ipadd2', 3306, 'user', 'passwd2']  #从库
]
for i in sqllist:
    try:
        sqlcon = pymysql.connect(host=i[0], port=i[1], user=i[2], passwd=i[3])
        sqlcur = sqlcon.cursor()
        sqlcur.execute("show slave status")
        sqlcon.commit()
        data = sqlcur.fetchall()
        for i in data :
            Master_Host=i[1]
            Slave_IO_Running=i[10]
            Slave_SQL_Running=i[11]
            Last_IO_Errno=i[34]
            Last_IO_Error=i[35]
            Last_SQL_Errno=i[36]
            Last_SQL_Error=i[37]
            # log = str(i)
        nowtime = str(datetime.datetime.now())
        if Slave_IO_Running =='Yes' and Slave_SQL_Running == 'Yes':
            message = 'MasterSlave status is ok!'
        else:
            message = f'MasterSlave status is failing!\nMaster_Host:{Master_Host}' \
                      f'\nSlave_IO_Running:{Slave_IO_Running}' \
                      f'\nSlave_SQL_Running:{Slave_SQL_Running}\nLast_IO_Errno:{Last_IO_Errno}' \
                      f'\nLast_IO_Error:{Last_IO_Error}\nLast_SQL_Errno:{Last_SQL_Errno}\nLast_SQL_Error:{Last_SQL_Error}'
            # print(message)
            email_dididi.send(message)        #邮箱提醒(两个报警随意用哪个取消注释&配置正确即可)
            #wechat_dididi.send(message)      #微信提醒(两个报警随意用哪个取消注释&配置正确即可)
            logfile = open('./MasterSlave_fail.log','a')
            logfile.write(nowtime+message+\n)
            logfile.close()

    except:
        message = 'Cannot get slave status,please check your mysqlconnect!'
        logfile = open('./MasterSlave_fail.log', 'a')
        logfile.write(str(datetime.datetime.now()) + message)
        logfile.close()

email_dididi.py(邮箱告警脚本)


import smtplib
from email.mime.text import MIMEText
from email.header import Header
def send(mess):
    # 第三方 SMTP 服务
    mail_host = ""  # 设置服务器
    mail_user = ""  # 用户名
    mail_pass = ""  # 密码

    sender = '111@qq.com' #发送者
    receivers = ['123@qq.com','456@qq.com']  # 接收邮件,可设置为你的QQ邮箱或者其他邮箱
    message = MIMEText(mess, 'plain', 'utf-8')
    message['From'] = Header("数据库主从监控", 'utf-8')
    subject = '数据库主从异常通知'
    message['Subject'] = Header(subject, 'utf-8')

    try:
        smtpObj = smtplib.SMTP()
        smtpObj.connect(mail_host, 25)  #25为SMTP端口号
        smtpObj.login(mail_user, mail_pass)
        smtpObj.sendmail(sender, receivers, message.as_string())
        print("send mail success")
    except smtplib.SMTPException:
        print("send mail fail")

wechat_dididi.py(微信告警脚本)


import urllib.request
import json


# 获取企业微信token
def get_token(url, corpid, corpsecret):
    token_url = '%s/cgi-bin/gettoken?corpid=%s&corpsecret=%s' % (url, corpid, corpsecret)
    token = json.loads(urllib.request.urlopen(token_url).read().decode())['access_token']
    return token



# 构建告警信息json
def messages(msg):
    values = {
        "touser": '@all',
        "msgtype": 'text',
        "agentid": 1000001,  # 自建的agentid号
        "text": {'content': msg},
        "safe": 0
    }
    msges = (bytes(json.dumps(values), 'utf-8'))
    return msges



# 发送告警信息
def send_message(url, token, data):
    send_url = '%s/cgi-bin/message/send?access_token=%s' % (url, token)
    respone = urllib.request.urlopen(urllib.request.Request(url=send_url, data=data)).read()
    x = json.loads(respone.decode())['errcode']
    # print(x)
    if x == 0:
        print('send wechat success')
    else:
        print('send wechat fail')



def send(message):
    corpid = '*******'  #企业微信corpid
    corpsecret = '******'  #企业微信corpsecret
    url = 'https://qyapi.weixin.qq.com'
    # 函数调用
    test_token = get_token(url, corpid, corpsecret)
    msg_data = messages(message)
    send_message(url, test_token, msg_data)

小提示:
脚本放到从库或者主库用定时任务去执行 python3 /路径/mon_MasterSlave_status.py即可,定时任务时间由则按个人情况去定吧。

三、效果

微信报警:
python实现数据库主从状态监控
邮箱报警:
python实现数据库主从状态监控

四、结束

baibai lo ~

本文地址:https://blog.csdn.net/weixin_39098318/article/details/111941492