python实时监控logstash日志代码
程序员文章站
2024-02-07 09:43:52
实时读取logstash日志,一旦出现异常错误关键字(keyword)即触发报警。# /usr/bin/env python3 # -*- coding: utf-8 -*- # __author__ = caozhi #...
实时读取logstash日志,一旦出现异常错误关键字(keyword)即触发报警。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# __author__ = caozhi
# create_time 2018-11-12, update_time 2018-11-15
# version = 1.0
"""Recording high-availability alarm driven by the logstash log.

1. The log is read incrementally: the file position reached by one run is
   stored in ``conf.ini`` and used as the starting point of the next run.
2. The production log is rotated. When the inode of ``logstash.log`` differs
   from the stored one, the previously rotated file is read instead and the
   stored position is reset to 0 for the following run.
"""
import os
import sys
import json
import time
import re

# Path of the JSON state file (despite the .ini suffix it holds JSON).
cini = 'conf.ini'
# Name of the live log file; also used as the fallback when a rotated file
# cannot be opened.
DEFAULT_LOG = 'logstash.log'
log_file = DEFAULT_LOG

# Push endpoint of the local Open-Falcon agent.
FALCON_PUSH_URL = "http://127.0.0.1:1988/v1/push"
# Seconds to wait for the agent before giving up instead of hanging forever.
FALCON_TIMEOUT = 5


def readconf():
    """Load the saved state from ``cini``.

    Returns:
        dict: the stored state with the keys ``seek``, ``inode`` and
        ``last_file``. When the file is missing or does not contain valid
        JSON, a default state is written to disk and returned instead.
    """
    try:
        with open(cini, 'r') as f:
            conf = json.load(f)
    except (OSError, ValueError):
        # NOTE(review): 922817 is a hard-coded inode; it presumably never
        # matches a fresh log file, which sends the first run through the
        # "rotated file" branch of main() -- confirm this is intended.
        conf = {"seek": 0, "inode": 922817, "last_file": DEFAULT_LOG}
        writeconf(conf=conf)
        print('conf.ini 配置文件缺失,自动创建一个新的配置文件')
    return conf


def writeconf(conf):
    """Serialize *conf* as JSON into ``cini``, replacing any previous content."""
    with open(cini, 'w') as e:
        json.dump(conf, e)


def read_log(log_file, seek):
    """Scan *log_file* from position *seek* and report every publish event.

    Each line is parsed as one JSON document. A line with ``rtype == 6``,
    ``uri == '/publish'`` and ``event == 0`` is pushed through ``record``.

    Args:
        log_file: path of the file to read. If it does not exist, the live
            log ``logstash.log`` is read from the beginning instead.
        seek: position to start reading from, as returned by ``tell()``.

    Returns:
        tuple: ``(value, stream, seek)`` where ``value``/``stream`` describe
        the LAST line that was read (``1, <name>`` for a match, ``0, 0``
        otherwise) and ``seek`` is the position after the last line.

    Raises:
        SystemExit: when the file cannot be opened, when nothing was appended
            since *seek*, or when a line is not valid JSON (in that case the
            state file is reset first).
    """
    try:
        f = open(log_file, 'r')
    except FileNotFoundError:
        f = open(DEFAULT_LOG, 'r')
        seek = 0
        print('上一个文件读取失败了,请检查切割的日志文件')
    except OSError:
        print('日志文件打开错误,退出程序')
        sys.exit()

    value, stream = 0, 0
    # The context manager closes the handle on every path, including the
    # sys.exit() calls below (the original never actually called close()).
    with f:
        f.seek(seek)
        # readline() is used instead of iterating the file because tell()
        # is disabled while a text file is being iterated.
        line = f.readline()
        if f.tell() == seek:
            print('没有追加日志,退出程序')
            sys.exit()
        while line:
            try:
                entry = json.loads(line)
            except ValueError:
                # An unparsable line resets the saved state and aborts.
                conf = {"seek": 0, "inode": 922817,
                        "last_file": "/data/logs/lmrs/logstash.log"}
                writeconf(conf=conf)
                print('json数据加载错误,重新创建一个新的配置文件')
                sys.exit()
            if (entry.get('rtype') == 6
                    and entry.get('uri') == '/publish'
                    and entry.get('event') == 0):
                value = 1
                stream = entry.get('name')
                print('{} {}'.format(value, stream))
                record(value=value, stream=stream)
            else:
                value = 0
                stream = 0
            line = f.readline()
            seek = f.tell()
    return value, stream, seek


def record(value, stream):
    """Push one data point for *stream* to the local Open-Falcon agent.

    Args:
        value: metric value to report.
        stream: stream name; it is embedded in the ``tags`` field.

    The HTTP status code and response body are printed. Network errors
    raised by ``requests`` are not caught here.
    """
    # Third-party dependency, imported here so that the log-parsing helpers
    # stay importable on hosts where ``requests`` is not installed.
    import requests

    now = int(time.time())
    item = {
        'metric': 'recording_high_availability_monitor',
        'endpoint': os.uname()[1],
        'timestamp': now,
        'step': 60,
        'value': value,
        # Open-Falcon expects the camel-case key and an upper-case type.
        'counterType': 'GAUGE',
        'tags': '{}={}'.format(now, stream),
    }
    data = [item]
    print('这是data的json数据')
    print(data)
    falcon_request = requests.post(FALCON_PUSH_URL, data=json.dumps(data),
                                   timeout=FALCON_TIMEOUT)
    print('json参数请求返回状态码为:' + str(falcon_request.status_code))
    print('json参数请求返回为:' + str(falcon_request.text))


def main():
    """Run one monitoring pass and persist the new read position."""
    # Sample the clock once so every derived string describes the same minute.
    now = time.localtime()
    print()
    print('***************************************')
    print('本次执行脚本时间:{}'.format(time.strftime("%Y%m%d_%H%M", now)))
    conf = readconf()
    print('first_conf :{}'.format(conf))
    print('no1.log_file', log_file)
    last_inode = conf['inode']
    inode = os.stat(log_file).st_ino
    print('last_inode: {} inode: {}'.format(last_inode, inode))

    seek = conf['seek']
    # A changed inode means the log was rotated since the previous run.
    next_file = inode != last_inode
    if next_file:
        # Rotated files are named "<last_file>-YYYYmmdd_HHM0": the minute is
        # rounded down to a multiple of ten.
        # NOTE(review): the directive case (%Y, %H, %M) was reconstructed;
        # confirm it against the real rotated file names.
        target = (conf['last_file']
                  + time.strftime("-%Y%m%d_", now)
                  + time.strftime("%H%M", now)[:-1] + '0')
    else:
        target = log_file
    print('no2.log_file', target)

    value, stream, seek = read_log(log_file=target, seek=seek)

    # After finishing a rotated file the next run starts at the top of the
    # live log. NOTE(review): this also applies when read_log fell back to
    # the live log, which makes the next run re-read it -- confirm.
    conf['seek'] = 0 if next_file else seek
    conf['inode'] = os.stat(DEFAULT_LOG).st_ino
    writeconf(conf=conf)
    print('last_conf :{}'.format(conf))


if __name__ == '__main__':
    main()
补充知识:logstash 调用exec
我就废话不多说了,还是直接看代码吧!
[elk@vsftp logstash]$ cat t3.conf
# Pipeline: read events from stdin, extract timestamp and level, and run
# /bin/smail.pl for every event that looks like an error.
input {
  stdin { }
}

filter {
  grok {
    # Capture the leading ISO8601 timestamp into "time" and the following
    # non-blank token into "level".
    match => [ "message", "(?m)\s*%{TIMESTAMP_ISO8601:time}\s*(?<level>(\S+)).*" ]
  }
  date {
    # Joda-Time pattern: MM = month, HH = hour of day, mm = minute,
    # ss = second, SSS = millisecond.
    match => ["time", "yyyy-MM-dd HH:mm:ss,SSS"]
  }
  mutate {
    add_field => ["type", "tailong"]
    add_field => ["messager", "%{type}-%{message}"]
    remove_field => ["message"]
  }
}

output {
  # NOTE(review): the letter case of the compared strings could not be
  # recovered from this listing -- confirm against the real log format.
  if ([level] == "error" or [messager] =~ "exception" ) and [messager] !~ "温金服务未连接" and [messager] !~ "调用温金代理系统接口错误" and [messager] !~ "businessexception" {
    exec {
      command => "/bin/smail.pl \"%{messager}\" \"%{type}\" "
    }
  }
  stdout { codec => rubydebug }
}

vsftp:/root# cat /bin/smail.pl
#!/usr/bin/perl
# Send the command-line arguments as the body of a mail through SMTP.
use Net::SMTP;
use HTTP::Date qw(time2iso str2time time2iso time2isoz);
use Data::Dumper;
use Getopt::Std;
use vars qw($opt_d);
getopts('d:');
# mail_user should be your_mail@163.com
$message = "@ARGV";
$env = "$opt_d";

# send_mail(TO_ADDRESS): mail the global $message, prefixed with the current
# time, to TO_ADDRESS. Dies when SMTP authentication fails.
sub send_mail {
    my $currtime = time2iso(time());
    my $to_address = shift;
    my $mail_user = 'zhao.yangjian@163.com';
    my $mail_pwd = 'xx';
    my $mail_server = 'smtp.163.com';
    my $from = "from: $mail_user\n";
    my $subject = "subject: zjcap info\n";
    my $info = "$currtime--$message";
    # The heredoc terminator must stand alone at the start of its own line.
    my $message = <<CONTENT;
$info
CONTENT
    my $smtp = Net::SMTP->new($mail_server);
    $smtp->auth($mail_user, $mail_pwd) || die "auth error! $!";
    $smtp->mail($mail_user);
    $smtp->to($to_address);
    $smtp->data();                  # begin the data
    $smtp->datasend($from);         # set user
    $smtp->datasend($subject);      # set subject
    $smtp->datasend("\n\n");
    $smtp->datasend("$message\n");  # set content
    $smtp->dataend();
    $smtp->quit();
};

send_mail('zhao.yangjian@163.com');

2017-01-12 10:19:19,888 jjjjj exception
{
      "@version" => "1",
    "@timestamp" => "2017-01-12T02:19:19.888Z",
          "host" => "vsftp",
          "time" => "2017-01-12 10:19:19,888",
         "level" => "jjjjj",
          "type" => "tailong",
      "messager" => "tailong-2017-01-12 10:19:19,888 jjjjj exception"
}
以上这篇python实时监控logstash日志代码就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。