欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Practical Netty (3) 在Netty中使用Protobuf  

程序员文章站 2022-05-10 10:24:31
...

Practical Netty (3) 在Netty中使用Protobuf

1. Netty 核心概念之一:Upstream 与 Downstream

学过 OSI 的 5 层或 TCP/IP 的 7 层网络模型知道,应用层位于传输层之上。所以从传输层的 Socket 读取数据,就是从下层向上层传输,就是 Upstream;反过来向 Socket 写数据,就是从应用层向传输层发送数据,就是 Downstream,即向 Socket 写。

从LocalTime 实例中 ServerBootstrap 是惯常用法,其中要注意的是:

  • bootstrap.setPipelineFactory(new LocalTimeServerPipelineFactory());

下面是LocalTimeServer的实现:


28  public class LocalTimeServer {
29  
30      private final int port;
31  
32      public LocalTimeServer(int port) {
33          this.port = port;
34      }
35  
36      public void run() {
37          // Configure the server.
38          ServerBootstrap bootstrap = new ServerBootstrap(
39                  new NioServerSocketChannelFactory(
40                          Executors.newCachedThreadPool(),
41                          Executors.newCachedThreadPool()));
42  
43          // Set up the event pipeline factory.
44          bootstrap.setPipelineFactory(new LocalTimeServerPipelineFactory());
45  
46          // Bind and start to accept incoming connections.
47          bootstrap.bind(new InetSocketAddress(port));
48      }
49  
50      public static void main(String[] args) throws Exception {
51          int port;
52          if (args.length > 0) {
53              port = Integer.parseInt(args[0]);
54          } else {
55              port = 8080;
56          }
57          new LocalTimeServer(port).run();
58      }
59  }

LocalTimeServerPipelineFactory的实现如下。在ChannelPipelineFactory的实现类的getPipeline函数中需要创建一个管道(pipeline),然后在管道中按照顺序添加 Upstream Handlers 和 Downstream Handlers。这其中的规则,如果用图表示的话,如下。


                                          I/O Request
                                         via Channel or
                                     ChannelHandlerContext
                                               |
      +----------------------------------------+---------------+
      |                  ChannelPipeline       |               |
      |                                       \|/              |
      |  +----------------------+  +-----------+------------+  |
      |  | Upstream Handler  N  |  | Downstream Handler  1  |  |
      |  +----------+-----------+  +-----------+------------+  |
      |            /|\                         |               |
      |             |                         \|/              |
      |  +----------+-----------+  +-----------+------------+  |
      |  | Upstream Handler N-1 |  | Downstream Handler  2  |  |
      |  +----------+-----------+  +-----------+------------+  |
      |            /|\                         .               |
      |             .                          .               |
      |     [ sendUpstream() ]        [ sendDownstream() ]     |
      |     [ + INBOUND data ]        [ + OUTBOUND data  ]     |
      |             .                          .               |
      |             .                         \|/              |
      |  +----------+-----------+  +-----------+------------+  |
      |  | Upstream Handler  2  |  | Downstream Handler M-1 |  |
      |  +----------+-----------+  +-----------+------------+  |
      |            /|\                         |               |
      |             |                         \|/              |
      |  +----------+-----------+  +-----------+------------+  |
      |  | Upstream Handler  1  |  | Downstream Handler  M  |  |
      |  +----------+-----------+  +-----------+------------+  |
      |            /|\                         |               |
      +-------------+--------------------------+---------------+
                    |                         \|/
      +-------------+--------------------------+---------------+
      |             |                          |               |
      |     [ Socket.read() ]          [ Socket.write() ]      |
      |                                                        |
      |  Netty Internal I/O Threads (Transport Implementation) |
      +--------------------------------------------------------+

2. Netty 核心概念之二:Pipeline 与 Upstream、Downstream 如何组织

Upstream 和 Downstream 都是在 Pipeline 中“流动”的,所以影响 Upstream 和 Downstream 行为的 UpstreamHandler 和 DownstreamHandler,也要被放到 Pipeline 里,吼吼。所以呢,就有如下的代码:


ChannelPipeline p = Channels.pipeline();
p.addLast("1", new UpstreamHandlerA());
p.addLast("2", new UpstreamHandlerB());
p.addLast("3", new DownstreamHandlerA());
p.addLast("4", new DownstreamHandlerB());
p.addLast("5", new UpstreamHandlerX());

则实际的 Upstream 和 Downstream 执行顺序是:

upstream: 1, 2, 5
downstream: 4, 3

当然其中任何一个 Handler 也可以兼有 Upstream 和 Downstream 的功能。下面是 Netty 中使用 Protobuf 的经典方式的实例,即官方的 LocalServer 的用法。


27  public class LocalTimeServerPipelineFactory implements ChannelPipelineFactory {
28  
29      public ChannelPipeline getPipeline() throws Exception {
30          ChannelPipeline p = pipeline();
31          p.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
32          p.addLast("protobufDecoder", new ProtobufDecoder(LocalTimeProtocol.Locations.getDefaultInstance()));
33  
34          p.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
35          p.addLast("protobufEncoder", new ProtobufEncoder());
36  
37          p.addLast("handler", new LocalTimeServerHandler());
38          return p;
39      }
40  }
  • 首先添加ProtobufVarint32FrameDecoderProtobufDecoder用于解码 Protobuf package,注意顺序,前者是解码标识长度的 32 位数据,后者才是解码包内容。他们都是 Upstream Handlers,顺序为自上而下。
  • 然后添加ProtobufVarint32LengthFieldPrependerProtobufEncoder用于编码 Protobuf package,注意顺序,前者是编码标识长度的 32 位数据,后者才是编码包内容的。他们都是 Downstream Handlers,顺序为自下而上。
  • LocalTimeServerHandler是自定义的,也是一个 Upstream Handler。所以这里 Upstream(从Socket读到数据)的执行顺序为ProtobufVarint32FrameDecoderProtobufDecoderLocalTimeServerHandler,Downstream(向Socket写数据)的执行顺序为protobufEncoderframeEncoder

这里要说的是,Netty 中提供的关于 Protobuf 的类只有这四个:

org.jboss.netty.handler.codec.protobuf
    - ProtobufDecoder
    - ProtobufEncoder
    - ProtobufVariant32FrameDecoder
    - ProtobufVariant32FrameEncoder

3. Netty 中如何处理 Protobuf 数据包

3.1. Netty 官方示例 LocalTime 中的 Protobuf 使用方式一览

那接下来我们就看看LocalTimeServerHandler的实现方式吧。这个类继承自SimpleChannelUpstreamHandler,说明只有从 Socket 接收数据时(Upstream)才会响应这个类的方法。它覆盖了四个方法,如下:


public class LocalTimeServerHandler extends SimpleChannelUpstreamHandler {
    public void handleUpstream(…){…}
    public void messageReceived(…){…}
    public void exceptionCaught(…){…}
    private static String toString(…){…}
}

除了messageReceived之外,其他的都不是重点。看看messageReceived吧。


54      public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
55  
56          Locations locations = (Locations) e.getMessage();
57          long currentTime = System.currentTimeMillis();
58  
59          LocalTimes.Builder builder = LocalTimes.newBuilder();
60          for (Location l: locations.getLocationList()) {
61              TimeZone tz = TimeZone.getTimeZone(
62                      toString(l.getContinent()) + '/' + l.getCity());
63              Calendar calendar = getInstance(tz);
64              calendar.setTimeInMillis(currentTime);
65  
66              builder.addLocalTime(LocalTime.newBuilder().
67                      setYear(calendar.get(YEAR)).
68                      setMonth(calendar.get(MONTH) + 1).
69                      setDayOfMonth(calendar.get(DAY_OF_MONTH)).
70                      setDayOfWeek(DayOfWeek.valueOf(calendar.get(DAY_OF_WEEK))).
71                      setHour(calendar.get(HOUR_OF_DAY)).
72                      setMinute(calendar.get(MINUTE)).
73                      setSecond(calendar.get(SECOND)).build());
74          }
75  
76          e.getChannel().write(builder.build());
77      }

你可能会问这个 Locations 是神马。这是 Protobuf 生成的一个 Java 类,留在后面说,你现在要知道的,就是 Locations 就是一个由 Protobuf 反序列化(Deserialize 或 Unmarshall)之后的东东。

3.2. 如何生成 Protobuf 的 Java 源文件

如果我们不讲讲 Protobuf 生成 Java 类的内容,似乎有点说不过去了,那就说说吧。

下面是一个 Protobuf 的 Protocol 定义文件的内容,文件名为LocalTimeProtocol.proto(关于 Protobuf 的入门实例,可以参考《Google Protobuf——实现跨平台跨语言的序列化/反序列化》)。


package org.jboss.netty.example.localtime;

option optimize_for = SPEED;
enum Continent {
    AFRICA = 0;
    AMERICA = 1;
    ANTARCTICA = 2;
    ARCTIC = 3;
    ASIA = 4;
    ATLANTIC = 5;
    AUSTRALIA = 6;
    EUROPE = 7;
    INDIAN = 8;
    MIDEAST = 9;
    PACIFIC = 10;
}

message Location {
    required Continent continent = 1;
    required string city = 2;
}

message Locations {
    repeated Location location = 1;
}

enum DayOfWeek {
    SUNDAY = 1;
    MONDAY = 2;
    TUESDAY = 3;
    WEDNESDAY = 4;
    THURSDAY = 5;
    FRIDAY = 6;
    SATURDAY = 7;
}

message LocalTime {
    required uint32 year = 1;
    required uint32 month = 2;
    required uint32 dayOfMonth = 4;
    required DayOfWeek dayOfWeek = 5;
    required uint32 hour = 6;
    required uint32 minute = 7;
    required uint32 second = 8;
}

message LocalTimes {
    repeated LocalTime localTime = 1;
}

可能有些朋友还不太了解 Protobuf,所以我在这里细说下。Protobuf 是 Google 开源的一个平台无关、语言无关的结构化数据的序列化与反序列化工具,将上面的内容保存到文件后,在控制台下输入命令(当然你要事先按照 Google Protocol Buffers 或者《Google Protobuf——实现跨平台跨语言的序列化/反序列化》一文的指导安装 Protobuf):


$ protoc LocalTimeProtocol.proto --java_out=.

就可以在当前目录下生成 Java 类了(并且包括相应的目录结构)。在上面的 Protobuf 定义文件中,出现了LocationLocationsLocalTimeLocalTimes这几个message。其中Location的定义如下:


message Location {
    required Continent continent = 1;
    required string city = 2;
}

其中required限定符表示是这个Locationmessage中必须包含的字段。那么再看Locations


message Locations {
    repeated Location location = 1;
}

这里的repeated限定符表示这是一个多元字段,即包含一组内容。后面的Location表明其中的元素是Locationmessage。LocalTimeLocalTimes的关系与其类似。

另外还要注意到的是ContinentDayOfWeek两个枚举类型(enum),它同 C++、Java 中的枚举定义类似(与 C++ 更类似些),这两个枚举类型分别被用在了LocationLocalTime两个 messages 中。

3.3. 如何使用 Protobuf 生成的 Java 类

3.3.1. 获取一个普通消息

  • 上面我们看到了,一个普通的 message,比如Locations,就叫Locations。全称是

      org.jboss.netty.example.localtime.LocalTimeProtocol.Locations
    

SimpleChannelUpstreamHandler.messageReceived()中是如下使用的:

Locations locations = (Locations) e.getMessage(); // e is an instance of MessageEvent

3.3.2. 从消息中 get 一个 required 成员

Location中的ContinentCityrequired字段,他们是最普通不过的,get如下:

l.getContinent()) + '/' + l.getCity()

如果 get 出来的是一个由 Protobuf 消息自身定义的 enum,则其值是定义中的对应的 int 值。

3.3.3. 从消息中 get 一个 repeated 成员

  • message定义中如果有repeated字段,如Locationsrepeated Location location,则这个字段的值的取出方法是:

      Locations locations = (Locations) e.getMessage();
      for (Location l: locations.getLocationList()) {
          …
      }
    

3.3.4. 生成一个普通消息

LocalTime localTime = LocalTime.newBuilder().build();

build()的返回值就是一个 LocalTime 实例。但是这样创建的一个 LocalTime,它的各 required 字段还没有设置,所以请看 3.3.5。

3.3.5. 向消息中 set 一个 required 成员

在 3.3.4. 中创建的一个 LocalTime,可以如下链式操作:

LocalTime.newBuilder().
                setYear(calendar.get(YEAR)).
                setMonth(calendar.get(MONTH) + 1).
                setDayOfMonth(calendar.get(DAY_OF_MONTH)).
                setDayOfWeek(DayOfWeek.valueOf(calendar.get(DAY_OF_WEEK))).
                setHour(calendar.get(HOUR_OF_DAY)).
                setMinute(calendar.get(MINUTE)).
                setSecond(calendar.get(SECOND)).build()
LocalTime localTime = 

newBuilder返回的是一个BuildersetYearsetMonth等等返回的也是一个Builder,这样很方便链式写法。

3.3.5. 向消息中 set 一个 repeated 成员

LocalTimes包含一个 repeated 成员,所以要如下 set:

LocalTimes.Builder builder = LocalTimes.newBuilder();
builder.addLocalTime(localTime1);
builder.addLocalTime(localTime2);
builder.addLocalTime(localTime3);
LocalTimes localTimes = builder.build();

其中localTime1localTime3localTime3都是通过 3.3.4. 中的方式得到的。

-

转载请注明来自柳大的CSDN博客:Blog.CSDN.net/Poechant,微博:weibo.com/lauginhom

-