server add queue 提升 QPS
程序员文章站
2022-07-13 11:20:02
...
server add queue 提升 QPS
凡是过往,皆为序章
任务场景
提供API接口,同时将API 接收到的数据,推送到Kafka/RabbitMQ。
机器数据:1CPU 2G 内存 1T 机械硬盘
QPS峰值:8000/10000
body大小:0.3K
编程语言:Python3
web框架:Flask
0.1 版本
使用 gevent.monkey + gevent.pywgi,单次请求异步 写入 Kafka
#!/usr/bin/env python
# encoding: utf-8
# 猴子补丁
from gevent import monkey
monkey.patch_all()
import json
import logging
from flask import Flask
from flask import request
from gevent import pywsgi
from kafka import KafkaProducer
app = Flask(__name__)
# kafka 生产者 配置
topic = "test"
host = ["127.0.0.1:8009"]
produc = KafkaProducer(hosts=host)
logging.basicConfig(filename=__name__, filemode="w", format="%(asctime)s-%(name)s-%(levelname)s-%(message)s",
level=logging.INFO)
logger = logging.getLogger(__name__)
@app.route("/api/v1/show/", methods=["GET"])
def api_v1_show():
try:
data = json.dumps(request.args.to_dict())
logger.info("data")
# 因为 Kafka 使用的是,异步批量写入 Kafka 中,所以需要
produc.send(topic=topic, value=data)
except Exception as e:
logger.error("/api/v1/show/ err")
pass
return "succ"
if __name__ == '__main__':
# 使用 pywsgi.WSGIServer 异步 方式提高 性能
server = pywsgi.WSGIServer(('', 8015), app)
server.serve_forever()
因为 KafkaProducer 是异步推送相关数据,但是在高并发情况,异步推送数据中的临界条件频繁触发,导致变为 “同步”,例如 限制条件默认是 1000条,
同时QPS为3000,那么就接近同步方式推送。
0.2 版本
在代码中 增加队列 进一步提高 其并发性能
使用 gevent.queue.JoinableQueue 增加一层队列,因为在内存中,同时能够快速返回,整体会有一定的性能提升
#!/usr/bin/env python
# encoding: utf-8
# 猴子补丁
from gevent import monkey
monkey.patch_all()
import json
import gevent
import logging
from flask import Flask
from flask import request
from gevent import pywsgi
from kafka import KafkaProducer
from gevent.queue import JoinableQueue
app = Flask(__name__)
REQ_Q = JoinableQueue(100000) # 中间数据缓存
# kafka 生产者 配置
topic = "test"
host = ["127.0.0.1:8009"]
produc = KafkaProducer(hosts=host)
logging.basicConfig(filename=__name__, filemode="w", format="%(asctime)s-%(name)s-%(levelname)s-%(message)s",
level=logging.INFO)
logger = logging.getLogger(__name__)
def main_worker():
while True:
try:
item = REQ_Q.get()
produc.send(item)
except Exception as e:
logger.error(f"main_work err {e}")
print(e)
finally:
REQ_Q.task_done()
@app.route("/api/v1/show/", methods=["GET"])
def api_v1_show():
try:
# 因为 Kafka 使用的是,异步批量写入 Kafka 中,所以需要
data = json.dumps(request.args.to_dict())
logger.info("data")
REQ_Q.put(data)
except Exception as e:
logger.error("/api/v1/show/ err")
pass
return "succ"
def init_workers(size):
for i in range(size):
# 因为协程是 用户态,所以避免了上下文的切换,整体性能会有部分提升
gevent.spawn(main_worker)
if __name__ == '__main__':
# 异步轮训消费
init_workers(50)
# 使用 pywsgi.WSGIServer 异步 方式提高 性能
server = pywsgi.WSGIServer(('', 8015), app)
server.serve_forever()
总结
因为Kafka,已经是异步批量推送,所以,整体的性能还是非常不错的,但是因为有存在突升QPS,有时候还是需要增加一层 Queue 进行进行缓存,
但是会有 OOM 的情况发生,所以这种方式,可以简单的作为一个预备临时方案,
正确方案:应该是能够自动扩容。
上一篇: 通过AWK 查看单机QPS
下一篇: 洛谷 P1116 车厢重组 题解