欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

server add queue 提升 QPS

程序员文章站 2022-07-13 11:20:02
...

server add queue 提升 QPS

server add queue 提升 QPS

凡是过往,皆为序章


任务场景

提供API接口,同时将API 接收到的数据,推送到Kafka/RabbitMQ。
机器数据:1CPU 2G 内存 1T 机械硬盘
QPS峰值:8000/10000
body大小:0.3K
编程语言:Python3
web框架:Flask

0.1 版本

使用 gevent.monkey + gevent.pywgi,单次请求异步 写入 Kafka


#!/usr/bin/env python
# encoding: utf-8


# 猴子补丁
from gevent import monkey

monkey.patch_all()

import json
import logging

from flask import Flask
from flask import request
from gevent import pywsgi
from kafka import KafkaProducer

app = Flask(__name__)

# kafka 生产者 配置
topic = "test"
host = ["127.0.0.1:8009"]
produc = KafkaProducer(hosts=host)

logging.basicConfig(filename=__name__, filemode="w", format="%(asctime)s-%(name)s-%(levelname)s-%(message)s",
                    level=logging.INFO)

logger = logging.getLogger(__name__)


@app.route("/api/v1/show/", methods=["GET"])
def api_v1_show():
    try:
        data = json.dumps(request.args.to_dict())
        logger.info("data")
        # 因为 Kafka 使用的是,异步批量写入 Kafka 中,所以需要
        produc.send(topic=topic, value=data)
    except Exception as e:
        logger.error("/api/v1/show/ err")
        pass

    return "succ"


if __name__ == '__main__':
    # 使用 pywsgi.WSGIServer 异步 方式提高 性能
    server = pywsgi.WSGIServer(('', 8015), app)
    server.serve_forever()


因为 KafkaProducer 是异步推送相关数据,但是在高并发情况,异步推送数据中的临界条件频繁触发,导致变为 “同步”,例如 限制条件默认是 1000条,
同时QPS为3000,那么就接近同步方式推送。


0.2 版本

在代码中 增加队列 进一步提高 其并发性能

使用 gevent.queue.JoinableQueue 增加一层队列,因为在内存中,同时能够快速返回,整体会有一定的性能提升


#!/usr/bin/env python
# encoding: utf-8


# 猴子补丁
from gevent import monkey

monkey.patch_all()

import json
import gevent
import logging

from flask import Flask
from flask import request
from gevent import pywsgi
from kafka import KafkaProducer
from gevent.queue import JoinableQueue

app = Flask(__name__)

REQ_Q = JoinableQueue(100000)  # 中间数据缓存

# kafka 生产者 配置
topic = "test"
host = ["127.0.0.1:8009"]
produc = KafkaProducer(hosts=host)

logging.basicConfig(filename=__name__, filemode="w", format="%(asctime)s-%(name)s-%(levelname)s-%(message)s",
                    level=logging.INFO)

logger = logging.getLogger(__name__)


def main_worker():
    while True:
        try:
            item = REQ_Q.get()
            produc.send(item)
        except Exception as e:
            logger.error(f"main_work err {e}")
            print(e)
        finally:
            REQ_Q.task_done()


@app.route("/api/v1/show/", methods=["GET"])
def api_v1_show():
    try:
        # 因为 Kafka 使用的是,异步批量写入 Kafka 中,所以需要
        data = json.dumps(request.args.to_dict())
        logger.info("data")
        REQ_Q.put(data)
    except Exception as e:
        logger.error("/api/v1/show/ err")
        pass

    return "succ"


def init_workers(size):
    for i in range(size):
        # 因为协程是 用户态,所以避免了上下文的切换,整体性能会有部分提升
        gevent.spawn(main_worker)


if __name__ == '__main__':
    # 异步轮训消费
    init_workers(50)
    # 使用 pywsgi.WSGIServer 异步 方式提高 性能
    server = pywsgi.WSGIServer(('', 8015), app)
    server.serve_forever()


总结

因为Kafka,已经是异步批量推送,所以,整体的性能还是非常不错的,但是因为有存在突升QPS,有时候还是需要增加一层 Queue 进行进行缓存,
但是会有 OOM 的情况发生,所以这种方式,可以简单的作为一个预备临时方案
正确方案:应该是能够自动扩容