How to Handle gzip Data Streams with Django
程序员文章站
2022-11-17 12:54:52
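Recently at work I had to expose an endpoint to receive data pushed by a vendor. The project uses Python's Django framework, so without a second thought I went all in and wrote the following code:

    class XXDataPushView(APIView):
        """
        Receive xx data pushes
        """
        # ...

        @white_list_required
        def post(self, request, **kwargs):
            req_data = request.data or {}
            # ...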
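Afterwards, though, we noticed that the daily data was not changing at all, so we challenged the vendor: were they actually pushing anything, or just stringing us along? Their answer was that they push a gzip-compressed data stream, and the receiving side has to decompress it itself. I had never handled compressed data like this before, and how exactly they did the push was a black box to me.
So I asked them for a simple example of the push. To my surprise they did not play fair and tossed over a chunk of Java code that cannot even run on its own: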
    private byte[] compress(JSONObject body) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            GZIPOutputStream gzip = new GZIPOutputStream(out);
            gzip.write(body.toString().getBytes());
            gzip.close();
            return out.toByteArray();
        } catch (Exception e) {
            logger.error("compress data failed with error: " + e.getMessage()).commit();
        }
        return JSON.toJSONString(body).getBytes();
    }

    public void post(JSONObject body, String url, FutureCallback<HttpResponse> callback) {
        RequestBuilder requestBuilder = RequestBuilder.post(url);
        requestBuilder.addHeader("content-type", "application/json; charset=utf-8");
        requestBuilder.addHeader("content-encoding", "gzip");
        byte[] compressData = compress(body);
        int timeout = (int) Math.max(((float) compressData.length) / 5000000, 5000);
        RequestConfig.Builder requestConfigBuilder = RequestConfig.custom();
        requestConfigBuilder.setSocketTimeout(timeout).setConnectTimeout(timeout);
        requestBuilder.setEntity(new ByteArrayEntity(compressData));
        requestBuilder.setConfig(requestConfigBuilder.build());
        excuteRequest(requestBuilder, callback);
    }

    private void excuteRequest(RequestBuilder requestBuilder, FutureCallback<HttpResponse> callback) {
        HttpUriRequest request = requestBuilder.build();
        httpClient.execute(request, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(HttpResponse httpResponse) {
                try {
                    int responseCode = httpResponse.getStatusLine().getStatusCode();
                    if (callback != null) {
                        if (responseCode == 200) {
                            callback.completed(httpResponse);
                        } else {
                            callback.failed(new Exception("status code is not 200"));
                        }
                    }
                } catch (Exception e) {
                    logger.error("get error on " + requestBuilder.getMethod() + " " + requestBuilder.getUri() + ": " + e.getMessage()).commit();
                    if (callback != null) {
                        callback.failed(e);
                    }
                }
                EntityUtils.consumeQuietly(httpResponse.getEntity());
            }

            @Override
            public void failed(Exception e) {
                logger.error("get error on " + requestBuilder.getMethod() + " " + requestBuilder.getUri() + ": " + e.getMessage()).commit();
                if (callback != null) {
                    callback.failed(e);
                }
            }

            @Override
            public void cancelled() {
                logger.error("request cancelled on " + requestBuilder.getMethod() + " " + requestBuilder.getUri()).commit();
                if (callback != null) {
                    callback.cancelled();
                }
            }
        });
    }
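To get a feel for what such a request body looks like on the receiving side, here is a minimal Python sketch that mirrors the `compress` step of the Java code above. The function name `compress_body` and the sample payload are made up for illustration; they are not part of the vendor's code. It shows that the body is binary gzip data, so feeding it straight to a JSON parser cannot work.

```python
import gzip
import json


def compress_body(body: dict) -> bytes:
    """Serialize a dict to JSON and gzip it (hypothetical stand-in for the Java compress())."""
    raw = json.dumps(body).encode("utf-8")
    return gzip.compress(raw)


payload = compress_body({"id": 1, "name": "demo"})

# A gzip stream always starts with the magic bytes 0x1f 0x8b.
starts_with_magic = payload[:2] == b"\x1f\x8b"

# Parsing the compressed bytes directly as JSON fails ...
try:
    json.loads(payload)
    parsed_directly = True
except ValueError:
    parsed_directly = False

# ... while decompressing first recovers the original object.
restored = json.loads(gzip.decompress(payload).decode("utf-8"))
```
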
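From the code above you can see that the vendor compresses the JSON data into a gzip data stream. So I searched the Django documentation, which only has this description of a decorator for gzip handling: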
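The decorators in django.views.decorators.gzip control content compression on a per-view basis.
- gzip_page()
This decorator compresses content if the browser allows gzip compression. It sets the Vary header accordingly, so that caches will base their storage on the Accept-Encoding header.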
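However, this decorator only compresses the response content sent back to the browser, whereas what we need is to decompress the data we receive. It is not what we want.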
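Fortunately, Flask has an extension called flask-inflate; once it is installed and initialized on the app, it automatically decompresses incoming request data. Here is how the extension's code handles it: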
    # flask_inflate.py
    import gzip

    from flask import request

    GZIP_CONTENT_ENCODING = 'gzip'


    class Inflate(object):
        def __init__(self, app=None):
            if app is not None:
                self.init_app(app)

        @staticmethod
        def init_app(app):
            app.before_request(_inflate_gzipped_content)


    def inflate(func):
        """
        A decorator to inflate content of a single view function
        """
        def wrapper(*args, **kwargs):
            _inflate_gzipped_content()
            return func(*args, **kwargs)

        return wrapper


    def _inflate_gzipped_content():
        content_encoding = getattr(request, 'content_encoding', None)

        if content_encoding != GZIP_CONTENT_ENCODING:
            return

        # We don't want to read the whole stream at this point.
        # Setting request.environ['wsgi.input'] to the gzipped stream is also not an option because
        # when the request is not chunked, flask's get_data will return a limited stream containing the gzip stream
        # and will limit the gzip stream to the compressed length. This is not good, as we want to read the
        # uncompressed stream, which is obviously longer.
        request.stream = gzip.GzipFile(fileobj=request.stream)
The core of the code above is:

    request.stream = gzip.GzipFile(fileobj=request.stream)
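The wrapping idea can be tried in isolation with nothing but the standard library. In the sketch below, an `io.BytesIO` object stands in for the request stream (an assumption for illustration; a real request stream only needs to be a file-like object with `read()`), and the sample payload is made up. It shows that reads on the wrapped stream return decompressed bytes, and that they can be taken in chunks.

```python
import gzip
import io
import json

# Stand-in for the raw request body: gzip-compressed JSON bytes.
compressed = gzip.compress(json.dumps({"items": [1, 2, 3]}).encode("utf-8"))

# Stand-in for request.stream: any file-like object with a read() method.
raw_stream = io.BytesIO(compressed)

# Wrap the compressed stream; data is inflated as read() is called.
inflated_stream = gzip.GzipFile(fileobj=raw_stream)

first_chunk = inflated_stream.read(10)  # first 10 decompressed bytes
rest = inflated_stream.read()           # remainder of the decompressed data
data = json.loads((first_chunk + rest).decode("utf-8"))
```
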
So, in Django it can be handled as follows:
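    import gzip
    import json

    from rest_framework.views import APIView


    class XXDataPushView(APIView):
        """
        Receive xx data pushes
        """
        # ...

        # white_list_required is the project's own decorator (not shown here)
        @white_list_required
        def post(self, request, **kwargs):
            content_encoding = request.META.get("HTTP_CONTENT_ENCODING", "")
            if content_encoding != "gzip":
                req_data = request.data or {}
            else:
                # Wrap the raw request stream so that reads return inflated bytes
                gzip_f = gzip.GzipFile(fileobj=request.stream)
                data = gzip_f.read().decode(encoding="utf-8")
                req_data = json.loads(data)
            # ... handle req_data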
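The view above assumes the gzip body is always well formed. As a hedged sketch of how the decoding step could be pulled into a small, framework-independent helper with basic error handling, consider the following. The name `decode_push_body` and its behavior on empty bodies are my own assumptions, not something from the vendor or from Django.

```python
import gzip
import json
import zlib


def decode_push_body(raw_body: bytes, content_encoding: str = "") -> dict:
    """Return the JSON payload, inflating it first if it is gzip-encoded.

    Raises ValueError for corrupt gzip data, bad UTF-8 or invalid JSON.
    """
    # Content codings are case-insensitive, so normalize before comparing.
    if content_encoding.strip().lower() == "gzip":
        try:
            raw_body = gzip.decompress(raw_body)
        except (OSError, EOFError, zlib.error) as exc:
            # gzip.BadGzipFile is a subclass of OSError; a truncated stream
            # raises EOFError; corrupt deflate data can raise zlib.error.
            raise ValueError(f"invalid gzip body: {exc}") from exc
    if not raw_body:
        return {}
    # UnicodeDecodeError and json.JSONDecodeError are both ValueError subclasses.
    return json.loads(raw_body.decode("utf-8"))
```

In a view, the raw bytes might come from Django's `request.body` and the header value from `request.META.get("HTTP_CONTENT_ENCODING", "")`; catching `ValueError` there would let the endpoint answer with a 400 instead of an unhandled error.
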
OK, that solves the problem. The request can also be tested as follows:
    import gzip
    import json

    import requests

    data = {}
    data = json.dumps(data).encode("utf-8")
    data = gzip.compress(data)

    resp = requests.post(
        "http://localhost:8760/push_data/",
        data=data,
        headers={"content-encoding": "gzip", "content-type": "application/json;charset=utf-8"},
    )
    print(resp.json())