python利用微信公众号实现报警功能
微信公众号共有三种,服务号、订阅号、企业号。它们在获取accesstoken上各有不同。
其中订阅号比较坑,它的accesstoken是需定时刷新,重复获取将导致上次获取的accesstoken失效。
而企业号就比较好,accesstoken有效期同样为7200秒,但有效期内重复获取返回相同结果。
为兼容这两种方式,因此按照订阅号的方式处理。
处理办法与接口文档中的要求相同:
为了保密appsecrect,第三方需要一个access_token获取和刷新的中控服务器。
而其他业务逻辑服务器所使用的access_token均来自于该中控服务器,不应该各自去刷新,否则会造成access_token覆盖而影响业务。
下面的代码以企业号为例,将access_token储存在sqlite3数据库中,相比储存在文本中,放在数
据库里,可以为后期存放其他数据提供向后兼容。如果放在文本中,则不如放在数据库中灵活。
设计思路和使用方法:
自动创建sqlite3数据库,包括表结构和数据,并能在数据库表结构不存在或者数据不存在或遭删除的情况下,创建新的可用的数据
尽可能的保证class中每一个可执行的函数单独调用都能成功。
class中只将真正能被用到的方法和变量设置为public的。
使用时只需要修改此文件中的weixin_qy_corpid和weixin_qy_secret改成自己的,并import此文件,使
用weixintokenclass().get()方法即可得到access_token。
#!/usr/bin/python # encoding: utf-8 # -*- coding: utf8 -*- #python学习群125240963每天更新资料,包括2018最新企业级项目案例,同千人一起交流。 import os import sqlite3 import sys import urllib import urllib2 import json import datetime # import time enable_debug = true def debug(msg, code=none): if enable_debug: if code is none: print "message: %s" % msg else: print "message: %s, code: %s " % (msg, code) author_mail = "uberurey_ups@163.com" weixin_qy_corpid = "your_corpid" weixin_qy_secret = "your_secret" # build paths inside the project like this: os.path.join(base_dir, ...) base_dir = os.path.dirname(os.path.abspath(__file__)) # database # https://docs.djangoproject.com/en/1.9/ref/settings/#databases databases = { 'default': { 'engine': 'db.backends.sqlite3', 'name': os.path.join(base_dir, '.odbp_db.sqlite3'), } } sqlite3_db_file = str(databases['default']['name']) def sqlite3_conn(database): try: conn = sqlite3.connect(database) except sqlite3.error: print >> sys.stderr, """\ there was a problem connecting to database: %s the error leading to this problem was: %s it's possible that this database is broken or permission denied. if you cannot solve this problem yourself, please mail to: %s """ % (database, sys.exc_value, author_mail) sys.exit(1) else: return conn def sqlite3_commit(conn): return conn.commit() def sqlite3_close(conn): return conn.close() def sqlite3_execute(database, sql): try: sql_conn = sqlite3_conn(database) sql_cursor = sql_conn.cursor() sql_cursor.execute(sql) sql_conn.commit() sql_conn.close() except sqlite3.error as e: print e sys.exit(1) def sqlite3_create_table_token(): sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() sql_cursor.execute('''create table "main"."weixin_token" ( "id" integer , "access_token" text, "expires_in" text, "expires_on" text, "is_expired" integer ) ; ''') sqlite3_commit(sql_conn) sqlite3_close(sql_conn) def sqlite3_create_table_account(): sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() sql_cursor.execute('''create table "main"."weixin_account" ( "id" integer, "name" text, "corpid" text, "secret" text, "current" integer ) ; ''') sqlite3_commit(sql_conn) sqlite3_close(sql_conn) def sqlite3_create_tables(): print "sqlite3_create_tables" sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() sql_cursor.execute('''create table "main"."weixin_token" ( "id" integer , "access_token" text, "expires_in" text, "expires_on" text, "is_expired" integer ) ; ''') sql_cursor.execute('''create table "main"."weixin_account" ( "id" integer, "name" text, "corpid" text, "secret" text, "current" integer ) ; ''') sqlite3_commit(sql_conn) sqlite3_close(sql_conn) def sqlite3_set_credential(corpid, secret): try: sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() sql_cursor.execute('''insert into "weixin_account" ("id", "name", "corpid", "secret", "current") values (1, 'odbp', ?, ?, 1) ''', (corpid, secret)) sqlite3_commit(sql_conn) sqlite3_close(sql_conn) except sqlite3.error: sqlite3_create_table_account() sqlite3_set_credential(corpid, secret) def sqlite3_set_token(access_token, expires_in, expires_on, is_expired): try: sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() sql_cursor.execute('''insert into "weixin_token" ("id", "access_token", "expires_in", "expires_on", "is_expired") values ( 1, ?, ?, ?, ? ) ''', (access_token, expires_in, expires_on, is_expired)) sqlite3_commit(sql_conn) sqlite3_close(sql_conn) except sqlite3.error: sqlite3_create_table_token() sqlite3_set_token(access_token, expires_in, expires_on, is_expired) def sqlite3_get_credential(): try: sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() credential = sql_cursor.execute('''select "corpid", "secret" from weixin_account where current == 1;''') result = credential.fetchall() sqlite3_close(sql_conn) except sqlite3.error: sqlite3_set_credential(weixin_qy_corpid, weixin_qy_secret) return sqlite3_get_credential() else: if result is not none and len(result) != 0: return result else: print "unrecoverable problem, please alter to %s" % author_mail sys.exit(1) def sqlite3_get_token(): try: sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() credential = sql_cursor.execute( '''select "access_token", "expires_on" from weixin_token where "is_expired" == 1 ;''') result = credential.fetchall() sqlite3_close(sql_conn) except sqlite3.error: info = sys.exc_info() print info[0], ":", info[1] else: if result is not none and len(result) != 0: return result else: # print "unrecoverable problem, please alter to %s" % author_mail # sys.exit(1) return none def sqlite3_update_token(access_token, expires_on): sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() sql_cursor.execute('''update "weixin_token" set access_token=?, expires_on=? where _rowid_ = 1;''', (access_token, expires_on) ) sqlite3_commit(sql_conn) sqlite3_close(sql_conn) class weixintokenclass(object): def __init__(self): self.__corpid = none self.__corpsecret = none self.__use_persistence = true self.__access_token = none self.__expires_in = none self.__expires_on = none self.__is_expired = none if self.__use_persistence: self.__corpid = sqlite3_get_credential()[0][0] self.__corpsecret = sqlite3_get_credential()[0][1] else: self.__corpid = weixin_qy_corpid self.__corpsecret = weixin_qy_secret def __get_token_from_weixin_qy_api(self): parameters = { "corpid": self.__corpid, "corpsecret": self.__corpsecret } url_parameters = urllib.urlencode(parameters) token_url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?" url = token_url + url_parameters response = urllib2.urlopen(url) result = response.read() token_json = json.loads(result) if token_json['access_token'] is not none: get_time_now = datetime.datetime.now() # todo(guodong ding) token will expired ahead of time or not expired after the time expire_time = get_time_now + datetime.timedelta(seconds=token_json['expires_in']) token_json['expires_on'] = str(expire_time) self.__access_token = token_json['access_token'] self.__expires_in = token_json['expires_in'] self.__expires_on = token_json['expires_on'] self.__is_expired = 1 try: token_result_set = sqlite3_get_token() except sqlite3.error: token_result_set = none if token_result_set is none and len(token_result_set) == 0: sqlite3_set_token(self.__access_token, self.__expires_in, self.__expires_on, self.__is_expired) else: if self.__is_token_expired() is true: sqlite3_update_token(self.__access_token, self.__expires_on) else: debug("pass") return else: if token_json['errcode'] is not none: print "errcode is: %s" % token_json['errcode'] print "errmsg is: %s" % token_json['errmsg'] else: print result def __get_token_from_persistence_storage(self): try: token_result_set = sqlite3_get_token() except sqlite3.error: self.__get_token_from_weixin_qy_api() finally: if token_result_set is none: self.__get_token_from_weixin_qy_api() token_result_set = sqlite3_get_token() access_token = token_result_set[0][0] expire_time = token_result_set[0][1] else: access_token = token_result_set[0][0] expire_time = token_result_set[0][1] expire_time = datetime.datetime.strptime(expire_time, '%y-%m-%d %h:%m:%s.%f') now_time = datetime.datetime.now() if now_time < expire_time: # print "the token is %s" % access_token # print "the token will expire on %s" % expire_time return access_token else: self.__get_token_from_weixin_qy_api() return self.__get_token_from_persistence_storage() @staticmethod def __is_token_expired(): try: token_result_set = sqlite3_get_token() except sqlite3.error as e: print e sys.exit(1) expire_time = token_result_set[0][1] expire_time = datetime.datetime.strptime(expire_time, '%y-%m-%d %h:%m:%s.%f') now_time = datetime.datetime.now() if now_time < expire_time: return false else: return true def get(self): return self.__get_token_from_persistence_storage()
python实现通过微信企业号发送文本消息的class
编程要点和调用方法:
支持发送中文,核心语句“payload = json.dumps(self.data, encoding='utf-8', ensure_ascii=false)”,关键字“python json 中文”
这个class只有一个公共方法send()。
使用方法:import这个class,然后调用send方法即可,方法参数只需要两个,给谁(多userid用"|"隔开),内容是什么,例如:
import odbp_sendmessage msg = odbp_sendmessage.weixinsendmsgclass() msg.send("dingguodong", "python 大魔王!") #!/usr/bin/python # encoding: utf-8 # -*- coding: utf8 -*- """ created by pycharm. file: linuxbashshellscriptforops:odbp_sendmessage.py user: guodong create date: 2016/8/12 create time: 14:49 """ import odbp_gettoken class weixinsendmsgclass(object): def __init__(self): self.access_token = odbp_gettoken.weixintokenclass().get() self.to_user = "" self.to_party = "" self.to_tag = "" self.msg_type = "text" self.agent_id = 2 self.content = "" self.safe = 0 self.data = { "touser": self.to_user, "toparty": self.to_party, "totag": self.to_tag, "msgtype": self.msg_type, "agentid": self.agent_id, "text": { "content": self.content }, "safe": self.safe } def send(self, to_user, content): if to_user is not none and content is not none: self.data['touser'] = to_user self.data['text']['content'] = content else: print raise runtimeerror import requests import json url = "https://qyapi.weixin.qq.com/cgi-bin/message/send" querystring = {"access_token": self.access_token} payload = json.dumps(self.data, encoding='utf-8', ensure_ascii=false) headers = { 'content-type': "application/json", 'cache-control': "no-cache", } response = requests.request("post", url, data=payload, headers=headers, params=querystring) return_content = json.loads(response.content) if return_content["errcode"] == 0 and return_content["errmsg"] == "ok": print "send successfully! %s " % return_content else: print "send failed! %s " % return_content
python调用mongodb发送微信企业号
python2.x
注意:data变量里, agent_id为刚刚创建的应用id(可在web页面看到)
toparty即为目标部门,或者可以用touser,totag指定目标账户
比较简单的调用,已实测,可以使用。
#coding:utf-8 import sys import requests import json from pymongo import mongoclient reload(sys) sys.setdefaultencoding('utf-8') class weixin(object): def __init__(self, corp_id, corp_secret): self.token_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=%s&corpsecret=%s' %(corp_id, corp_secret) self.send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' def get_token(self): try: r = requests.get(self.token_url, timeout=10) except exception as e: print e sys.exit(1) if r.status_code == requests.codes.ok: data = r.json() if data.get('errcode'): print data['errmsg'] sys.exit(1) return data['access_token'] else: print r.status_code sys.exit(1) def send(self,message): url = self.send_url + self.get_token() data = { "touser": "hequan2011", "msgtype": "text", "agentid": "0", "text": { "content": message }, "safe":"0" } send_data = json.dumps(data,ensure_ascii=false) try: r = requests.post(url, send_data) except exception, e: print e sys.exit(1) if r.status_code == requests.codes.ok: print r.json() else: print r.code sys.exit(1) corpid = 'xxxxxxxxxxx' corpsecret = 'xxxxxxxxxxxxxxxxx' client = mongoclient('mongodb://user:password@127.0.0.1:27017/') db = client.ku collection = db.biao a = [] for data in collection.find(): a.append(data) l = a[0] g = l z = str(g["name"]) z1 = int(g["jg"]) print z msg = "1:{0}\n 2:{1}\n".format(z,z1) w = weixin(corpid,corpsecret) w.send(msg)
zabbix 微信报警 插件
#!/usr/bin/env python # -*- coding:utf-8 -*- # __author__ = '懒懒的天空' import requests import sys import json from conf.inifiles import read_config, write_config import os import datetime from conf.blog import log reload(sys) sys.setdefaultencoding('utf-8') class weixin(object): def __init__(self, corpid, corpsecret): # 初始化的时候需要获取corpid和corpsecret,需要从管理后台获取 self.__params = { 'corpid': corpid, 'corpsecret': corpsecret } self.url_get_token = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken' self.url_send = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?' self.__token = self.__get_token() self.__token_params = { 'access_token': self.__token } def __raise_error(self, res): raise exception('error code: %s,error message: %s' % (res.json()['errcode'], res.json()['errmsg'])) global senderr sendstatus = false senderr = 'error code: %s,error message: %s' % (res.json()['errcode'], res.json()['errmsg']) def __get_token(self): # print self.url_get_token headers = {'content-type': 'application/json'} res = requests.get(self.url_get_token, headers=headers, params=self.__params) try: return res.json()['access_token'] except: self.__raise_error(res.content) def send_message(self, agentid, messages, userid='', toparty=''): payload = { 'touser': userid, 'toparty': toparty, 'agentid': agentid, "msgtype": "news", "news": messages } headers = {'content-type': 'application/json'} data = json.dumps(payload, ensure_ascii=false).encode('utf-8') params = self.__token_params res = requests.post(self.url_send, headers=headers, params=params, data=data) try: return res.json() except: self.__raise_error(res) def main(send_to, subject, content): try: global sendstatus global senderr data = '' messages = {} body = {} config_file_path = get_path() corpid = read_config(config_file_path, 'wei', "corpid") corpsecret = read_config(config_file_path, 'wei', "corpsecret") agentid = read_config(config_file_path, 'wei', "agentid") web = read_config(config_file_path, 'wei', "web") content = json.loads(content) messages["message_url"] = web body["url"] = web + "history.php?action=showgraph&itemids[]=" + content[u'监控id'] warn_message = '' if content[u'当前状态'] == 'problem': body["title"] = "服务器故障" warn_message += subject + '\n' warn_message += '详情:\n' warn_message += '告警等级:'+ content[u'告警等级'] + '\n' warn_message += '告警时间:'+ content[u'告警时间'] + '\n' warn_message += '告警地址:'+ content[u'告警地址'] + '\n' warn_message += '持续时间:'+ content[u'持续时间'] + '\n' warn_message += '监控项目:'+ content[u'监控项目'] + '\n' warn_message += content[u'告警主机'] + '故障(' + content[u'事件id']+ ')' else: body["title"] = "服务器恢复" warn_message += subject + '\n' warn_message += '详情:\n' warn_message += '告警等级:'+ content[u'告警等级'] + '\n' warn_message += '恢复时间:'+ content[u'恢复时间'] + '\n' warn_message += '告警地址:'+ content[u'告警地址'] + '\n' warn_message += '持续时间:'+ content[u'持续时间'] + '\n' warn_message += '监控项目:'+ content[u'监控项目'] + '\n' warn_message += content[u'告警主机'] + '恢复(' + content[u'事件id']+ ')' body['description'] = warn_message data = [] data.append(body) messages['articles'] = data wx = weixin(corpid, corpsecret) data = wx.send_message(toparty=send_to, agentid=agentid, messages=messages) sendstatus = true except exception, e: senderr = str(e) sendstatus = false logwrite(sendstatus, data) def get_path(): path = os.path.dirname(os.path.abspath(sys.argv[0])) config_path = path + '/config.ini' return config_path def logwrite(sendstatus, content): logpath = '/var/log/zabbix/weixin' if not sendstatus: content = senderr t = datetime.datetime.now() daytime = t.strftime('%y-%m-%d') daylogfile = logpath+'/'+str(daytime)+'.log' logger = log(daylogfile, level="info", is_console=false, mbs=5, count=5) os.system('chown zabbix.zabbix {0}'.format(daylogfile)) logger.info(content) if __name__ == "__main__": if len(sys.argv) > 1: send_to = sys.argv[1] subject = sys.argv[2] content = sys.argv[3] logwrite(true, content) main(send_to, subject, content)
python实现微信企业号的文本消息推送
#!/usr/bin/python # _*_coding:utf-8 _*_ import urllib2 import json import sys reload(sys) sys.setdefaultencoding('utf-8') def gettoken(corpid, corpsecret): gettoken_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + corpid + '&corpsecret=' + corpsecret try: token_file = urllib2.urlopen(gettoken_url) except urllib2.httperror as e: print e.code print e.read().decode("utf8") sys.exit() token_data = token_file.read().decode('utf-8') token_json = json.loads(token_data) token_json.keys() token = token_json['access_token'] return token def senddata(access_token, user, party, agent, subject, content): send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token send_values = "{\"touser\":\"" + user + "\",\"toparty\":\"" + party + "\",\"totag\":\"\",\"msgtype\":\"text\",\"agentid\":\"" + agent + "\",\"text\":{\"content\":\"" + subject + "\n" + content + "\"},\"safe\":\"0\"}" send_request = urllib2.request(send_url, send_values) response = json.loads(urllib2.urlopen(send_request).read()) print str(response) if __name__ == '__main__': user = str(sys.argv[1]) # 参数1:发送给用户的账号,必须关注企业号,并对企业号有发消息权限 party = str(sys.argv[2]) # 参数2:发送给组的id号,必须对企业号有权限 agent = str(sys.argv[3]) # 参数3:企业号中的应用id subject = str(sys.argv[4]) # 参数4:标题【消息内容的一部分】 content = str(sys.argv[5]) # 参数5:文本具体内容 corpid = 'corpid' # corpid是企业号的标识 corpsecret = 'corpsecretsecret' # corpsecretsecret是管理组凭证密钥 try: accesstoken = gettoken(corpid, corpsecret) senddata(accesstoken, user, party, agent, subject, content) except exception, e: print str(e) + "error please check \"corpid\" or \"corpsecret\" config"
nagios调用python程序控制微信公众平台发布报警信息
vim notify-host-by-weixin-party.py import urllib.request import json import sys #以上是导入模块 #创建获取accesstoken的方法 def gettoken(corp_id,corp_secret): gettoken_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + corp_id + '&corpsecret=' + corp_secret try: token_file = urllib.request.urlopen(gettoken_url) except urllib.error.httperror as e: print(e.code) print(e.read().decode("utf8")) token_data = token_file.read().decode('utf-8') token_json = json.loads(token_data) token_json.keys() token = token_json['access_token'] return token #这里是发送消息的方法 def senddata(access_token,notify_str): send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token #我传入的参数是一段字符串每个信息用separator连起来,只要再用字符串的split("separator")方法分开信息就可以了。 notifydata = notify_str.split("separator") party = notifydata[0] cationtype = notifydata[1] name = notifydata[2] state = notifydata[3] address = notifydata[4] output = notifydata[5] datatime = notifydata[6] # content = '[擦汗]host notification[擦汗]\n\n类型: ' + cationtype + '\n主机名: ' + name + '\n状态: ' + state + '\nip地址: ' + address + '\n摘要: ' + output + '\n时间: ' + datatime + '\n' if cationtype == "recovery": content = '[嘘]' + address + ' is ' + state + '[嘘]\n\nip地址: ' + address + '\n主要用途: ' + name + '\n当前状态: ' + state + '\n\n日志摘要: ' + output + '\n检测时间: ' + datatime + '\n' else: content = '[擦汗]' + address + ' is ' + state + '[擦汗]\n\nip地址: ' + address + '\n主要用途: ' + name + '\n当前状态: ' + state + '\n\n日志摘要: ' + output + '\n检测时间: ' + datatime + '\n' send_values = { "toparty":party, "totag":"2", "msgtype":"text", "agentid":"15", "text":{ "content":content }, "safe":"0" } send_data = json.dumps(send_values, ensure_ascii=false).encode(encoding='utf8') #设置为非ascii解析,使其支持中文 send_request = urllib.request.request(send_url, send_data) response = urllib.request.urlopen(send_request) #这个是返回微信公共平台的信息,调试时比较有用 msg = response.read() return msg default_encoding = 'utf-8' if sys.getdefaultencoding() != default_encoding: reload(sys) sys.setdefaultencoding(default_encoding) #我编辑的脚本是要获取nagios传入的一段参数的(字符串),下面这条代码是获取执行脚本后获取的第一个参数(经测试nagios只能传入一个参进python,所以把所有包括用户名跟报警主机报警信息放进一个字符串里) notifystr = str(sys.argv[1]) corpid = 'wxb6162862801114c9da602' corpsecret = '2ncsnchxepbcv4u9lcf-23by1rgzu1zs422tdjpktqzqjq1b26ifxp76ydg2rkkchgn6e' accesstoken = gettoken(corpid,corpsecret) msg = senddata(accesstoken,notifystr) print(msg) [root@localhost python]# vim notify-service-by-weixin-party.py import urllib.request import json import sys def gettoken(corp_id,corp_secret): gettoken_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + corp_id + '&corpsecret=' + corp_secret try: token_file = urllib.request.urlopen(gettoken_url) except urllib.error.httperror as e: print(e.code) print(e.read().decode("utf8")) token_data = token_file.read().decode('utf-8') token_json = json.loads(token_data) token_json.keys() token = token_json['access_token'] return token def senddata(access_token,notify_str): send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token notifydata = notify_str.split("separator") party = notifydata[0] cationtype = notifydata[1] desc = notifydata[2] alias = notifydata[3] address = notifydata[4] state = notifydata[5] datatime = notifydata[6] output = notifydata[7] # content ='[擦汗]service notification [擦汗]\n\n类型: ' + cationtype + '\n\n服务名: ' + desc + '\n主机名: ' + alias + '\nip址: ' + address + '\n状态: ' + state + '\n时间: ' + datatime + '\n摘要:\n' + output + '\n' if cationtype == "recovery": content ='[鼓掌]' + desc + ' is ' + state + '[鼓掌]\n\nip地址: ' + address + '\n主要用途: ' + alias + '\n服务状态: ' + desc + ' is ' + state + '\n检测时间: ' + datatime + '\n日志摘要: \n' + output + '\n' else: content ='[擦汗]' + desc + ' is ' + state + '[擦汗]\n\nip地址: ' + address + '\n主要用途: ' + alias + '\n服务状态: ' + desc + ' is ' + state + '\n检测时间: ' + datatime + '\n日志摘要: \n' + output + '\n' send_values = { "toparty":party, "totag":"2", "msgtype":"text", "agentid":"15", "text":{ "content":content }, "safe":"0" } send_data = json.dumps(send_values, ensure_ascii=false).encode(encoding='utf8') send_request = urllib.request.request(send_url, send_data) response = urllib.request.urlopen(send_request) msg = response.read() return msg default_encoding = 'utf-8' if sys.getdefaultencoding() != default_encoding: reload(sys) sys.setdefaultencoding(default_encoding) notifystr = str(sys.argv[1]) corpid = 'wxb616286d28ds01114c9da602' corpsecret = '2ncsnchxepbcdtgv4u9lcf-23by1rgzugh1zs422tdjpktqzqjq1b26ifxp76ydg2rkkchgn6e' accesstoken = gettoken(corpid,corpsecret) msg = senddata(accesstoken,notifystr) print(msg)
shell和python调用企业微信服务号进行报警
#!/bin/bash corpid="wxd6b3" corpsecret="ajtapagjw" access_token=`curl -s "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$corpid&corpsecret=$corpsecret" |jq '.access_token' | awk -f'"' '{print $2}'` curl -l -h "content-type: application/json" -x post -d '{"touser":"@all","msgtype":"text","toparty":"14","agentid":"14","text":{"content":"测试"} , "safe":"0"}' "
python脚本如下:
# coding:utf-8 import sys import urllib2 import time import json import requests reload(sys) sys.setdefaultencoding('utf-8') #title = sys.argv[2] # 位置参数获取title 适用于zabbix #content = sys.argv[3] # 位置参数获取content 适用于zabbix title = "title 测试" # 位置参数获取title 适用于zabbix content = "content 测试" # 位置参数获取content 适用于zabbix class token(object): # 获取token def __init__(self, corpid, corpsecret): self.baseurl = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={0}&corpsecret={1}'.format( corpid, corpsecret) self.expire_time = sys.maxint def get_token(self): if self.expire_time > time.time(): request = urllib2.request(self.baseurl) response = urllib2.urlopen(request) ret = response.read().strip() ret = json.loads(ret) if 'errcode' in ret.keys(): print >> ret['errmsg'], sys.stderr sys.exit(1) self.expire_time = time.time() + ret['expires_in'] self.access_token = ret['access_token'] return self.access_token def send_msg(title, content): # 发送消息 corpid = "" # 填写自己应用的 corpsecret = "" # 填写自己应用的 qs_token = token(corpid=corpid, corpsecret=corpsecret).get_token() url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token={0}".format( qs_token) payload = { "touser": "@all", "msgtype": "text", "agentid": "14", "text": { "content": "标题:{0}\n 内容:{1}".format(title, content) }, "safe": "0" } ret = requests.post(url, data=json.dumps(payload, ensure_ascii=false)) print ret.json() if __name__ == '__main__': # print title, content send_msg(title, content)
python利用微信订阅号报警
# coding=utf-8 import urllib import urllib2 import cookielib import json import sys data={'username':'yaokuaile-99', 'pwd':'f4bb2d8abe7a799ad62495a912ae3363', 'imgcode':'', 'f':'json' } cj=cookielib.lwpcookiejar() opener=urllib2.build_opener(urllib2.httpcookieprocessor(cj)) urllib2.install_opener(opener) def gettoken(): headers = {'accept': 'application/json, text/javascript, */*; q=0.01', 'accept-encoding': 'gzip,deflate,sdch', 'accept-language': 'zh-cn,zh;q=0.8', 'connection': 'keep-alive', 'content-type': 'application/x-www-form-urlencoded; charset=utf-8', 'content-length': '74', 'content-type': 'application/x-www-form-urlencoded; charset=utf-8', 'host': 'mp.weixin.qq.com', 'origin': 'https://mp.weixin.qq.com', 'referer': 'https://mp.weixin.qq.com/cgi-bin/loginpage?t=wxm2-login&lang=zh_cn', 'user-agent': 'mozilla/5.0 (windows nt 6.1; wow64) applewebkit/537.36 (khtml, like gecko) chrome/33.0.1750.154 safari/537.36', 'x-requested-with': 'xmlhttprequest', } req = urllib2.request('https://mp.weixin.qq.com/cgi-bin/login?lang=zh_cn',urllib.urlencode(data),headers) ret = urllib2.urlopen(req) retread= ret.read() token = json.loads(retread) token=token['redirect_url'][44:] return token ### send msg def sendweixin(msg,token,tofakeid): msg = msg token = token data1 = {'type':'1','content':msg,'imgcode':'','imgcode':'','tofakeid':tofakeid,'f':'json','token':token,'ajax':'1'} headers = {'accept':'*/*', 'accept-encoding': 'gzip,deflate,sdch', 'accept-language': 'zh-cn,zh;q=0.8', 'connection': 'keep-alive', 'content-type': 'application/x-www-form-urlencoded; charset=utf-8', 'host': 'mp.weixin.qq.com', 'origin': 'https://mp.weixin.qq.com', 'referer': 'https://mp.weixin.qq.com/cgi-bin/singlemsgpage?msgid=&source=&count=20&t=wxm-singlechat&fromfakeid=150890&token=%s&lang=zh_cn', 'user-agent': 'mozilla/5.0 (windows nt 6.1; wow64) applewebkit/537.36 (khtml, like gecko) chrome/33.0.1750.154 safari/537.36', 'x-requested-with':'xmlhttprequest', } req2 = urllib2.request('https://mp.weixin.qq.com/cgi-bin/singlesend?t=ajax-response&f=json&lang=zh_cn',urllib.urlencode(data1),headers) ret2=urllib2.urlopen(req2) if __name__=='__main__': ''' useage: ./send_wx.py msg ''' token = gettoken() msg = sys.argv[1:] msg = '\n'.join(msg) tofakeid = '2443746922' sendweixin(msg, token, tofakeid)
#!/usr/bin/env python #-*- coding:utf-8 -*- import urllib2,json import datetime,time from config import * import sys reload(sys) sys.setdefaultencoding("utf-8") class wechatpush(): def __init__(self,appid,secrect,file_name): # 传入appid self.appid = appid # 传入密码 self.secrect = secrect # 传入记录token和过期时间的文件名 self.file_name=file_name def build_timestamp(self,interval): # 传入时间间隔,得到指定interval后的时间 格式为"2015-07-01 14:41:40" now = datetime.datetime.now() delta = datetime.timedelta(seconds=interval) now_interval=now + delta return now_interval.strftime('%y-%m-%d %h:%m:%s') def check_token_expires(self): # 判断token是否过期 with open(self.file_name,'r') as f: line=f.read() if len(line)>0: expires_time=line.split(",")[1] token=line.split(",")[0] else: return "","true" curr_time=time.strftime('%y-%m-%d %h:%m:%s') # 如果过期返回false if curr_time>expires_time: return token,"false" # 没过期返回true else: return token,"true" def gettoken(self): # 获取accesstoken url = 'https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid='+self.appid + "&secret="+self.secrect try: f = urllib2.urlopen(url) s = f.read() # 读取json数据 j = json.loads(s) j.keys() # 从json中获取token token = j['access_token'] # 从json中获取过期时长 expires_in =j['expires_in'] # 将得到的过期时长减去300秒然后与当前时间做相加计算然后写入到过期文件 write_expires=self.build_timestamp(int(expires_in-300)) content="%s,%s" % (token,write_expires) with open(self.file_name,'w') as f: f.write(content) except exception,e: print e return token def post_data(self,url,para_dct): """触发post请求微信发送最终的模板消息""" para_data = para_dct f = urllib2.urlopen(url,para_data) content = f.read() return content def do_push(self,touser,template_id,url,topcolor,data): '''推送消息 ''' #获取存入到过期文件中的token,同时判断是否过期 token,if_token_expires=self.check_token_expires() #如果过期了就重新获取token if if_token_expires=="false": token=self.gettoken() # 背景色设置,貌似不生效 if topcolor.strip()=='': topcolor = "#7b68ee" #最红post的求情数据 dict_arr = {'touser': touser, 'template_id':template_id, 'url':url, 'topcolor':topcolor,'data':data} json_template = json.dumps(dict_arr) requst_url = "https://api.weixin.qq.com/cgi-bin/message/template/send?access_token="+token content = self.post_data(requst_url,json_template) #读取json数据 j = json.loads(content) j.keys() errcode = j['errcode'] errmsg = j['errmsg'] #print errmsg if __name__ == "__main__": def alarm(title,hostname,timestap,level,message,state,tail): """报警函数""" color="#ff0000" data={"first":{"value":title},"keyword1":{"value":hostname,"color":color},"keyword2":{"value":timestap,"color":color},"keyword3":{"value":level,"color":color},"keyword4":{"value":message,"color":color},"keyword5":{"value":state,"color":color},"remark":{"value":tail}} return data def recover(title,message,alarm_time,recover_time,continue_time,tail): """恢复函数""" re_color="#228b22" data={"first":{"value":title},"content":{"value":message,"color":re_color},"occurtime":{"value":alarm_time,"color":re_color},"recovertime":{"value":recover_time,"color":re_color},"lasttime":{"value":continue_time,"color":re_color},"remark":{"value":tail}} return data # data=alarm("测试的报警消息","8.8.8.8",time.ctime(),"*别","然并卵","挂了","大傻路赶紧处理") # 实例化类 webchart=wechatpush(appid,secrect,file_name) url="http://www.xiaoniu88.com" print len(sys.argv) # 发送报警消息 if len(sys.argv) == 9: title=sys.argv[1] hostname=sys.argv[2] timestap=sys.argv[3] level=sys.argv[4] message=sys.argv[5] state=sys.argv[6] tail=sys.argv[7] print "sys.argv[1]"+sys.argv[1] print "sys.argv[2]"+sys.argv[2] print "sys.argv[3]"+sys.argv[3] print "sys.argv[4]"+sys.argv[4] print "sys.argv[5]"+sys.argv[5] print "sys.argv[6]"+sys.argv[6] print "sys.argv[7]"+sys.argv[7] print "sys.argv[8]"+sys.argv[8] with open("/etc/zabbix/moniter_scripts/test.log",'a+') as f: f.write(title+"\n") f.write(hostname+"\n") f.write(timestap+"\n") f.write(level+"\n") f.write(message+"\n") f.write(state+"\n") f.write(tail+"\n") f.write("%s_%s" % ("group",sys.argv[8])+"\n") data=alarm(title,hostname,timestap,level,message,state,tail) group_name="%s_%s" % ("group",sys.argv[8]) for touser in eval("%s_%s" % ("group",sys.argv[8])): webchart.do_push(touser,alarm_id,url,"",data) for touser in group_super: webchart.do_push(touser,alarm_id,url,"",data) #发送恢复消息 elif len(sys.argv) == 8: title=sys.argv[1] message=sys.argv[2] alarm_time=sys.argv[3] recover_time=sys.argv[4] continue_time=sys.argv[5] tail=sys.argv[6] print "sys.argv[1]"+sys.argv[1] print "sys.argv[2]"+sys.argv[2] print "sys.argv[3]"+sys.argv[3] print "sys.argv[4]"+sys.argv[4] print "sys.argv[5]"+sys.argv[5] print "sys.argv[6]"+sys.argv[6] print "sys.argv[7]"+sys.argv[7] data=recover(title,message,alarm_time,recover_time,continue_time,tail) for touser in eval("%s_%s" % ("group",sys.argv[7])): webchart.do_push(touser,recover_id,url,"",data) for touser in group_super: webchart.do_push(touser,recover_id,url,"",data)
zabbix使用微信报警python脚本
#!/usr/bin/python # coding: utf-8 #python2将zabbix报警信息发送到微信。 #by linwangyi #2016-01-18 import urllib,urllib2 import json import sys def gettoken(corpid,corpsecret): gettoken_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + corpid + '&corpsecret=' + corpsecret try: token_file = urllib2.urlopen(gettoken_url) except urllib2.httperror as e: print e.code print e.read().decode("utf8") sys.exit() token_data = token_file.read().decode('utf-8') token_json = json.loads(token_data) token_json.keys() token = token_json['access_token'] return token def senddata(access_token,user,content): send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token send_values = { "touser":user, #企业号中的用户帐号,在zabbix用户media中配置,如果配置不正常,将按部门发送。 "toparty":"1", #企业号中的部门id "msgtype":"text", #企业号中的应用id,消息类型。 "agentid":"1", "text":{ "content":content }, "safe":"0" } send_data = json.dumps(send_values, ensure_ascii=false) send_request = urllib2.request(send_url, send_data) response = json.loads(urllib2.urlopen(send_request).read()) print str(response) if __name__ == '__main__': user = str(sys.argv[1]) #zabbix传过来的第一个参数 content = str(sys.argv[3]) #zabbix传过来的第三个参数 corpid = 'xxxx' #corpid是企业号的标识 corpsecret = 'xxxxx' #corpsecretsecret是管理组凭证密钥 accesstoken = gettoken(corpid,corpsecret) senddata(accesstoken,user,content) #!/usr/bin/python3 # coding: utf-8 #python3将zabbix报警信息发送到微信。 #by http://sunday208.blog.51cto.com/ #2016-01-18 import urllib.request import json import sys def gettoken(corp_id,corp_secret): gettoken_url = 'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=' + corp_id + '&corpsecret=' + corp_secret try: token_file = urllib.request.urlopen(gettoken_url) except urllib.error.httperror as e: print(e.code) print(e.read().decode("utf8")) token_data = token_file.read().decode('utf-8') token_json = json.loads(token_data) token_json.keys() token = token_json['access_token'] return token def senddata(access_token,user,content): try: send_url = 'https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=' + access_token send_values = { "touser":user, #企业号中的用户帐号,在zabbix用户media中配置,如果配置不正常,将按部门发送。 "toparty":"1", #企业号中的部门id "msgtype":"text", "agentid":"1", #企业号中的应用id,消息类型。 "text":{ "content":content }, "safe":"0" } send_data = json.dumps(send_values, ensure_ascii=false).encode(encoding='utf8') send_request = urllib.request.request(send_url, send_data) response = urllib.request.urlopen(send_request) msg = response.read() print("returned value : " + str(msg)) except: print("returned value : " + str(msg)) default_encoding = 'utf-8' if sys.getdefaultencoding() != default_encoding: reload(sys) sys.setdefaultencoding(default_encoding) user = str(sys.argv[1]) #zabbix传过来的第一个参数 content = str(sys.argv[3]) #zabbix传过来的第三个参数 corpid = 'xxxx' #corpid是企业号的标识 corpsecret = 'xxxxx' #corpsecretsecret是管理组凭证密钥 accesstoken = gettoken(corpid,corpsecret) senddata(accesstoken,user,content)
总结
以上所述是小编给大家介绍的python利用微信公众号实现报警功能 ,希望对大家有所帮助
上一篇: Python学习小技巧总结