Jira + 钉钉(DingTalk):任务状态改变时发送自定义信息
程序员文章站
2022-03-04 19:02:04
...
Python脚本:
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# --------------------
# File: PMO钉钉发送.py
# Project: 3.各类脚本
# Purpose: Receive Jira webhook events and send a custom DingTalk message when an issue's status changes
# Author: Jan Lam ([email protected])
# Last Modified: 2019-01-17 13:10:27
# --------------------
from concurrent.futures import ThreadPoolExecutor
import hug
from requests import post
# Two-worker thread pool shared by the whole module.
executor = ThreadPoolExecutor(max_workers=2)

# ---- configuration ----
# DingTalk robot webhook values (strings). List several entries to
# broadcast the same message to several groups.
TOKEN_LIST = []

# Address of the Jira server.
JIRA_URL = ""
class JiraTask:
    """Fields of a Jira webhook event that are needed to notify DingTalk.

    Every attribute is always present after construction; values that are
    missing from the payload are ``None`` (``issueId`` falls back to ``''``).

    Attributes:
        issueId: issue key, read from ``issue.key``.
        status: current status name, read from ``issue.fields.status.name``.
        title: issue summary, read from ``issue.fields.summary``.
        reporter: display name, read from ``issue.fields.reporter.displayName``.
        created: creation timestamp string, read from ``issue.fields.created``.
        updated: update timestamp string, read from ``issue.fields.updated``.
        fromString: ``fromString`` of the first changelog item.
        toString: ``toString`` of the first changelog item.
    """

    @staticmethod
    def _as_dict(value):
        """Return *value* if it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    def __init__(self, data):
        """Extract the relevant fields from the webhook payload *data*.

        Malformed or partial payloads (including ``None``) never raise:
        each field that cannot be read simply keeps its default, so the
        other methods can rely on all attributes existing.
        """
        payload = self._as_dict(data)
        issue = self._as_dict(payload.get('issue'))
        fields = self._as_dict(issue.get('fields'))

        self.issueId = issue.get('key', '')
        self.status = self._as_dict(fields.get('status')).get('name')  # issue status
        self.title = fields.get('summary')  # issue title
        self.reporter = self._as_dict(fields.get('reporter')).get('displayName')  # reporter
        self.created = fields.get('created')  # creation time
        self.updated = fields.get('updated')  # last update time

        # Only the first changelog item is inspected for the transition.
        items = self._as_dict(payload.get('changelog')).get('items')
        first_change = {}
        if isinstance(items, (list, tuple)) and items:
            first_change = self._as_dict(items[0])
        self.fromString = first_change.get('fromString')
        self.toString = first_change.get('toString')

    def _splice_dingtalk_msg(self):
        """Build the DingTalk markdown text for this event.

        Returns:
            The message string when the change goes from 'PMO审核' to
            '立项评审中'; ``None`` for every other event.
        """
        # Trigger condition: 'PMO审核' >> '立项评审中'.
        if self.fromString != 'PMO审核' or self.toString != '立项评审中':
            return None
        url = f'{JIRA_URL}/browse/{self.issueId}'
        return f"#### hi, 大家好, {self.reporter}提交的需求:【[{self.issueId} {self.title}]({url})】进入立项评审,请大家关注。 ####"

    def send_dingtalk(self):
        """Post the notification to every webhook in ``TOKEN_LIST``.

        Returns:
            ``True`` when every request was issued without raising;
            ``False`` when there is nothing to send or any request raised.
        """
        msg = self._splice_dingtalk_msg()
        if not msg:
            # Nothing to announce; avoid posting the literal text "None".
            return False

        data = {
            "msgtype": "markdown",
            "markdown": {
                "title": f'{self.title}',
                "text": msg,
            },
            "at": {
                # Sent as a real JSON boolean rather than the string 'True'.
                "isAtAll": True,
            },
        }

        delivered = True
        for url in TOKEN_LIST:
            # Best effort per webhook: one failing group must not stop
            # delivery to the remaining groups.
            try:
                post(url, json=data, headers={'Content-Type': 'application/json'}, timeout=20)
            except Exception:
                delivered = False
        return delivered
@hug.post()
def pmonotify(body):
    """Handle a POST request and trigger the DingTalk notification.

    Args:
        body: request body supplied by hug; presumably the Jira webhook
            JSON payload (TODO confirm against the Jira webhook setup).

    Returns:
        A dict with ``code`` 200 when a message was built and its sending
        was submitted to the executor, ``code`` -2 when no message applies
        to this event, or ``code`` -1 when the payload could not be
        processed.
    """
    # Parsing and message building sit inside the try so that a malformed
    # payload yields the -1 response instead of an unhandled exception.
    try:
        t = JiraTask(body)
        message = t._splice_dingtalk_msg()
        if message:
            # Send in the background so the response is not delayed.
            executor.submit(t.send_dingtalk)
            return {'code': 200, 'status': 'knowledge', 'jira_issue_status': t.status, 'message': '已执行发送钉钉信息:' + message}
        return {'code': -2, 'status': 'failed', 'jira_issue_status': t.status, 'message': 'message为空'}
    except Exception:
        return {'code': -1, 'status': 'failed', 'message': '参数不正确!'}
if __name__ == '__main__':
    # Serve this module's hug HTTP API on port 8890 when run as a script.
    api = hug.API(__name__)
    api.http.serve(port=8890)
上一篇: JavaScript和java一样吗?有什么区别?
下一篇: 数据库之分组