您现在的位置是: 首页  >  IT编程


程序员文章站 2024-03-27 14:38:16
1. 适用版本:适用于 RPA 2020.4 及以上版本。2. 接口 API 手册:调用方式及字段请参考论坛手册(暂未在论坛公开,请向当地厂商技术人员索取)。3. Python 调用代码:如果想用机器人来调用 API……

1. 适用版本

适用于 RPA 2020.4 及以上版本。


2. 接口 API 手册

调用方式及字段请参考论坛手册(暂未在论坛公开,请向当地厂商技术人员索取)。



3. Python 调用代码

如果想用机器人来调用 API 接口,可参考以下代码;其它语言的实现逻辑相同:

import json

import time

import requests

from urllib import parse

import hmac

import base64

from hashlib import sha256

def rpa_rest_2020_4(host='', rest_type='', data_json=None, mode='', port=443,
                    accesstoken=None, retry=2):
    """Send a signed request to an RPA REST endpoint and return a result dict.

    The signature is computed as follows: the request parameters (query
    string form for ``get``, ``str(data_json)`` otherwise) are concatenated
    with ``&token=<accesstoken>&timestamp=<ms since epoch>``, percent-encoded,
    HMAC-SHA256'd with the access token as key, hex-encoded and finally
    base64-encoded. The token, timestamp and signature travel as headers.

    NOTE(review): the published listing this function was recovered from had
    lost several lines (the body of a loop over ``data_json`` before signing,
    the ``else`` arm of the ``/oapi/v1/job/create`` check, and all response
    handling). Those parts are reconstructed below and flagged individually;
    confirm them against the vendor's API manual.

    Args:
        host: Scheme and host of the server, e.g. ``https://rpa.example.com``.
        rest_type: Endpoint path appended to the host.
        data_json: Mapping of request parameters. Required; ``None`` results
            in a code-41 failure.
        mode: HTTP method, one of ``get``, ``put``, ``post`` or ``delete``.
        port: Server port. Appended to the URL unless it is 443 or empty.
        accesstoken: Access token; used both as a header and as the HMAC key.
        retry: Maximum number of attempts to send the request.

    Returns:
        On failure, a dict ``{'code', 'msg', 'result'}`` with code 42 (invalid
        mode), 41 (signing failed) or 40 (every send attempt failed). On
        success, the response body decoded as JSON (assumption, see note).
    """

    def json2params(params):
        # key=value pairs joined with '&'. Values are not percent-encoded,
        # as in the original listing; the same string is fed to the signature.
        return '&'.join(str(key) + '=' + str(params[key]) for key in params)

    if mode not in ('get', 'put', 'post', 'delete'):
        return {'code': 42, 'msg': 'fail,mode错误,只能为get、put、post或delete!', 'result': None}

    url = host
    # An empty/None port must not produce a dangling "host:".
    if port and str(port) != '443':
        url = url + ':' + str(port)
    url += rest_type

    try:
        timestamp = str(int(round(time.time() * 1000)))
        # NOTE(review): the original looped over data_json here with a body
        # that was lost. Building the parameter string for every mode keeps
        # the one visible effect of that loop: non-mapping input (e.g. None)
        # fails with code 41.
        json_str = json2params(data_json)
        # NOTE(review): for non-get modes the original signs and sends
        # str(data_json) (Python repr, not json.dumps). Kept as-is because the
        # server-side signature check must see the same bytes.
        signed_part = json_str if mode == 'get' else str(data_json)
        sign_yc = signed_part + '&token=' + accesstoken + '&timestamp=' + timestamp
        url_bm = parse.quote(sign_yc, encoding="utf-8")
        hex_digest = hmac.new(
            accesstoken.encode("utf-8"),
            url_bm.encode("utf-8"),
            digestmod=sha256,
        ).hexdigest()
        # b64encode never emits line breaks, so no newline stripping is
        # needed (the listing's replace('rn', '') would have corrupted any
        # signature containing the letters "rn").
        sign = base64.b64encode(hex_digest.encode("utf-8")).decode("utf-8")
    except Exception:
        return {'code': 41, 'msg': 'fail,获取签名失败!', 'result': None}

    # Build URL and headers once, outside the retry loop, so that a retry does
    # not append the query string a second time.
    auth_headers = {'accesstoken': accesstoken, 'timestamp': timestamp, 'signature': sign}
    if mode == 'get':
        request_url = url + '?' + json_str
        header_dict = auth_headers
    elif mode == 'post':
        # NOTE(review): both visible post branches ('/oapi/v1/job/create' and
        # the one whose condition was lost) send form-urlencoded headers with
        # the parameters in the query string, so they are merged here.
        request_url = url + '?' + json_str
        header_dict = dict(auth_headers)
        header_dict['content-type'] = 'application/x-www-form-urlencoded;charset=utf-8'
    else:
        request_url = url
        header_dict = dict(auth_headers)
        header_dict['content-type'] = 'application/json;charset=utf-8'

    for _ in range(retry):
        try:
            # verify=False disables TLS certificate validation (kept from the
            # original; presumably for self-signed on-premise servers).
            if mode == 'get':
                res = requests.get(request_url, headers=header_dict, verify=False)
            elif mode == 'post':
                res = requests.post(request_url, data=str(data_json), headers=header_dict, verify=False)
            elif mode == 'put':
                res = requests.put(request_url, data=str(data_json), headers=header_dict, verify=False)
            else:
                res = requests.delete(request_url, data=str(data_json), headers=header_dict, verify=False)
            # NOTE(review): response handling was missing from the listing;
            # returning the JSON-decoded body is an assumption.
            return json.loads(res.text)
        except Exception:
            # Best effort: fall through to the next attempt.
            continue

    return {'code': 40, 'msg': 'fail,发送api失败!', 'result': None}

def get_token(host='', port='', accesskey='', secretkey='', retry=2):
    """Request an access token from ``/oapi/v1/token``.

    NOTE(review): the published listing had lost the line that builds the
    query string and all response handling. The query keys (``accessKey`` /
    ``secretKey``) and the JSON decoding of the response are reconstructions;
    confirm both against the vendor's API manual.

    Args:
        host: Scheme and host of the server, e.g. ``https://rpa.example.com``.
        port: Server port. Appended to the URL unless it is 443 or empty.
        accesskey: Access key issued for the API client.
        secretkey: Secret key issued for the API client.
        retry: Maximum number of attempts.

    Returns:
        The response body decoded as JSON on success (assumption, see note);
        otherwise ``{'code': 44, 'msg': ..., 'result': None}``.
    """
    get_field_json = {'code': 44, 'msg': 'fail,获取token失败', 'result': None}

    url = host
    # The default port is '' – do not emit a dangling "host:" in that case.
    if port and str(port) != '443':
        url = url + ':' + str(port)
    # urlencode percent-escapes the credentials so special characters survive.
    json_str = parse.urlencode({'accessKey': accesskey, 'secretKey': secretkey})
    url = url + '/oapi/v1/token?' + json_str

    for _ in range(retry):
        try:
            # verify=False disables TLS certificate validation (kept from the
            # original listing).
            res = requests.get(url, verify=False)
            return json.loads(res.text)
        except Exception:
            # Best effort: try again until the attempts are used up.
            continue

    return get_field_json

def refresh_token(host='', port='', refleshtoken='', retry=2):
    """Exchange a refresh token for a new access token via ``/oapi/v1/token``.

    NOTE(review): the published listing had lost the line that builds the
    query string and all response handling. The query key (``refreshToken``)
    and the JSON decoding of the response are reconstructions; confirm both
    against the vendor's API manual.

    Args:
        host: Scheme and host of the server, e.g. ``https://rpa.example.com``.
        port: Server port. Appended to the URL unless it is 443 or empty.
        refleshtoken: The refresh token (parameter name kept as published so
            existing keyword callers keep working).
        retry: Maximum number of attempts.

    Returns:
        The response body decoded as JSON on success (assumption, see note);
        otherwise ``{'code': 44, 'msg': ..., 'result': None}``.
    """
    get_field_json = {'code': 44, 'msg': 'fail,刷新token失败', 'result': None}

    url = host
    # The default port is '' – do not emit a dangling "host:" in that case.
    if port and str(port) != '443':
        url = url + ':' + str(port)
    # urlencode percent-escapes the token so special characters survive.
    json_str = parse.urlencode({'refreshToken': refleshtoken})
    url = url + '/oapi/v1/token?' + json_str

    for _ in range(retry):
        try:
            # verify=False disables TLS certificate validation (kept from the
            # original listing).
            res = requests.get(url, verify=False)
            return json.loads(res.text)
        except Exception:
            # Best effort: try again until the attempts are used up.
            continue

    return get_field_json

4. 其它平台或客户端调用

4.1 其它平台调用

按照第 3 章的逻辑自行编写调用代码即可。

4.2 机器人调用

按照第 3 章的代码添加一个全局函数,在需要调用的地方使用全局函数控件即可。

