gRPC-compression压缩的使用
简介
grpc 为客户端和服务端提供了无损失的压缩方式,从而减少网络传输时的流量。主要有以下三种压缩方式:
- grpc.Compression.NoCompression: 不进行压缩(默认)
- grpc.Compression.Deflate: Deflate 算法
- grpc.Compression.Gzip: Gzip 算法
客户端和服务端默认采用 grpc.Compression.NoCompression 的方式进行压缩。
具体有关 Deflate 和 Gzip 压缩算法的区别可以参看下面的文章:gzip和deflate的区别。
client 端压缩
客户端的压缩被分为两个等级:
channel 级别
with grpc.insecure_channel(
'foo.bar:1234',
compression=grpc.Compression.Gzip) as channel:
use_channel(channel)
call level
如果我们在 调用远程方法的时候设置压缩的级别,该压缩级别将覆盖原来的 **channel **级别.
stub = helloworld_pb2_grpc.GreeterStub(channel)
response = stub.SayHello(helloworld_pb2.HelloRequest(name='you'),
compression=grpc.Compression.Deflate)
Server 端压缩
和 Client 端一样,服务端的压缩也是被分为了两个级别:
全局参数
server = grpc.server(futures.ThreadPoolExecutor(),
compression=grpc.Compression.Gzip)
局部 RPC 的指定
def SayHello(self, request, context):
context.set_response_compression(grpc.Compression.NoCompression)
return helloworld_pb2.HelloReply(message='Hello, %s!'
% request.name)
在局部 RPC 进行压缩参数指定的话,当客户端调用该方法的时候,同样也会覆盖全局参数进行覆盖。
注意:
-
在创建通道的时候,如果指定了压缩的形式,在调用远程方法的时候,每次调用都会进行压缩。
-
在进行 RPC 调用的响应时,我们需要注意:
- 对于 unary RPCs,压缩配置在对应接口里面的 Context 对象当中:
def SayHello(self, request, context: _Context): context.set_compression(grpc.Compression.Deflate) return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
这时我们就对返回的内容进行了压缩。
- 如果是通过流式进行传输,此时将禁用压缩。
这里还有待考证。
非对称压缩方式
针对每次传输的信息,我们需要根据实际情况来对数据进行压缩,例如压缩可能会导致更加低效的收益。
如果客户端采用了一种服务端不允许的压缩方式,此时服务端将会抛出UNIMPLEMENTED
的异常状态码。服务器服务器将返回一个包含 grpc-accept-encoding的响应来告诉客户端允许哪些压缩方式。
但是此时如果客户端根据给定的响应信息选择能够压缩的方式,此时如果还是抛出UNIMPLEMENTED
,那么这个就不是压缩的问题,可能是其它原因导致的 BUG。
当然,如果服务端采用了客户端不能接收的压缩方式,此时客户端也会抛出一个异常:INTERNAL
。
因此如果采用了对方不接受的压缩方式,而后的数据发送方式就应该采用非压缩的方式。
具体这里如何针对性的禁用指定的压缩方式,这里还并不知晓。
代码部分
这里我们看下例子里面的压缩方式,比较绕:
Server 端
配置部分
import os
import sys
from concurrent import futures
import argparse
import logging
import threading
import grpc
from grpc._server import _Context
PATH = os.path.dirname(
os.path.dirname(
os.path.dirname(
os.path.abspath(__file__))))
sys.path.append(PATH)
from protos import helloworld_pb2
from protos import helloworld_pb2_grpc
_DESCRIPTION = 'A server capable of compression.'
_COMPRESSION_OPTIONS = {
"none": grpc.Compression.NoCompression,
"deflate": grpc.Compression.Deflate,
"gzip": grpc.Compression.Gzip,
}
_LOGGER = logging.getLogger(__name__)
_SERVER_HOST = 'localhost'
实现服务端的接口
class Greeter(helloworld_pb2_grpc.GreeterServicer):
def __init__(self, no_compress_every_n):
super(Greeter, self).__init__()
self._no_compress_every_n = no_compress_every_n
self._request_counter = 0
self._counter_lock = threading.RLock()
# 根据示例属性判断是否进行压缩的方法
def _should_suppress_compression(self):
suppress_compression = False
with self._counter_lock:
if self._no_compress_every_n and self._request_counter % self._no_compress_every_n == 0:
suppress_compression = True
self._request_counter += 1
return suppress_compression
# 实现的接口
def SayHello(self, request, context: _Context):
if self._should_suppress_compression():
context.set_compression(grpc.Compression.NoCompression)
else:
# 针对这个接口,我们采用 gzip 的压缩方式
context.set_compression(_COMPRESSION_OPTIONS['gzip'])
return helloworld_pb2.HelloReply(message='Hello, %s!'
% request.name)
启动服务器
def run_server(server_compression, no_compress_every_n, port):
server = grpc.server(futures.ThreadPoolExecutor(),
compression=server_compression,
options=(('grpc.so_reuseport', 1),))
helloworld_pb2_grpc.add_GreeterServicer_to_server(
Greeter(no_compress_every_n), server)
address = '{}:{}'.format(_SERVER_HOST, port)
server.add_insecure_port(address)
server.start()
print("Server listening at '{}'".format(address))
server.wait_for_termination()
命令行参数的配置
def main():
parser = argparse.ArgumentParser(description=_DESCRIPTION)
parser.add_argument('--server_compression',
default='none',
nargs='?',
choices=_COMPRESSION_OPTIONS.keys(),
help='The default compression method for the server.')
parser.add_argument('--no_compress_every_n',
type=int,
default=0,
nargs='?',
help='If set, every nth reply will be uncompressed.')
parser.add_argument('--port',
type=int,
default=50051,
nargs='?',
help='The port on which the server will listen.')
args = parser.parse_args()
run_server(_COMPRESSION_OPTIONS[args.server_compression],
args.no_compress_every_n, args.port)
if __name__ == "__main__":
logging.basicConfig()
main()
代码解释
这里我们通过命令行的形式将 字典 _COMPRESSION_OPTIONS 中 key 进行了传递,指定了服务进行压缩的方式。同时在内部实现的接口中,对不同的接口通过指定的方式进行压缩。
Client 端
配置部分
import argparse
import logging
import os
import sys
import grpc
PATH = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(PATH)
from protos import helloworld_pb2
from protos import helloworld_pb2_grpc
_DESCRIPTION = 'A client capable of compression.'
_COMPRESSION_OPTIONS = {
"none": grpc.Compression.NoCompression,
"deflate": grpc.Compression.Deflate,
"gzip": grpc.Compression.Gzip,
}
_LOGGER = logging.getLogger(__name__)
客户端主体代码部分
def run_client(channel_compression, call_compression, target):
with grpc.insecure_channel(target,
compression=channel_compression) as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
response = stub.SayHello(
helloworld_pb2.HelloRequest(name='you'),
# 这里指定 客户端 传输的方法为压缩的形式
compression=call_compression,
wait_for_ready=True)
print("Response: {}".format(response))
命令行启动
def main():
parser = argparse.ArgumentParser(description=_DESCRIPTION)
parser.add_argument('--channel_compression',
default='none',
nargs='?',
choices=_COMPRESSION_OPTIONS.keys(),
help='The compression method to use for the channel.')
parser.add_argument(
'--call_compression',
default='none',
nargs='?',
choices=_COMPRESSION_OPTIONS.keys(),
help='The compression method to use for an individual call.')
parser.add_argument('--server',
default='localhost:50051',
type=str,
nargs='?',
help='The host-port pair at which to reach the server.')
args = parser.parse_args()
channel_compression = \
_COMPRESSION_OPTIONS[args.channel_compression]
call_compression = _COMPRESSION_OPTIONS[args.call_compression]
run_client(channel_compression, call_compression, args.server)
if __name__ == "__main__":
logging.basicConfig()
main()
日常使用
对于项目阶段,一般都会如下进行定义:
# 首先定义一个全局的压缩选项
_COMPRESSION_OPTIONS = {
"none": grpc.Compression.NoCompression,
"deflate": grpc.Compression.Deflate,
"gzip": grpc.Compression.Gzip,
}
....
# 指定服务端的接口进行压缩
class ServerServicer(exercise_pb2_grpc.ServerServicer):
...
def RouteChat(self, request_iterator, context:_Context):
# 在这里传入配置选项选项
context.set_compression(_COMPRESSION_OPTIONS['gzip'])
prev_notes = []
for new_note in request_iterator:
for prev_note in prev_notes:
if prev_note.location == new_note.location:
yield prev_note
prev_notes.append(new_note)
# 服务端全局配置
def serve(server_compression):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
compression=server_compression)
exercise_pb2_grpc.add_ServerServicer_to_server(
ServerServicer(), server
)
# port:8888
server.add_secure_port('[::]:' + PORT, server_credentials)
server.start()
server.wait_for_termination()
客户端也是类似的:
_COMPRESSION_OPTIONS = {
"none": grpc.Compression.NoCompression,
"deflate": grpc.Compression.Deflate,
"gzip": grpc.Compression.Gzip,
}
# 把压缩的内容丢到 管道里面
with grpc.secure_channel('{}:{}'.format(HOST, PORT),
credentials,
compression=_COMPRESSION_OPTIONS['deflate']
) as channel:
.
上一篇: confirm方法怎么使用
下一篇: centos7修改主机名