欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

python公司内项目对接钉钉审批流程的实现

程序员文章站 2022-03-23 17:33:39
最近把组内的一个项目对接钉钉审批接口,通过python3.6。废话不多说了,上代码:import requestsimport jsonimport timefrom dingtalk.crypto...

最近把组内的一个项目对接钉钉审批接口,通过python3.6。

废话不多说了,上代码:

import requests
import json
import time
from dingtalk.crypto import dingtalkcrypto

from django.conf import settings
# settings.base_dir


class crypto(object):
    def __init__(self, token):
        # 随便填的字符串
        self.token = token
        # 自己生成的43位随机字符串
        self.aes_key = settings.dingding.get("dingtalk_aes_token")
        # 钉钉企业id
        self.corp_id = settings.dingding.get("corpid") #
        print("corp_id:", self.corp_id)
        self.nonce = settings.dingding.get("nonce")
        self.crypto = dingtalkcrypto(
            token=self.nonce,
            encoding_aes_key=self.aes_key,
            corpid_or_suitekey=self.corp_id
        )

    def encrypt_success(self):
        # 返回加密success
        result = self.crypto.encrypt_message(
            msg="success",
            nonce=self.nonce,
            timestamp=int(time.time()*1000)
        )
        return result


class ding(object):
    def __init__(self, approve_process):
        self.agentid = settings.dingding.get("agentid")
        self.appkey = settings.dingding.get("appkey")
        self.appsecret = settings.dingding.get("appsecret")
        self.dingding_url = settings.dingding.get("url")
        self.process_code = settings.dingding.get("approve_process").get(approve_process)['process_code']
        self.aes_key = settings.dingding.get("dingtalk_aes_token")
        self.nonce = settings.dingding.get("nonce")

    def get_token(self):
        '''
        获取钉钉的token
        :return: 钉钉token
        '''
        url = self.dingding_url + '/gettoken?appkey={}&appsecret={}'.format(self.appkey, self.appsecret)
        req = requests.get(url)
        req = json.loads(req.text)
        return req['access_token']

# def createcallbackdd():
#     '''
#     注册钉钉回调函数
#     :return:
#     '''
#     url = 'https://oapi.dingtalk.com/call_back/register_call_back?access_token=' + self.gettoken()
#     data = {
#         "call_back_tag": ["bpms_task_change", "bpms_instance_change"],  #这两个回调种类是审批的
#         "token": token,  #自定义的字符串
#         "aes_key": aes_key, #自定义的43位字符串,密钥
#         "url": url  #回调地址
#     }
#     requests.post(url, data=json.dumps(data))
#     return ('ok')

    def create_process(self, originator_user_id, dept_id, form_component_value_vo, approvers, cc_list, has_cc=0):
        '''
        创建钉钉审批
        approvers为list 元素为钉钉userid   cc_list同理
        '''
        url = self.dingding_url + '/topapi/processinstance/create?access_token=' + self.get_token()
        print("form_component_value_vo:", form_component_value_vo)
        if has_cc == 0:
            data = {
                'agent_id': self.agentid,
                'process_code': self.process_code,  #工单id
                'originator_user_id': originator_user_id,
                'dept_id': dept_id,  #创建人的钉钉部门id
                'form_component_values': str(form_component_value_vo), #钉钉后台配置的需要填写的字段,
                'approvers': approvers,
                'cc_list': cc_list,
                'cc_position': 'start_finish'  # 发起和完成时与抄送
            }
        else:
            data = {
                'agent_id': self.agentid,
                'process_code': self.process_code,  #工单id
                'originator_user_id': originator_user_id, #创建人的钉钉userid
                'dept_id': dept_id,  #创建人的钉钉部门id
                'form_component_values': str(form_component_value_vo), #钉钉后台配置的需要填写的字段,
                'approvers': approvers,
            }
        print("dingding_utils:", data)
        response = requests.post(url, data=data)
        return response.json()

    def get_status(self, process_instance_id):
        url = self.dingding_url + '/topapi/processinstance/get?access_token=' + self.get_token()
        data = {
            "process_instance_id": process_instance_id
        }
        response = requests.post(url, data=data)
        return response.json()

    def register_callback(self, call_back_url):
        # 注册回调
        url = self.dingding_url + '/call_back/register_call_back?access_token=' + self.get_token()
        print("self.get_token():", self.get_token())
        data = {
            "call_back_tag": ['bpms_task_change', 'bpms_instance_change'],
            "token": self.nonce,
            "aes_key": self.aes_key,
            "url": call_back_url,
        }
        response = requests.post(url, data=json.dumps(data))
        return response.json()

    def get_callback(self):
        url = self.dingding_url + '/call_back/get_call_back?access_token=' + self.get_token()
        req = requests.get(url)
        req = json.loads(req.text)
        return req

    def create_process_approver_v2(self, originator_user_id, dept_id, form_component_value_vo, approvers, cc_list):
        '''
        创建钉钉审批
        '''
        url = self.dingding_url + '/topapi/processinstance/create?access_token=' + self.get_token()
        data = {
            'agent_id': self.agentid,
            'process_code': self.process_code,
            'originator_user_id': originator_user_id,
            'dept_id': dept_id,
            'form_component_values': str(form_component_value_vo),
            'approvers_v2': json.dumps(approvers)
        }
        if cc_list:
            data['cc_list'] = cc_list
            data['cc_position'] = 'finish'

        response = requests.post(url, data=data)
        return response.json()

    def create_process_approver_v2_test(self, originator_user_id, dept_id, form_component_value_vo):
        '''
        创建钉钉审批
        '''
        url = self.dingding_url + '/topapi/processinstance/create?access_token=' + self.get_token()
        data = {
            'agent_id': self.agentid,
            'process_code': self.process_code,
            'originator_user_id': originator_user_id,
            'dept_id': dept_id,
            'form_component_values': str(form_component_value_vo),
            'approvers_v2': json.dumps([
                {
                    "task_action_type": "none",
                    "user_ids": ["dingding_id"],   # 单独审批人
                },
                {
                    "task_action_type": "or",
                    "user_ids": ["dingding_id1", "dingding_id2"],   # 或签
                },
                {
                    "task_action_type": "and",
                    "user_ids": ["dingding_id1", "dingding_id2"],  # 会签
                }
            ])
        }

        response = requests.post(url, data=data)
        return response.json()


if __name__ == "__main__":

    import django, os, sys
    sys.path.append('xxxxxx')   # 项目路径
    os.environ['django_settings_module'] = 'xx.settings'
    # print("settings.dingding", settings.dingding)
    ding = ding("create_xx")
    # print(ding.get_token())
    # info = [{'name': '单行输入框','value': 'testixxxxxxxx'}]
    # # print(ding.create_process('11', 11, info))
    a = [
        {'name': "输入框1", 'value': "value1"},
        {'name': "输入框2", 'value': "value2"},
    ]
    # print(ding.create_process_test('11', 11, a))
    # print(ding.create_process_approver_v2_test('11', 11, a))
    # print(ding.create_process_test2())
    # print(ding.get_status('xxx'))
    print(ding.get_status('xx'))

    # # 验证  回调
    # a = ding.get_token()
    # print(a)
    # c = crypto(a)
    # print(c.encrypt_success())

    # 注册回调
    # print(ding.register_callback("http://xxxx.vaiwan.com/xxx"))
    # print(ding.get_callback())

说明:

  1 crypto类用于对接钉钉回调用的。一个公司只有一个corpid,并且一个corpid只能注册一个回调地址。我司有公共组注册好了回调。只要接入公司内的回调即可。所以我实际没有使用到crypto。

  2  在钉钉管理后台中创建应用后会有这三个东西:agentid、appkey,appsecret  。在创建钉钉审批流程,可以从审批流程浏览器中获取到approve_process。别忘啦给这个流程审批接口权限。这些官方文档有说。

  3  配置setting变量:

dingding = {
    "agentid": 123,
    "appkey": "xx",
    "appsecret": "xx",
    "url": "https://oapi.dingtalk.com",
    "approve_process": { # process_code
        "create_xx": {
            "process_code": "abc", # 审批流程的id
    },
    "dingtalk_aes_token": "abc",
    "nonce": "abc",
    "corpid": "abc",
}

 4 接口形式创建的审批流程,与钉钉管理后台创建的流程有一些不同:

    1 不能在不同的审批环节设置不同的抄送人

    2 不能审批流程前后有相同的人,不能自动显示成 “自动同意”(管理后台设置成去重后,但是接口指定审批人场景,不支持)

 5 其他如:审批内容、或签,会签代码里都有示例。

到此这篇关于python公司内项目对接钉钉审批流程的实现的文章就介绍到这了,更多相关python对接钉钉审批内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

相关标签: python 钉钉审批