欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

jira + dingding 任务状态改变发送自定义信息

程序员文章站 2022-03-04 19:02:04
...

Python脚本:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# --------------------
# File: PMO钉钉发送.py
# Project: 3.各类脚本
# Purpose: xxxxxxx
# Author: Jan Lam ([email protected])
# Last Modified: 2019-01-17 13:10:27
# --------------------
from concurrent.futures import ThreadPoolExecutor
import hug
from requests import post
executor = ThreadPoolExecutor(2)

# config setting
TOKEN_LIST = [
   #钉钉机器人:webhook 的 value值,字符串,支持群发
]
JIRA_URL = ""  # jira服务器地址


class JiraTask:
    def __init__(self, data):
        try:
            self.issueId = data['issue'].get('key', '')
            self.status = data['issue']['fields']['status'].get('name', None)  # 任务状态
            self.title = data['issue']['fields'].get('summary', None)  # 任务标题
            self.reporter = data['issue']['fields']['reporter'].get('displayName', None)  # 问题报告人
            self.created = data['issue']['fields'].get('created', None)  # 问题创建时间
            self.updated = data['issue']['fields'].get('updated', None)  # 问题更新时间
        except Exception:
            return

        try:
            self.fromString = data['changelog']['items'][0].get('fromString', None)
            self.toString = data['changelog']['items'][0].get('toString', None)
        except Exception:
            self.fromString = self.toString = None

    def _splice_dingtalk_msg(self):
        url = f'{JIRA_URL}/browse/{self.issueId}'
        cTime = ''
        if hasattr(self, 'created'):
            cTime = self.created.split('T')[0]

        try:  # 触发条件:立项评审中  >>  评审中
            if hasattr(self, 'fromString') and self.fromString is not None and self.fromString == 'PMO审核' and self.toString == '立项评审中':
                msg = f"#### hi, 大家好, {self.reporter}提交的需求:【[{self.issueId} {self.title}]({url})】进入立项评审,请大家关注。 ####"
            else:
                msg = None
        except Exception:
            msg = None
        return msg

    def send_dingtalk(self):
        msg = self._splice_dingtalk_msg()
        data = {
            "msgtype": "markdown",
            "markdown": {
                "title": f'{self.title}',
                "text": f'{msg}',
            },
            "at": {
                "isAtAll": 'True'
                }
        }
        try:
            for url in TOKEN_LIST:
                post(url, json=data, headers={'Content-Type': 'application/json'}, timeout=20)
            return True
        except Exception:
            return False


@hug.post()
def pmonotify(body):
    t = JiraTask(body)
    message = t._splice_dingtalk_msg()

    try:
        if message is not None and message != '':
            executor.submit(t.send_dingtalk)
            return {'code': 200, 'status': 'knowledge', 'jira_issue_status': t.status, 'message': '已执行发送钉钉信息:' + message}
        else:
            return {'code': -2, 'status': 'failed', 'jira_issue_status': t.status, 'message': 'message为空'}
    except Exception:
        return {'code': -1, 'status': 'failed', 'message': '参数不正确!'}


if __name__ == '__main__':
    hug.API(__name__).http.serve(port=8890)  #python api 端口

相关标签: 工具