欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

gRPC-compression压缩的使用

程序员文章站 2022-03-12 21:18:40
...

简介

grpc 为客户端和服务端提供了无损失的压缩方式,从而减少网络传输时的流量。主要有以下三种压缩方式:

  • grpc.Compression.NoCompression: 不进行压缩(默认)
  • grpc.Compression.Deflate: Deflate 算法
  • grpc.Compression.Gzip: Gzip 算法

客户端和服务端默认采用 grpc.Compression.NoCompression 的方式进行压缩。

具体有关 Deflate 和 Gzip 压缩算法的区别可以参看下面的文章:gzip和deflate的区别

client 端压缩

客户端的压缩被分为两个等级:

channel 级别

with grpc.insecure_channel(
	    'foo.bar:1234', 
    	compression=grpc.Compression.Gzip) as channel:
    use_channel(channel)

call level

如果我们在 调用远程方法的时候设置压缩的级别,该压缩级别将覆盖原来的 **channel **级别.

stub = helloworld_pb2_grpc.GreeterStub(channel)
response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'),
                         compression=grpc.Compression.Deflate)

Server 端压缩

和 Client 端一样,服务端的压缩也是被分为了两个级别:

全局参数

server = grpc.server(futures.ThreadPoolExecutor(),
                     compression=grpc.Compression.Gzip)

局部 RPC 的指定

def SayHello(self, request, context):
    context.set_response_compression(grpc.Compression.NoCompression)
    return helloworld_pb2.HelloReply(message='Hello, %s!' 
                                     % request.name)

在局部 RPC 进行压缩参数指定的话,当客户端调用该方法的时候,同样也会覆盖全局参数进行覆盖。

注意:

  • 在创建通道的时候,如果指定了压缩的形式,在调用远程方法的时候,每次调用都会进行压缩。

  • 在进行 RPC 调用的响应时,我们需要注意:

    • 对于 unary RPCs,压缩配置在对应接口里面的 Context 对象当中:
    def SayHello(self, request, context: _Context):
        context.set_compression(grpc.Compression.Deflate)
        return helloworld_pb2.HelloReply(message='Hello, %s!' 
                                         % request.name)
    

    ​ 这时我们就对返回的内容进行了压缩。

    • 如果是通过流式进行传输,此时将禁用压缩。

    这里还有待考证

非对称压缩方式

针对每次传输的信息,我们需要根据实际情况来对数据进行压缩,例如压缩可能会导致更加低效的收益。

如果客户端采用了一种服务端不允许的压缩方式,此时服务端将会抛出UNIMPLEMENTED的异常状态码。服务器服务器将返回一个包含 grpc-accept-encoding的响应来告诉客户端允许哪些压缩方式。

但是此时如果客户端根据给定的响应信息选择能够压缩的方式,此时如果还是抛出UNIMPLEMENTED,那么这个就不是压缩的问题,可能是其它原因导致的 BUG。

当然,如果服务端采用了客户端不能接收的压缩方式,此时客户端也会抛出一个异常:INTERNAL

因此如果采用了对方不接受的压缩方式,而后的数据发送方式就应该采用非压缩的方式。

具体这里如何针对性的禁用指定的压缩方式,这里还并不知晓。

代码部分

这里我们看下例子里面的压缩方式,比较绕:

Server 端

配置部分

import os
import sys
from concurrent import futures
import argparse
import logging
import threading
import grpc
from grpc._server import _Context

PATH = os.path.dirname(
    os.path.dirname(
        os.path.dirname(
            os.path.abspath(__file__))))
sys.path.append(PATH)
from protos import helloworld_pb2
from protos import helloworld_pb2_grpc

_DESCRIPTION = 'A server capable of compression.'
_COMPRESSION_OPTIONS = {
    "none": grpc.Compression.NoCompression,
    "deflate": grpc.Compression.Deflate,
    "gzip": grpc.Compression.Gzip,
}
_LOGGER = logging.getLogger(__name__)

_SERVER_HOST = 'localhost'

实现服务端的接口

class Greeter(helloworld_pb2_grpc.GreeterServicer):

    def __init__(self, no_compress_every_n):
        super(Greeter, self).__init__()
        self._no_compress_every_n = no_compress_every_n
        self._request_counter = 0
        self._counter_lock = threading.RLock()
	# 根据示例属性判断是否进行压缩的方法
    def _should_suppress_compression(self):
        suppress_compression = False
        with self._counter_lock:
            if self._no_compress_every_n and self._request_counter % self._no_compress_every_n == 0:
                suppress_compression = True
            self._request_counter += 1
        return suppress_compression
	# 实现的接口
    def SayHello(self, request, context: _Context):
        if self._should_suppress_compression():
            context.set_compression(grpc.Compression.NoCompression)
        else:
            # 针对这个接口,我们采用 gzip 的压缩方式
            context.set_compression(_COMPRESSION_OPTIONS['gzip'])
        return helloworld_pb2.HelloReply(message='Hello, %s!' 
                                         % request.name)

启动服务器

def run_server(server_compression, no_compress_every_n, port):
    server = grpc.server(futures.ThreadPoolExecutor(),
                         compression=server_compression,
                         options=(('grpc.so_reuseport', 1),))
    helloworld_pb2_grpc.add_GreeterServicer_to_server(
        Greeter(no_compress_every_n), server)
    address = '{}:{}'.format(_SERVER_HOST, port)
    server.add_insecure_port(address)
    server.start()
    print("Server listening at '{}'".format(address))
    server.wait_for_termination()

命令行参数的配置

def main():
    parser = argparse.ArgumentParser(description=_DESCRIPTION)
    parser.add_argument('--server_compression',
                        default='none',
                        nargs='?',
                        choices=_COMPRESSION_OPTIONS.keys(),
                        help='The default compression method for the server.')
    parser.add_argument('--no_compress_every_n',
                        type=int,
                        default=0,
                        nargs='?',
                        help='If set, every nth reply will be uncompressed.')
    parser.add_argument('--port',
                        type=int,
                        default=50051,
                        nargs='?',
                        help='The port on which the server will listen.')
    args = parser.parse_args()
    run_server(_COMPRESSION_OPTIONS[args.server_compression],
               args.no_compress_every_n, args.port)


if __name__ == "__main__":
    logging.basicConfig()
    main()

代码解释

这里我们通过命令行的形式将 字典 _COMPRESSION_OPTIONS 中 key 进行了传递,指定了服务进行压缩的方式。同时在内部实现的接口中,对不同的接口通过指定的方式进行压缩。

Client 端

配置部分

import argparse
import logging
import os
import sys
import grpc

PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(PATH)
from protos import helloworld_pb2
from protos import helloworld_pb2_grpc

_DESCRIPTION = 'A client capable of compression.'
_COMPRESSION_OPTIONS = {
    "none": grpc.Compression.NoCompression,
    "deflate": grpc.Compression.Deflate,
    "gzip": grpc.Compression.Gzip,
}

_LOGGER = logging.getLogger(__name__)

客户端主体代码部分

def run_client(channel_compression, call_compression, target):
    with grpc.insecure_channel(target,
                               compression=channel_compression) as channel:
        stub = helloworld_pb2_grpc.GreeterStub(channel)
        response = stub.SayHello(
            helloworld_pb2.HelloRequest(name='you'),
            # 这里指定 客户端 传输的方法为压缩的形式
            compression=call_compression,
            wait_for_ready=True)
        print("Response: {}".format(response))

命令行启动

def main():
    parser = argparse.ArgumentParser(description=_DESCRIPTION)
    parser.add_argument('--channel_compression',
                        default='none',
                        nargs='?',
                        choices=_COMPRESSION_OPTIONS.keys(),
             help='The compression method to use for the channel.')
    parser.add_argument(
        '--call_compression',
        default='none',
        nargs='?',
        choices=_COMPRESSION_OPTIONS.keys(),
       help='The compression method to use for an individual call.')
    parser.add_argument('--server',
                        default='localhost:50051',
                        type=str,
                        nargs='?',
                        help='The host-port pair at which to reach the server.')
    args = parser.parse_args()
    channel_compression = \
    	_COMPRESSION_OPTIONS[args.channel_compression]
    call_compression = _COMPRESSION_OPTIONS[args.call_compression]
    run_client(channel_compression, call_compression, args.server)


if __name__ == "__main__":
    logging.basicConfig()
    main()

日常使用

对于项目阶段,一般都会如下进行定义:

# 首先定义一个全局的压缩选项
_COMPRESSION_OPTIONS = {
    "none": grpc.Compression.NoCompression,
    "deflate": grpc.Compression.Deflate,
    "gzip": grpc.Compression.Gzip,
}

....
# 指定服务端的接口进行压缩
class ServerServicer(exercise_pb2_grpc.ServerServicer):
    ...
    def RouteChat(self, request_iterator, context:_Context):
        # 在这里传入配置选项选项
        context.set_compression(_COMPRESSION_OPTIONS['gzip'])
        prev_notes = []
        for new_note in request_iterator:
            for prev_note in prev_notes:
                if prev_note.location == new_note.location:
                    yield prev_note
            prev_notes.append(new_note)
            
# 服务端全局配置
def serve(server_compression):
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
                         compression=server_compression)
    exercise_pb2_grpc.add_ServerServicer_to_server(
        ServerServicer(), server
    )
    # port:8888
    server.add_secure_port('[::]:' + PORT, server_credentials)
    server.start()
    server.wait_for_termination()

客户端也是类似的:

_COMPRESSION_OPTIONS = {
    "none": grpc.Compression.NoCompression,
    "deflate": grpc.Compression.Deflate,
    "gzip": grpc.Compression.Gzip,
}

# 把压缩的内容丢到 管道里面
with grpc.secure_channel('{}:{}'.format(HOST, PORT),
                         credentials,
                         compression=_COMPRESSION_OPTIONS['deflate']
                        ) as channel:
.
相关标签: grpc python