Django实现支付宝付款和微信支付的示例代码
程序员文章站
2022-05-26 14:49:12
支付宝支付和微信支付是当今互联网产品常用的功能,我使用django rest framework实现了网页上支付宝支付和微信支付的一个通用服务,提供rpc接口给其他服务,包...
支付宝支付和微信支付是当今互联网产品常用的功能,我使用django rest framework实现了网页上支付宝支付和微信支付的一个通用服务,提供rpc接口给其他服务,包括获取支付宝支付页面url的rpc接口、支付宝支付成功异步回调http接口、获取微信支付二维码rpc接口、主动查询微信订单是否支付的rpc接口等。
支付宝网站支付需要蚂蚁金服开放平台账号,创建应用、配置秘钥(应用私钥与支付宝公钥)等步骤请参考蚂蚁金服开放平台的官方文档。
微信网站支付需要到微信支付官网注册服务商账号,并取得下文代码中用到的 appid、商户号(mch_id)和商户支付密钥(api_key)。
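目录结构如下(均位于 pay 应用目录下):models.py、serializers.py、views.py、alipay.py、weixin_pay.py、utils.py;另有代码中引用但本文未列出的 uuidtools.py 和 exception_handler.py。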
1、models.py
"""Database models of the payment service (Alipay and WeChat Pay).

NOTE(review): the published listing had been lower-cased and collapsed onto
one line; Django names are restored to their documented spelling, project
class names to PascalCase (``AliPay``, ``WxOrder``, ``WxPay``) -- confirm
against the real repository.
"""
from django.contrib.postgres.fields import ArrayField
from django.db import models


# Create your models here.
class BaseModel(models.Model):
    """Abstract base providing audit columns and an ``is_active`` flag."""

    created_time = models.DateTimeField(auto_now_add=True, verbose_name="创建时间")
    updated_time = models.DateTimeField(auto_now=True, verbose_name="修改时间")
    created_by = models.IntegerField(verbose_name="创建人id")
    updated_by = models.IntegerField(verbose_name="修改人id")
    is_active = models.BooleanField(default=True, verbose_name='是否正常')

    class Meta:
        abstract = True


class AliPay(BaseModel):
    """One Alipay order; every payment attempt adds an id to ``pay_nos``."""

    subject = models.CharField(max_length=256, verbose_name="订单标题")
    out_trade_no = models.CharField(max_length=64, unique=True, verbose_name="唯一订单号")
    # Empty string means no trade number has been recorded yet.
    trade_no = models.CharField(default="", max_length=64, verbose_name="支付宝系统中的交易流水号")
    total_amount = models.DecimalField(max_digits=11, decimal_places=2, verbose_name="订单的资金总额")
    return_url = models.CharField(max_length=500, verbose_name="支付完成同步跳转地址")
    notify_url = models.CharField(max_length=500, verbose_name="支付完成异步通知rpc地址")
    pay_time = models.DateTimeField(null=True, blank=True, verbose_name="支付时间")
    # ``default=list`` (a callable) instead of ``[]``: a literal list would be
    # one shared object reused as the default of every new row.
    pay_nos = ArrayField(models.CharField(max_length=100), default=list,
                         verbose_name='同一订单的支付id数组')

    class Meta:
        verbose_name = "阿里支付"
        verbose_name_plural = verbose_name
        ordering = ('-created_time',)


class WxOrder(BaseModel):
    """One WeChat Pay business order; ``total_fee`` is expressed in fen."""

    body = models.CharField(max_length=256, verbose_name="商品描述")
    out_trade_no = models.CharField(max_length=64, unique=True, verbose_name="订单号")
    transaction_id = models.CharField(default="", max_length=64, verbose_name="微信支付订单号")
    total_fee = models.BigIntegerField(verbose_name="订单的资金总额,单位为分")
    product_id = models.CharField(max_length=16, verbose_name="商品id")
    notify_url = models.CharField(max_length=500, verbose_name="支付完成通知url")
    pay_time = models.DateTimeField(null=True, blank=True, verbose_name="支付时间")

    class Meta:
        verbose_name = "微信订单"
        verbose_name_plural = verbose_name
        ordering = ('-created_time',)


class WxPay(BaseModel):
    """One WeChat payment attempt (QR code) belonging to a ``WxOrder``."""

    # Business order number; several attempts may share it.
    out_trade_no = models.CharField(null=True, blank=True, max_length=64, verbose_name="订单号")
    # Unique number of this attempt.
    pay_no = models.CharField(null=True, blank=True, max_length=64, unique=True,
                              verbose_name="支付唯一订单号")
    code_url = models.CharField(null=True, blank=True, max_length=100, verbose_name="二维码地址")
    nonce_str = models.CharField(null=True, blank=True, max_length=32, verbose_name="随机字符串")

    class Meta:
        verbose_name = "微信支付"
        verbose_name_plural = verbose_name
        ordering = ('-created_time',)
2、serializers.py:
"""DRF serializers for the payment models."""
from django.conf import settings
from rest_framework import serializers

from pay.models import AliPay, WxPay


class BaseSerializer(serializers.ModelSerializer):
    """Shared read-only audit fields; subclasses supply ``Meta.model``."""

    # Django only exposes upper-case names on ``django.conf.settings``, so the
    # format has to be read as ``DATETIME_FORMAT``.
    created_time = serializers.DateTimeField(format=settings.DATETIME_FORMAT, read_only=True)
    updated_time = serializers.DateTimeField(format=settings.DATETIME_FORMAT, read_only=True)
    is_active = serializers.BooleanField(read_only=True)

    class Meta:
        model = None


class AliPaySerializer(BaseSerializer):
    """Serializer exposing every field of the Alipay order model."""

    class Meta:
        model = AliPay
        fields = "__all__"


class WxPaySerializer(BaseSerializer):
    """Serializer exposing every field of the WeChat payment-attempt model."""

    class Meta:
        model = WxPay
        fields = "__all__"
3、views.py:
# -*- coding=utf-8 -*-
"""HTTP and JSON-RPC entry points of the payment service.

NOTE(review): the published listing had been lower-cased; project names
(``AliPay``, ``UUIDTools``, ``APPID`` ...) are restored by convention and
must be confirmed against the real modules.
"""
# Create your views here.
import time
from urllib.parse import urlparse

import dicttoxml
from django.conf import settings
from jsonrpc import jsonrpc_method
from rest_framework.decorators import list_route
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework.viewsets import ModelViewSet
from rest_framework_xml.parsers import XMLParser
from rest_framework_xml.renderers import XMLRenderer
from tokenauth.decorators import is_login

from pay import utils
# The SDK wrapper and the ORM model were imported under the same name, so the
# second import silently replaced the first; the SDK class is aliased here.
from pay.alipay import AliPay as AliPayClient
from pay.models import AliPay, WxOrder, WxPay
from pay.serializers import AliPaySerializer, WxPaySerializer
from pay.utils import UnActiveModelMixin
from pay.uuidtools import UUIDTools
from pay.weixin_pay import OrderQuery, UnifiedOrderPay, WeixinPay
from pay_service.settings.base import (
    ALI_PUB_KEY_PATH,
    ALIPAY_CALLBACK_URL,
    APPID,
    PRIVATE_KEY_PATH,
    WX_MCH_ID,
    WX_PAY_KEY,
    WXAPPID,
    WXPAY_CALLBACK_URL,
)

# Endpoint of this service that Alipay posts asynchronous results to.
ALIPAY_NOTIFY_URL = ALIPAY_CALLBACK_URL + 'api/v1.0/pay/alipay/notify/'
notify_url = ALIPAY_NOTIFY_URL  # backward-compatible alias of the old name
# NOTE(review): the original referenced an undefined ``wx_notify_url``. This
# path is derived by analogy with the Alipay one -- confirm the real route.
WX_NOTIFY_URL = WXPAY_CALLBACK_URL + 'api/v1.0/pay/wxpay/notify/'

_DB_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
_WX_TIME_FORMAT = '%Y%m%d%H%M%S'
_FAILED = "failed"


def _build_alipay_client(return_url):
    """Create the Alipay SDK wrapper configured with this service's keys."""
    return AliPayClient(
        appid=APPID,
        app_notify_url=ALIPAY_NOTIFY_URL,
        app_private_key_path=PRIVATE_KEY_PATH,
        alipay_public_key_path=ALI_PUB_KEY_PATH,
        debug=True,  # defaults to False
        return_url=return_url,
    )


def _wx_call_succeeded(result):
    """Return True when a WeChat reply reports SUCCESS on both status fields."""
    return (bool(result)
            and result.get("return_code") == "SUCCESS"
            and result.get("result_code") == "SUCCESS")


class AliPayViewSet(ModelViewSet):
    """CRUD for Alipay orders plus the asynchronous notification endpoint."""

    queryset = AliPay.objects.filter(is_active=True)
    serializer_class = AliPaySerializer

    @list_route(methods=['post'])
    def notify(self, request):
        """Handle Alipay's asynchronous payment notification.

        :param request: DRF request carrying the notification form fields.
        :return: ``Response("success")`` once the order is recorded and the
            trading service acknowledged it, otherwise ``Response("failed")``.
        """
        params = dict(request.data.items())
        sign = params.pop('sign', None)
        if sign is None:
            # Unsigned requests cannot be verified.
            return Response(_FAILED)

        order = AliPay.objects.filter(
            pay_nos__contains=[params.get('out_trade_no')]).values().first()
        if order is None:
            return Response(_FAILED)
        if str(order.get('total_amount')) != str(params.get('total_amount')):
            return Response(_FAILED)
        if params.get('app_id') != APPID:
            return Response(_FAILED)
        # NOTE(review): an order that already has a trade number is rejected,
        # so a retry after a failed downstream RPC is rejected too -- confirm.
        if order.get('trade_no') != "":
            return Response(_FAILED)

        client = _build_alipay_client(order.get('return_url'))
        if client.verify(params, sign) is not True:
            # The original fell through and returned None here.
            return Response(_FAILED)

        pay_time = time.strftime(_DB_TIME_FORMAT, time.localtime())
        AliPay.objects.filter(pk=order.get('id')).update(
            pay_time=pay_time, trade_no=params.get('trade_no'))
        # NOTE(review): ``request_thrift`` is expected in pay.utils; it is not
        # part of the utils listing published with this article.
        ret = utils.request_thrift('tradingmanager', 'notify',
                                   settings.TRADING_RPC_IP,
                                   int(settings.TRADING_RPC_PORT),
                                   order.get('out_trade_no'), str(pay_time))
        if ret == "success":
            return Response("success")
        return Response(_FAILED)


class WxPayViewSet(ModelViewSet):
    """CRUD for WeChat payment attempts, parsed and rendered as XML."""

    queryset = WxPay.objects.filter(is_active=True)
    serializer_class = WxPaySerializer
    parser_classes = (XMLParser,)
    renderer_classes = (XMLRenderer,)


@jsonrpc_method('pay.get_alipay_url')
def get_alipay_url(request, subject, out_trade_no, total_amount, return_url, notify_url, user_id):
    """Return the Alipay page-pay URL for an order, creating it if needed.

    A repeated call for an existing ``out_trade_no`` appends a fresh payment
    id to the order's ``pay_nos``. ``notify_url`` is stored on the order; the
    URL given to Alipay is always this service's own notify endpoint (the
    original passed the parameter because it shadowed the module constant).
    """
    order = AliPay.objects.filter(out_trade_no=out_trade_no).first()
    if order is not None:
        pay_no = UUIDTools.datetime_random()
        order.pay_nos.append(pay_no)
        order.save()
    else:
        pay_no = out_trade_no
        AliPay.objects.create(
            subject=subject,
            out_trade_no=out_trade_no,
            total_amount=total_amount,
            return_url=return_url,
            notify_url=notify_url,
            pay_nos=[pay_no],
            created_by=user_id,
            updated_by=user_id,
        )

    client = _build_alipay_client(return_url)
    query = client.direct_pay(
        subject=subject,
        out_trade_no=pay_no,
        total_amount="%.2f" % float(total_amount),
    )
    # Sandbox gateway:
    # "https://openapi.alipaydev.com/gateway.do?{data}".format(data=query)
    # Production gateway:
    return "https://openapi.alipay.com/gateway.do?{data}".format(data=query)


@jsonrpc_method('pay.get_wxpay_url')
def get_wxpay_url(request, out_trade_no, body, total_fee, notify_url, product_id, user_id):
    """Create a WeChat payment attempt and return its QR ``code_url``.

    Returns ``None`` when WeChat does not report success.
    """
    if not WxOrder.objects.filter(out_trade_no=out_trade_no).exists():
        WxOrder.objects.create(
            out_trade_no=out_trade_no,
            body=body,
            total_fee=total_fee,
            notify_url=notify_url,
            product_id=product_id,
            created_by=user_id,
            updated_by=user_id,
        )

    pay_no = UUIDTools.datetime_random()
    client = UnifiedOrderPay(WXAPPID, WX_MCH_ID, WX_PAY_KEY)
    # The host part of the callback URL is sent as ``spbill_create_ip``.
    result = client.post(body, pay_no, total_fee,
                         urlparse(WXPAY_CALLBACK_URL).hostname, WX_NOTIFY_URL)
    if not _wx_call_succeeded(result):
        return None

    WxPay.objects.create(
        out_trade_no=out_trade_no,
        pay_no=pay_no,
        code_url=result.get('code_url'),
        nonce_str=result.get('nonce_str'),
        created_by=user_id,
        updated_by=user_id,
    )
    return result.get('code_url')


@jsonrpc_method('pay.wx_order_query')
def wx_order_query(request, out_trade_no):
    """Ask WeChat whether any payment attempt of the order has been paid.

    The first paid attempt updates the ``WxOrder`` and yields
    ``{"success": True, "pay_time": <time_end as sent by WeChat>}``;
    otherwise ``{"success": False}`` is returned.
    """
    client = OrderQuery(WXAPPID, WX_MCH_ID, WX_PAY_KEY)
    attempts = WxPay.objects.filter(out_trade_no=out_trade_no).values()
    for attempt in attempts:
        result = client.post(attempt.get('pay_no'))
        if not _wx_call_succeeded(result):
            continue
        if result.get("trade_state") != "SUCCESS":
            continue
        # Payment succeeded.
        pay_time = result["time_end"]
        WxOrder.objects.filter(out_trade_no=out_trade_no).update(
            pay_time=time.strftime(_DB_TIME_FORMAT,
                                   time.strptime(pay_time, _WX_TIME_FORMAT)),
            transaction_id=result["transaction_id"],
        )
        return {"success": True, "pay_time": pay_time}
    return {"success": False}
4、alipay.py:
# -*- coding: utf-8 -*-
"""Minimal Alipay page-pay client: request signing and notify verification."""
# pip install pycryptodome
import binascii
import json
from base64 import b64encode, b64decode
from base64 import decodebytes, encodebytes
from datetime import datetime
from urllib.parse import quote_plus
from urllib.parse import urlparse, parse_qs
from urllib.request import urlopen

from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5


class AliPay(object):
    """Alipay payment interface (RSA2 / SHA-256 signatures)."""

    def __init__(self, appid, app_notify_url, app_private_key_path,
                 alipay_public_key_path, return_url, debug=False):
        """Load both RSA keys from disk and remember the callback URLs.

        :param appid: application id sent as ``app_id``.
        :param app_notify_url: URL sent as ``notify_url``.
        :param app_private_key_path: file holding the application private key.
        :param alipay_public_key_path: file holding Alipay's public key.
        :param return_url: default URL sent as ``return_url``.
        :param debug: select the sandbox gateway instead of production.
        """
        self.appid = appid
        self.app_notify_url = app_notify_url
        self.app_private_key_path = app_private_key_path
        self.app_private_key = None
        self.return_url = return_url
        with open(self.app_private_key_path) as fp:
            self.app_private_key = RSA.import_key(fp.read())

        self.alipay_public_key_path = alipay_public_key_path
        with open(self.alipay_public_key_path) as fp:
            self.alipay_public_key = RSA.import_key(fp.read())

        if debug is True:
            self.__gateway = "https://openapi.alipaydev.com/gateway.do"
        else:
            self.__gateway = "https://openapi.alipay.com/gateway.do"

    def direct_pay(self, subject, out_trade_no, total_amount, return_url=None, **kwargs):
        """Return the signed query string of an ``alipay.trade.page.pay`` call.

        ``return_url`` overrides the instance default when given (it used to
        be ignored); extra keyword arguments are merged into ``biz_content``.
        """
        biz_content = {
            "subject": subject,
            "out_trade_no": out_trade_no,
            "total_amount": total_amount,
            "product_code": "FAST_INSTANT_TRADE_PAY",
            # "qr_pay_mode":4
        }
        biz_content.update(kwargs)
        if return_url is None:
            return_url = self.return_url
        data = self.build_body("alipay.trade.page.pay", biz_content, return_url)
        return self.sign_data(data)

    def build_body(self, method, biz_content, return_url=None):
        """Assemble the common request parameters for ``method``.

        ``notify_url`` and ``return_url`` are added only when ``return_url``
        is given.
        """
        data = {
            "app_id": self.appid,
            "method": method,
            "charset": "utf-8",
            "sign_type": "RSA2",
            "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "version": "1.0",
            "biz_content": biz_content,
        }
        if return_url is not None:
            data["notify_url"] = self.app_notify_url
            # Use the argument; the original always sent self.return_url.
            data["return_url"] = return_url
        return data

    def sign_data(self, data):
        """Sign ``data`` and return it as a URL-encoded query string.

        The signature covers the sorted, unquoted ``key=value`` pairs; any
        existing ``sign`` entry is removed from ``data`` first.
        """
        data.pop("sign", None)
        items = self.ordered_data(data)
        unsigned_string = "&".join("{0}={1}".format(k, v) for k, v in items)
        sign = self.sign(unsigned_string.encode("utf-8"))
        # str() lets non-string values (e.g. numbers) be quoted.
        quoted_string = "&".join(
            "{0}={1}".format(k, quote_plus(str(v))) for k, v in items)
        # Final order-information string.
        return quoted_string + "&sign=" + quote_plus(sign)

    def ordered_data(self, data):
        """Return ``data`` as key-sorted pairs with dict values JSON-encoded.

        ``data`` itself is left untouched.
        """
        return sorted(
            (key, json.dumps(value, separators=(',', ':'))
             if isinstance(value, dict) else value)
            for key, value in data.items()
        )

    def sign(self, unsigned_string):
        """Return the base64 text of the PKCS#1 v1.5 / SHA-256 signature.

        :param unsigned_string: bytes to sign.
        """
        signer = PKCS1_v1_5.new(self.app_private_key)
        signature = signer.sign(SHA256.new(unsigned_string))
        # Single-line base64, i.e. without the newlines encodebytes inserts.
        return b64encode(signature).decode("utf8")

    def _verify(self, raw_content, signature):
        """Check ``signature`` (base64 text) over ``raw_content`` with Alipay's key."""
        if not signature:
            return False
        try:
            raw_signature = decodebytes(signature.encode("utf8"))
        except (binascii.Error, ValueError):
            # Malformed base64 is an invalid signature, not a crash.
            return False
        digest = SHA256.new()
        digest.update(raw_content.encode("utf8"))
        verifier = PKCS1_v1_5.new(self.alipay_public_key)
        return bool(verifier.verify(digest, raw_signature))

    def verify(self, data, signature):
        """Return True when ``signature`` matches the notification ``data``.

        ``sign`` and ``sign_type`` are left out of the signed message; the
        caller's dict is not modified.
        """
        unsigned = {k: v for k, v in data.items()
                    if k not in ("sign", "sign_type")}
        message = "&".join(
            u"{}={}".format(k, v) for k, v in self.ordered_data(unsigned))
        return self._verify(message, signature)


if __name__ == "__main__":
    alipay = AliPay(
        appid="2016081500252338",
        app_notify_url="http://projectsedus.com/",
        app_private_key_path="keys/private_2048.txt",
        # Alipay's public key, used to verify messages sent back by Alipay;
        # it is not your own public key.
        alipay_public_key_path="keys/alipay_key_2048.txt",
        debug=True,  # defaults to False
        return_url="http://192.168.247.129:8000/"
    )

    url = alipay.direct_pay(
        subject="测试订单",
        out_trade_no="20170202126666",
        total_amount=1000
    )
    re_url = "https://openapi.alipaydev.com/gateway.do?{data}".format(data=url)
    print(re_url)
5、weixin_pay.py(views.py 中以 pay.weixin_pay 导入,原文标题写作 wxpay.py):
# -*- coding=utf-8 -*-
"""WeChat Pay client: unified order, order query and JSAPI parameters."""
import time
import json
import hashlib

import requests

from pay.utils import (smart_str, dict_to_xml, calculate_sign,
                       random_str, post_xml, xml_to_dict,
                       validate_post_xml, format_url)

# from local_settings import appid, mch_id, api_key

OAUTH2_AUTHORIZE_URL = "https://open.weixin.qq.com/connect/oauth2/authorize?%s"
OAUTH2_ACCESS_TOKEN_URL = "https://api.weixin.qq.com/sns/oauth2/access_token?%s"


class WeixinPay(object):
    """Base request: collects parameters, signs them and posts them as XML."""

    def __init__(self, appid, mch_id, api_key):
        self.appid = appid  # unique id of the WeChat official account
        self.mch_id = mch_id  # merchant id
        self.api_key = api_key  # merchant payment key used for signing
        self.common_params = {
            "appid": self.appid,
            "mch_id": self.mch_id,
        }
        self.params = {}
        self.url = ""
        self.trade_type = ""

    def set_params(self, **kwargs):
        """Replace ``self.params`` with the stringified arguments.

        A 32-character nonce, ``trade_type`` (when set) and the common
        appid / mch_id parameters are added.
        """
        self.params = {key: smart_str(value) for key, value in kwargs.items()}
        self.params["nonce_str"] = random_str(32)
        if self.trade_type:
            self.params["trade_type"] = self.trade_type
        self.params.update(self.common_params)

    def post_xml(self):
        """Sign and send ``self.params``; return what ``xml_to_dict`` yields."""
        sign = calculate_sign(self.params, self.api_key)
        xml = dict_to_xml(self.params, sign)
        response = post_xml(self.url, xml)
        return xml_to_dict(response.text)

    def valiate_xml(self, xml):
        """Validate an incoming XML payload against this account's identity."""
        return validate_post_xml(xml, self.appid, self.mch_id, self.api_key)

    # Correctly spelled alias; the misspelled name is kept for callers.
    validate_xml = valiate_xml

    def get_error_code_desc(self, error_code):
        """Return the Chinese description of a WeChat error code.

        Keys are upper-case because the lookup upper-cases ``error_code``.
        """
        error_desc = {
            "SYSTEMERROR": u"接口后台错误",
            "INVALID_TRANSACTIONID": u"无效 transaction_id",
            "PARAM_ERROR": u"提交参数错误",
            "ORDERPAID": u"订单已支付",
            "OUT_TRADE_NO_USED": u"商户订单号重复",
            "NOAUTH": u"商户无权限",
            "NOTENOUGH": u"余额不足",
            "NOTSUPORTCARD": u"不支持卡类型",
            "ORDERCLOSED": u"订单已关闭",
            "BANKERROR": u"银行系统异常",
            "REFUND_FEE_INVALID": u"退款金额大于支付金额",
            "ORDERNOTEXIST": u"订单不存在",
        }
        return error_desc.get((error_code or "").strip().upper(), u"未知错误")


class UnifiedOrderPay(WeixinPay):
    """Create a pre-payment order (NATIVE trade type)."""

    def __init__(self, appid, mch_id, api_key):
        super(UnifiedOrderPay, self).__init__(appid, mch_id, api_key)
        self.url = "https://api.mch.weixin.qq.com/pay/unifiedorder"
        self.trade_type = "NATIVE"

    def post(self, body, out_trade_no, total_fee, spbill_create_ip, notify_url, **kwargs):
        """Submit the order; return the parsed reply (second item of post_xml)."""
        order_params = {
            "body": body,
            "out_trade_no": out_trade_no,
            "total_fee": total_fee,
            "spbill_create_ip": spbill_create_ip,
            "notify_url": notify_url,
        }
        order_params.update(**kwargs)
        self.set_params(**order_params)
        return self.post_xml()[1]


class OrderQuery(WeixinPay):
    """Query the state of an order."""

    def __init__(self, appid, mch_id, api_key):
        super(OrderQuery, self).__init__(appid, mch_id, api_key)
        self.url = "https://api.mch.weixin.qq.com/pay/orderquery"

    def post(self, out_trade_no):
        """Query ``out_trade_no``; return the parsed reply."""
        self.set_params(out_trade_no=out_trade_no)
        return self.post_xml()[1]


class JsAPIOrderPay(UnifiedOrderPay):
    """Unified order for in-WeChat H5 pages (JSAPI trade type)."""

    def __init__(self, appid, mch_id, api_key, app_secret):
        super(JsAPIOrderPay, self).__init__(appid, mch_id, api_key)
        self.app_secret = app_secret
        self.trade_type = "JSAPI"

    def create_oauth_url_for_code(self, redirect_uri):
        """Build the OAuth2 authorize URL that yields a ``code``."""
        url_params = {
            "appid": self.appid,
            "redirect_uri": redirect_uri,  # usually the current page
            "response_type": "code",
            "scope": "snsapi_base",
            "state": "STATE#wechat_redirect"
        }
        return OAUTH2_AUTHORIZE_URL % format_url(url_params)

    def _create_oauth_url_for_openid(self, code):
        """Build the access-token URL used to exchange ``code``."""
        url_params = {
            "appid": self.appid,
            "secret": self.app_secret,
            "code": code,
            "grant_type": "authorization_code",
        }
        return OAUTH2_ACCESS_TOKEN_URL % format_url(url_params)

    def _get_oauth_info(self, code):
        """Fetch the OAuth2 reply as a dict, or None when the request failed."""
        response = requests.get(self._create_oauth_url_for_openid(code))
        return response.json() if response else None

    def _get_openid(self, code):
        """Return the ``openid`` from the OAuth2 reply, or None."""
        oauth_info = self._get_oauth_info(code)
        if oauth_info:
            return oauth_info.get("openid", None)
        return None

    def _get_json_js_api_params(self, prepay_id):
        """Return the signed parameter dict for the in-page payment call.

        NOTE(review): the mixed-case key names follow WeChat's JSAPI
        convention; the scraped listing had them lower-cased -- confirm.
        """
        js_params = {
            "appId": self.appid,
            "timeStamp": "%d" % time.time(),
            "nonceStr": random_str(32),
            "package": "prepay_id=%s" % prepay_id,
            "signType": "MD5",
        }
        js_params["paySign"] = calculate_sign(js_params, self.api_key)
        return js_params

    def post(self, body, out_trade_no, total_fee, spbill_create_ip, notify_url, code):
        """Create a JSAPI order for the user behind ``code``.

        Returns the JS parameter dict, or None when any step yields nothing.
        """
        if not code:
            return None
        open_id = self._get_openid(code)
        if not open_id:
            return None
        # NOTE(review): sent as ``openid`` (WeChat's parameter name); the
        # listing passed it as ``open_id`` -- confirm.
        unified_order = super(JsAPIOrderPay, self).post(
            body, out_trade_no, total_fee, spbill_create_ip, notify_url,
            openid=open_id)
        if not unified_order:
            return None
        prepay_id = unified_order.get("prepay_id", None)
        if not prepay_id:
            return None
        return self._get_json_js_api_params(prepay_id)
6、utils.py:
# -*- coding=utf-8 -*-
"""Helpers: WeChat XML/signature utilities and a soft-delete viewset mixin."""
import hashlib
import re
import secrets
import types
from random import Random

import requests
import thriftpy
from django.conf import settings
from django.core.exceptions import FieldDoesNotExist
from django.db import models
from django.db.models.fields.reverse_related import ForeignObjectRel
from rest_framework.pagination import PageNumberPagination
from thriftpy.rpc import make_client

from pay.exception_handler import ForeignObjectRelDeleteError, ModelDontHaveIsActiveFiled, logger

# One ``<key>value</key>`` element; the value may be empty.
_XML_FIELD = re.compile(r"<(?P<key>[^<>/\s]+)>(?P<value>.*?)</(?P=key)>", re.S)
# A value wrapped entirely in a CDATA section.
_CDATA = re.compile(r"<!\[CDATA\[(?P<inner_val>.*)\]\]>\Z", re.S)


def smart_str(s, encoding='utf-8', strings_only=False, errors='strict'):
    """Return a text (``str``) version of ``s``.

    ``bytes`` are decoded with ``encoding``; other objects go through
    ``str()``. With ``strings_only`` set, ``None`` and ints pass through
    unchanged. (The original relied on Python 2 names such as ``unicode``.)
    """
    if strings_only and isinstance(s, (type(None), int)):
        return s
    if isinstance(s, bytes):
        return s.decode(encoding, errors)
    if isinstance(s, str):
        return s
    return str(s)


def format_url(params, api_key=None):
    """Join ``params`` as ``k=v`` pairs sorted by key, optionally adding ``&key=``."""
    url = "&".join('%s=%s' % (key, smart_str(params[key])) for key in sorted(params))
    if api_key:
        url = '%s&key=%s' % (url, api_key)
    return url


def calculate_sign(params, api_key):
    """Return the upper-case MD5 hex digest of the sorted params plus key."""
    # Step 1: sort the parameters and append the key.
    url = format_url(params, api_key)
    # Step 2: MD5, upper-cased.
    return hashlib.md5(url.encode('utf-8')).hexdigest().upper()


def dict_to_xml(params, sign):
    """Render ``params`` and ``sign`` as an ``<xml>`` document.

    All-digit values are written bare; everything else goes into CDATA.
    """
    xml = ["<xml>"]
    for key, value in params.items():
        value = smart_str(value)
        if value.isdigit():
            xml.append('<%s>%s</%s>' % (key, value, key))
        else:
            xml.append('<%s><![CDATA[%s]]></%s>' % (key, value, key))
    xml.append('<sign><![CDATA[%s]]></sign></xml>' % sign)
    return ''.join(xml)


def xml_to_dict(xml):
    """Parse a flat ``<xml>`` document into ``(sign, fields)``.

    Returns ``(None, None)`` unless the text starts with ``<xml>`` and ends
    with ``</xml>`` (case-insensitive). CDATA wrappers are removed and empty
    values are skipped.
    """
    xml = (xml or "").strip()
    # The original tested a single character (``xml[-6]``), joined the two
    # checks with ``and`` and compared against lower-case after ``.upper()``.
    if xml[:5].upper() != "<XML>" or xml[-6:].upper() != "</XML>":
        return None, None

    content = ''.join(xml[5:-6].strip().split('\n'))
    sign = None
    result = {}
    for field in _XML_FIELD.finditer(content):
        key = field.group("key").strip()
        value = field.group("value").strip()
        wrapped = _CDATA.match(value)
        if wrapped:
            value = wrapped.group("inner_val").strip()
        if not value:
            continue
        if key == "sign":
            sign = value
        else:
            result[key] = value
    return sign, result


def validate_post_xml(xml, appid, mch_id, api_key):
    """Return the parsed fields when signature, appid and mch_id all match.

    Returns ``None`` otherwise; missing fields no longer raise KeyError.
    """
    sign, params = xml_to_dict(xml)
    if (not sign) or (not params):
        return None

    if sign != calculate_sign(params, api_key):
        return None

    if params.get("appid") != appid or params.get("mch_id") != mch_id:
        return None

    return params


def random_str(randomlength=8):
    """Return ``randomlength`` random characters from ``[a-z0-9]``.

    Drawn with ``secrets`` because the value is used as a signing nonce.
    """
    chars = 'abcdefghijklmnopqrstuvwxyz0123456789'
    return "".join(secrets.choice(chars) for _ in range(randomlength))


def post_xml(url, xml, timeout=10):
    """POST ``xml`` (UTF-8 encoded) to ``url`` and return the response.

    TLS certificates are verified (the original passed ``verify=False``) and
    the call is bounded by ``timeout`` seconds.
    """
    return requests.post(url, data=xml.encode('utf-8'), timeout=timeout)


class UnActiveModelMixin(object):
    """Soft delete: flip ``is_active`` instead of removing the row.

    Reverse-related objects are deactivated too, so they need an
    ``is_active`` field of their own.
    """

    def perform_destroy(self, instance):
        """Deactivate ``instance`` and the objects that point at it.

        NOTE(review): the listing had lost its indentation; the position of
        the ``ForeignObjectRelDeleteError`` raise (inside the branch that
        deactivates a single related object) is reconstructed -- confirm.
        """
        reverse_relations = [f for f in instance._meta.get_fields()
                             if isinstance(f, ForeignObjectRel)]
        for link in (f.get_accessor_name() for f in reverse_relations):
            manager = getattr(instance, link, None)
            if not manager:
                continue
            if isinstance(manager, models.Model):
                # The accessor returned a single related object.
                if hasattr(manager, 'is_active') and manager.is_active:
                    manager.is_active = False
                    manager.save()
                    raise ForeignObjectRelDeleteError(u'{} 上有关联数据'.format(link))
            else:
                if not manager.count():
                    continue
                try:
                    manager.model._meta.get_field('is_active')
                    manager.filter(is_active=True).update(is_active=False)
                except FieldDoesNotExist as ex:
                    # Related models are expected to carry is_active as well.
                    logger.warning(ex)
                    raise ModelDontHaveIsActiveFiled(
                        '{}.{} 没有is_active字段, 请检查程序逻辑'.format(
                            manager.model.__module__,
                            # __name__, not __class__.__name__ (the metaclass)
                            manager.model.__name__
                        ))
        instance.is_active = False
        instance.save()

    def get_queryset(self):
        """Restrict the viewset's queryset to active rows."""
        return self.queryset.filter(is_active=True)


class StandardResultsSetPagination(PageNumberPagination):
    """Page-number pagination whose page size is read from ``?size=``."""

    page_size_query_param = 'size'
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
上一篇: 剑指前端(前端入门笔记系列)——DOM(属性节点)
下一篇: python实现周期方波信号频谱图