python实现数据库主从状态监控
程序员文章站
2022-03-27 10:05:28
背景想要监控两台主机的主从状态,并且不属于商业业务,不需要上监控系统,那就只需要用个小小脚本监控就好啦。一、思路1.登陆数据库2.获取主从状态3.判断此时状态是否健康4.若健康则退出程序,若不健康则发出报警二、实现mon_MasterSlave_status.py(主监控脚本)'''1.使用前请修改email_dididi配置2.使用前请修改wechat_dididi配置3.使用前请修改sqllist'''import datetimeimport pymysqlfrom...
背景
想要监控两台主机的主从状态,并且不属于商业业务,不需要上监控系统,那就只需要用个小小脚本监控就好啦。
一、思路
1.登陆数据库
2.获取主从状态
3.判断此时状态是否健康
4.若健康则退出程序,若不健康则发出报警
二、实现
mon_MasterSlave_status.py(主监控脚本)
'''
1.使用前请修改email_dididi配置
2.使用前请修改wechat_dididi配置
3.使用前请修改sqllist
'''
import datetime
import pymysql
from master_slave_status import email_dididi ##发送邮箱消息
from master_slave_status import wechat_dididi ##发送企业微信消息
Master_Host = ''
Slave_IO_Running= ''
Slave_SQL_Running= ''
Last_IO_Errno= ''
Last_IO_Error= ''
Last_SQL_Errno= ''
Last_SQL_Error= ''
log= ''
sqllist = [
['ipadd1',3306,'user','passwd1'], #主库
['ipadd2', 3306, 'user', 'passwd2'] #从库
]
for i in sqllist:
try:
sqlcon = pymysql.connect(host=i[0], port=i[1], user=i[2], passwd=i[3])
sqlcur = sqlcon.cursor()
sqlcur.execute("show slave status")
sqlcon.commit()
data = sqlcur.fetchall()
for i in data :
Master_Host=i[1]
Slave_IO_Running=i[10]
Slave_SQL_Running=i[11]
Last_IO_Errno=i[34]
Last_IO_Error=i[35]
Last_SQL_Errno=i[36]
Last_SQL_Error=i[37]
# log = str(i)
nowtime = str(datetime.datetime.now())
if Slave_IO_Running =='Yes' and Slave_SQL_Running == 'Yes':
message = 'MasterSlave status is ok!'
else:
message = f'MasterSlave status is failing!\nMaster_Host:{Master_Host}' \
f'\nSlave_IO_Running:{Slave_IO_Running}' \
f'\nSlave_SQL_Running:{Slave_SQL_Running}\nLast_IO_Errno:{Last_IO_Errno}' \
f'\nLast_IO_Error:{Last_IO_Error}\nLast_SQL_Errno:{Last_SQL_Errno}\nLast_SQL_Error:{Last_SQL_Error}'
# print(message)
email_dididi.send(message) #邮箱提醒(两个报警随意用哪个取消注释&配置正确即可)
#wechat_dididi.send(message) #微信提醒(两个报警随意用哪个取消注释&配置正确即可)
logfile = open('./MasterSlave_fail.log','a')
logfile.write(nowtime+message+\n)
logfile.close()
except:
message = 'Cannot get slave status,please check your mysqlconnect!'
logfile = open('./MasterSlave_fail.log', 'a')
logfile.write(str(datetime.datetime.now()) + message)
logfile.close()
email_dididi.py(邮箱告警脚本)
import smtplib
from email.mime.text import MIMEText
from email.header import Header
def send(mess):
# 第三方 SMTP 服务
mail_host = "" # 设置服务器
mail_user = "" # 用户名
mail_pass = "" # 密码
sender = '111@qq.com' #发送者
receivers = ['123@qq.com','456@qq.com'] # 接收邮件,可设置为你的QQ邮箱或者其他邮箱
message = MIMEText(mess, 'plain', 'utf-8')
message['From'] = Header("数据库主从监控", 'utf-8')
subject = '数据库主从异常通知'
message['Subject'] = Header(subject, 'utf-8')
try:
smtpObj = smtplib.SMTP()
smtpObj.connect(mail_host, 25) #25为SMTP端口号
smtpObj.login(mail_user, mail_pass)
smtpObj.sendmail(sender, receivers, message.as_string())
print("send mail success")
except smtplib.SMTPException:
print("send mail fail")
wechat_dididi.py(微信告警脚本)
import urllib.request
import json
# 获取企业微信token
def get_token(url, corpid, corpsecret):
token_url = '%s/cgi-bin/gettoken?corpid=%s&corpsecret=%s' % (url, corpid, corpsecret)
token = json.loads(urllib.request.urlopen(token_url).read().decode())['access_token']
return token
# 构建告警信息json
def messages(msg):
values = {
"touser": '@all',
"msgtype": 'text',
"agentid": 1000001, # 自建的agentid号
"text": {'content': msg},
"safe": 0
}
msges = (bytes(json.dumps(values), 'utf-8'))
return msges
# 发送告警信息
def send_message(url, token, data):
send_url = '%s/cgi-bin/message/send?access_token=%s' % (url, token)
respone = urllib.request.urlopen(urllib.request.Request(url=send_url, data=data)).read()
x = json.loads(respone.decode())['errcode']
# print(x)
if x == 0:
print('send wechat success')
else:
print('send wechat fail')
def send(message):
corpid = '*******' #企业微信corpid
corpsecret = '******' #企业微信corpsecret
url = 'https://qyapi.weixin.qq.com'
# 函数调用
test_token = get_token(url, corpid, corpsecret)
msg_data = messages(message)
send_message(url, test_token, msg_data)
小提示:
脚本放到从库或者主库用定时任务去执行 python3 /路径/mon_MasterSlave_status.py即可,定时任务时间由则按个人情况去定吧。
三、效果
微信报警:
邮箱报警:
四、结束
baibai lo ~
本文地址:https://blog.csdn.net/weixin_39098318/article/details/111941492