python 阿里云oss实现直传签名与回调验证的示例方法
程序员文章站
2022-04-16 11:56:20
签名import base64import jsonimport timefrom datetime import datetimeimport hmacfrom hashlib import sha...
签名
import base64 import json import time from datetime import datetime import hmac from hashlib import sha1 access_key_id = '' # 请填写您的accesskeysecret。 access_key_secret = '' # host的格式为 bucketname.endpoint ,请替换为您的真实信息。 host = '' # callback_url为 上传回调服务器的url,请将下面的ip和port配置为您自己的真实信息。 callback_url = "" # 用户上传文件时指定的前缀。 upload_dir = 'user-dir-prefix/' expire_time = 1200 expire_syncpoint = int(time.time() + expire_time) policy_dict = { 'expiration': datetime.utcfromtimestamp(expire_syncpoint).isoformat() + 'z', 'conditions': [ {"bucket": "test-paige"}, ['starts-with', '$key', 'user/test/'] ] } policy = json.dumps(policy_dict).strip() policy_encode = base64.b64encode(policy.encode()) signature = base64.encodebytes(hmac.new(access_key_secret.encode(), policy_encode, sha1).digest()) callback_dict = { 'callbackurl': callback_url, 'callbackbody': 'filename=${object}&size=${size}&mimetype=${mimetype}&height=${imageinfo.height}&width=${' 'imageinfo.width}', 'callbackbodytype': 'application/json' } callback = base64.b64encode(json.dumps(callback_dict).strip().encode()).decode() var = { 'accessid': access_key_id, 'host': host, 'policy': policy_encode.decode(), 'signature': signature.decode().strip(), 'expire': expire_syncpoint, 'callback': callback }
回调验签
import asyncio import base64 import time import aiomysql import rsa from aiohttp import web, clientsession from urllib import parse import uuid def success(msg='', data=none): if data is none: data = {} dict_data = { 'code': 1, 'msg': msg, 'data': data } return web.json_response(dict_data) def failed(msg='', data=none): if data is none: data = {} dict_data = { 'code': 0, 'msg': msg, 'data': data } return web.json_response(dict_data) async def handle(request): """ 获取连接池 :param web.baserequest request: :return: """ authorization_base64 = request.headers['authorization'] x_oss_pub_key_url_base64 = request.headers['x-oss-pub-key-url'] pub_key_url = base64.b64decode(x_oss_pub_key_url_base64.encode()) authorization = base64.b64decode(authorization_base64.encode()) path = request.path async with clientsession() as session: async with session.get(pub_key_url.decode()) as resp: pub_key_body = await resp.text() pubkey = rsa.publickey.load_pkcs1_openssl_pem(pub_key_body.encode()) body = await request.content.read() auth_str = parse.unquote(path) + '\n' + body.decode() parse_url = parse.parse_qs(body.decode()) print(parse_url) try: rsa.verify(auth_str.encode(), authorization, pubkey) pool = request.app['mysql_pool'] async with pool.acquire() as conn: async with conn.cursor() as cur: id = str(uuid.uuid4()) url = parse_url['filename'][0] mime = parse_url['mimetype'][0] disk = 'oss' time_at = time.strftime("%y-%m-%d %h:%i:%s", time.localtime()) sql = "insert into media(id,url,mime,disk,created_at,updated_at) values(%s,%s,%s,%s,%s,%s)" await cur.execute(sql, (id, url, mime, disk, time_at, time_at)) await conn.commit() dict_data = { 'id': id, 'url': url, 'cdn_url': 'https://cdn.***.net' + '/' + url, 'mime': mime, 'disk': disk, 'created_at': time_at, 'updated_at': time_at, } return success(data=dict_data) except rsa.pkcs1.verificationerror: return failed(msg='验证错误') async def init(loop): # 创建连接池 mysql_pool = await aiomysql.create_pool(host='127.0.0.1', port=3306, user='', password='', db='', loop=loop) async def on_shutdown(application): """ 接收到关闭信号时,要先关闭连接池,并等待连接池关闭成功. :param web.application application: :return: """ application['mysql_pool'].close() # 没有下面这句话会报错 runtimeerror: event loop is closed ,因为连接池没有真正关关闭程序就关闭了,引发python的报错 await application['mysql_pool'].wait_closed() application = web.application() application.on_shutdown.append(on_shutdown) # 把连接池放到 application 实例中 application['mysql_pool'] = mysql_pool application.add_routes([web.get('/', handle), web.post('/oss', handle)]) return application if __name__ == '__main__': loop = asyncio.get_event_loop() application = loop.run_until_complete(init(loop)) web.run_app(application, host='127.0.0.1') loop.close()
到此这篇关于python 阿里云oss实现直传签名与回调验证的文章就介绍到这了,更多相关python 直传签名与回调验证内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
上一篇: docker 运行指定内存的操作
下一篇: Vue 重置data的数据为初始状态操作