欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java利用Sping框架编写RPC远程过程调用服务的教程

程序员文章站 2024-03-12 21:46:32
rpc,即 remote procedure call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样。 rpc 可基于 http 或 t...

rpc,即 remote procedure call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样。

rpc 可基于 http 或 tcp 协议,web service 就是基于 http 协议的 rpc,它具有良好的跨平台性,但其性能却不如基于 tcp 协议的 rpc。会两方面会直接影响 rpc 的性能,一是传输方式,二是序列化。

众所周知,tcp 是传输层协议,http 是应用层协议,而传输层较应用层更加底层,在数据传输方面,越底层越快,因此,在一般情况下,tcp 一定比 http 快。就序列化而言,java 提供了默认的序列化方式,但在高并发的情况下,这种方式将会带来一些性能上的瓶颈,于是市面上出现了一系列优秀的序列化框架,比如:protobuf、kryo、hessian、jackson 等,它们可以取代 java 默认的序列化,从而提供更高效的性能。

为了支持高并发,传统的阻塞式 io 显然不太合适,因此我们需要异步的 io,即 nio。java 提供了 nio 的解决方案,java 7 也提供了更优秀的 nio.2 支持,用 java 实现 nio 并不是遥不可及的事情,只是需要我们熟悉 nio 的技术细节。

我们需要将服务部署在分布式环境下的不同节点上,通过服务注册的方式,让客户端来自动发现当前可用的服务,并调用这些服务。这需要一种服务注册表(service registry)的组件,让它来注册分布式环境下所有的服务地址(包括:主机名与端口号)。

应用、服务、服务注册表之间的关系见下图:

Java利用Sping框架编写RPC远程过程调用服务的教程

每台 server 上可发布多个 service,这些 service 共用一个 host 与 port,在分布式环境下会提供 server 共同对外提供 service。此外,为防止 service registry 出现单点故障,因此需要将其搭建为集群环境。

本文将为您揭晓开发轻量级分布式 rpc 框架的具体过程,该框架基于 tcp 协议,提供了 nio 特性,提供高效的序列化方式,同时也具备服务注册与发现的能力。

根据以上技术需求,我们可使用如下技术选型:

  • spring:它是最强大的依赖注入框架,也是业界的权威标准。
  • netty:它使 nio 编程更加容易,屏蔽了 java 底层的 nio 细节。
  • protostuff:它基于 protobuf 序列化框架,面向 pojo,无需编写 .proto 文件。
  • zookeeper:提供服务注册与发现功能,开发分布式系统的必备选择,同时它也具备天生的集群能力。

相关 maven 依赖请见最后附录。

第一步:编写服务接口

public interface helloservice {

  string hello(string name);
}

将该接口放在独立的客户端 jar 包中,以供应用使用。

第二步:编写服务接口的实现类

@rpcservice(helloservice.class) // 指定远程接口
public class helloserviceimpl implements helloservice {

  @override
  public string hello(string name) {
    return "hello! " + name;
  }
}

使用rpcservice注解定义在服务接口的实现类上,需要对该实现类指定远程接口,因为实现类可能会实现多个接口,一定要告诉框架哪个才是远程接口。

rpcservice代码如下:

@target({elementtype.type})
@retention(retentionpolicy.runtime)
@component // 表明可被 spring 扫描
public @interface rpcservice {

  class<?> value();
}

该注解具备 spring 的component注解的特性,可被 spring 扫描。

该实现类放在服务端 jar 包中,该 jar 包还提供了一些服务端的配置文件与启动服务的引导程序。

第三步:配置服务端

服务端 spring 配置文件名为spring.xml,内容如下:

<beans ...>
  <context:component-scan base-package="com.xxx.rpc.sample.server"/>

  <context:property-placeholder location="classpath:config.properties"/>

  <!-- 配置服务注册组件 -->
  <bean id="serviceregistry" class="com.xxx.rpc.registry.serviceregistry">
    <constructor-arg name="registryaddress" value="${registry.address}"/>
  </bean>

  <!-- 配置 rpc 服务器 -->
  <bean id="rpcserver" class="com.xxx.rpc.server.rpcserver">
    <constructor-arg name="serveraddress" value="${server.address}"/>
    <constructor-arg name="serviceregistry" ref="serviceregistry"/>
  </bean>
</beans>

具体的配置参数在config.properties文件中,内容如下:

# zookeeper 服务器
registry.address=127.0.0.1:2181

# rpc 服务器
server.address=127.0.0.1:8000

以上配置表明:连接本地的 zookeeper 服务器,并在 8000 端口上发布 rpc 服务。

第四步:启动服务器并发布服务

为了加载 spring 配置文件来发布服务,只需编写一个引导程序即可:

public class rpcbootstrap {

  public static void main(string[] args) {
    new classpathxmlapplicationcontext("spring.xml");
  }
}

运行rpcbootstrap类的main方法即可启动服务端,但还有两个重要的组件尚未实现,它们分别是:serviceregistry与rpcserver,下文会给出具体实现细节。

第五步:实现服务注册

使用 zookeeper 客户端可轻松实现服务注册功能,serviceregistry代码如下:

public class serviceregistry {

  private static final logger logger = loggerfactory.getlogger(serviceregistry.class);

  private countdownlatch latch = new countdownlatch(1);

  private string registryaddress;

  public serviceregistry(string registryaddress) {
    this.registryaddress = registryaddress;
  }

  public void register(string data) {
    if (data != null) {
      zookeeper zk = connectserver();
      if (zk != null) {
        createnode(zk, data);
      }
    }
  }

  private zookeeper connectserver() {
    zookeeper zk = null;
    try {
      zk = new zookeeper(registryaddress, constant.zk_session_timeout, new watcher() {
        @override
        public void process(watchedevent event) {
          if (event.getstate() == event.keeperstate.syncconnected) {
            latch.countdown();
          }
        }
      });
      latch.await();
    } catch (ioexception | interruptedexception e) {
      logger.error("", e);
    }
    return zk;
  }

  private void createnode(zookeeper zk, string data) {
    try {
      byte[] bytes = data.getbytes();
      string path = zk.create(constant.zk_data_path, bytes, zoodefs.ids.open_acl_unsafe, createmode.ephemeral_sequential);
      logger.debug("create zookeeper node ({} => {})", path, data);
    } catch (keeperexception | interruptedexception e) {
      logger.error("", e);
    }
  }
}

其中,通过constant配置了所有的常量:

public interface constant {

  int zk_session_timeout = 5000;

  string zk_registry_path = "/registry";
  string zk_data_path = zk_registry_path + "/data";
}

注意:首先需要使用 zookeeper 客户端命令行创建/registry永久节点,用于存放所有的服务临时节点。

第六步:实现 rpc 服务器

使用 netty 可实现一个支持 nio 的 rpc 服务器,需要使用serviceregistry注册服务地址,rpcserver代码如下:

public class rpcserver implements applicationcontextaware, initializingbean {

  private static final logger logger = loggerfactory.getlogger(rpcserver.class);

  private string serveraddress;
  private serviceregistry serviceregistry;

  private map<string, object> handlermap = new hashmap<>(); // 存放接口名与服务对象之间的映射关系

  public rpcserver(string serveraddress) {
    this.serveraddress = serveraddress;
  }

  public rpcserver(string serveraddress, serviceregistry serviceregistry) {
    this.serveraddress = serveraddress;
    this.serviceregistry = serviceregistry;
  }

  @override
  public void setapplicationcontext(applicationcontext ctx) throws beansexception {
    map<string, object> servicebeanmap = ctx.getbeanswithannotation(rpcservice.class); // 获取所有带有 rpcservice 注解的 spring bean
    if (maputils.isnotempty(servicebeanmap)) {
      for (object servicebean : servicebeanmap.values()) {
        string interfacename = servicebean.getclass().getannotation(rpcservice.class).value().getname();
        handlermap.put(interfacename, servicebean);
      }
    }
  }

  @override
  public void afterpropertiesset() throws exception {
    eventloopgroup bossgroup = new nioeventloopgroup();
    eventloopgroup workergroup = new nioeventloopgroup();
    try {
      serverbootstrap bootstrap = new serverbootstrap();
      bootstrap.group(bossgroup, workergroup).channel(nioserversocketchannel.class)
        .childhandler(new channelinitializer<socketchannel>() {
          @override
          public void initchannel(socketchannel channel) throws exception {
            channel.pipeline()
              .addlast(new rpcdecoder(rpcrequest.class)) // 将 rpc 请求进行解码(为了处理请求)
              .addlast(new rpcencoder(rpcresponse.class)) // 将 rpc 响应进行编码(为了返回响应)
              .addlast(new rpchandler(handlermap)); // 处理 rpc 请求
          }
        })
        .option(channeloption.so_backlog, 128)
        .childoption(channeloption.so_keepalive, true);

      string[] array = serveraddress.split(":");
      string host = array[0];
      int port = integer.parseint(array[1]);

      channelfuture future = bootstrap.bind(host, port).sync();
      logger.debug("server started on port {}", port);

      if (serviceregistry != null) {
        serviceregistry.register(serveraddress); // 注册服务地址
      }

      future.channel().closefuture().sync();
    } finally {
      workergroup.shutdowngracefully();
      bossgroup.shutdowngracefully();
    }
  }
}

以上代码中,有两个重要的 pojo 需要描述一下,它们分别是rpcrequest与rpcresponse。

使用rpcrequest封装 rpc 请求,代码如下:

public class rpcrequest {

  private string requestid;
  private string classname;
  private string methodname;
  private class<?>[] parametertypes;
  private object[] parameters;

  // getter/setter...
}

使用rpcresponse封装 rpc 响应,代码如下:

public class rpcresponse {

  private string requestid;
  private throwable error;
  private object result;

  // getter/setter...
}

使用rpcdecoder提供 rpc 解码,只需扩展 netty 的bytetomessagedecoder抽象类的decode方法即可,代码如下:

public class rpcdecoder extends bytetomessagedecoder {

  private class<?> genericclass;

  public rpcdecoder(class<?> genericclass) {
    this.genericclass = genericclass;
  }

  @override
  public void decode(channelhandlercontext ctx, bytebuf in, list<object> out) throws exception {
    if (in.readablebytes() < 4) {
      return;
    }
    in.markreaderindex();
    int datalength = in.readint();
    if (datalength < 0) {
      ctx.close();
    }
    if (in.readablebytes() < datalength) {
      in.resetreaderindex();
      return;
    }
    byte[] data = new byte[datalength];
    in.readbytes(data);

    object obj = serializationutil.deserialize(data, genericclass);
    out.add(obj);
  }
}

使用rpcencoder提供 rpc 编码,只需扩展 netty 的messagetobyteencoder抽象类的encode方法即可,代码如下:

public class rpcencoder extends messagetobyteencoder {

  private class<?> genericclass;

  public rpcencoder(class<?> genericclass) {
    this.genericclass = genericclass;
  }

  @override
  public void encode(channelhandlercontext ctx, object in, bytebuf out) throws exception {
    if (genericclass.isinstance(in)) {
      byte[] data = serializationutil.serialize(in);
      out.writeint(data.length);
      out.writebytes(data);
    }
  }
}

编写一个serializationutil工具类,使用protostuff实现序列化:

public class serializationutil {

  private static map<class<?>, schema<?>> cachedschema = new concurrenthashmap<>();

  private static objenesis objenesis = new objenesisstd(true);

  private serializationutil() {
  }

  @suppresswarnings("unchecked")
  private static <t> schema<t> getschema(class<t> cls) {
    schema<t> schema = (schema<t>) cachedschema.get(cls);
    if (schema == null) {
      schema = runtimeschema.createfrom(cls);
      if (schema != null) {
        cachedschema.put(cls, schema);
      }
    }
    return schema;
  }

  @suppresswarnings("unchecked")
  public static <t> byte[] serialize(t obj) {
    class<t> cls = (class<t>) obj.getclass();
    linkedbuffer buffer = linkedbuffer.allocate(linkedbuffer.default_buffer_size);
    try {
      schema<t> schema = getschema(cls);
      return protostuffioutil.tobytearray(obj, schema, buffer);
    } catch (exception e) {
      throw new illegalstateexception(e.getmessage(), e);
    } finally {
      buffer.clear();
    }
  }

  public static <t> t deserialize(byte[] data, class<t> cls) {
    try {
      t message = (t) objenesis.newinstance(cls);
      schema<t> schema = getschema(cls);
      protostuffioutil.mergefrom(data, message, schema);
      return message;
    } catch (exception e) {
      throw new illegalstateexception(e.getmessage(), e);
    }
  }
}

以上了使用 objenesis 来实例化对象,它是比 java 反射更加强大。

注意:如需要替换其它序列化框架,只需修改serializationutil即可。当然,更好的实现方式是提供配置项来决定使用哪种序列化方式。

使用rpchandler中处理 rpc 请求,只需扩展 netty 的simplechannelinboundhandler抽象类即可,代码如下:

public class rpchandler extends simplechannelinboundhandler<rpcrequest> {

  private static final logger logger = loggerfactory.getlogger(rpchandler.class);

  private final map<string, object> handlermap;

  public rpchandler(map<string, object> handlermap) {
    this.handlermap = handlermap;
  }

  @override
  public void channelread0(final channelhandlercontext ctx, rpcrequest request) throws exception {
    rpcresponse response = new rpcresponse();
    response.setrequestid(request.getrequestid());
    try {
      object result = handle(request);
      response.setresult(result);
    } catch (throwable t) {
      response.seterror(t);
    }
    ctx.writeandflush(response).addlistener(channelfuturelistener.close);
  }

  private object handle(rpcrequest request) throws throwable {
    string classname = request.getclassname();
    object servicebean = handlermap.get(classname);

    class<?> serviceclass = servicebean.getclass();
    string methodname = request.getmethodname();
    class<?>[] parametertypes = request.getparametertypes();
    object[] parameters = request.getparameters();

    /*method method = serviceclass.getmethod(methodname, parametertypes);
    method.setaccessible(true);
    return method.invoke(servicebean, parameters);*/

    fastclass servicefastclass = fastclass.create(serviceclass);
    fastmethod servicefastmethod = servicefastclass.getmethod(methodname, parametertypes);
    return servicefastmethod.invoke(servicebean, parameters);
  }

  @override
  public void exceptioncaught(channelhandlercontext ctx, throwable cause) {
    logger.error("server caught exception", cause);
    ctx.close();
  }
}

为了避免使用 java 反射带来的性能问题,我们可以使用 cglib 提供的反射 api,如上面用到的fastclass与fastmethod。

第七步:配置客户端

同样使用 spring 配置文件来配置 rpc 客户端,spring.xml代码如下:

<beans ...>
  <context:property-placeholder location="classpath:config.properties"/>

  <!-- 配置服务发现组件 -->
  <bean id="servicediscovery" class="com.xxx.rpc.registry.servicediscovery">
    <constructor-arg name="registryaddress" value="${registry.address}"/>
  </bean>

  <!-- 配置 rpc 代理 -->
  <bean id="rpcproxy" class="com.xxx.rpc.client.rpcproxy">
    <constructor-arg name="servicediscovery" ref="servicediscovery"/>
  </bean>
</beans>

其中config.properties提供了具体的配置:

# zookeeper 服务器
registry.address=127.0.0.1:2181

第八步:实现服务发现

同样使用 zookeeper 实现服务发现功能,见如下代码:

public class servicediscovery {

  private static final logger logger = loggerfactory.getlogger(servicediscovery.class);

  private countdownlatch latch = new countdownlatch(1);

  private volatile list<string> datalist = new arraylist<>();

  private string registryaddress;

  public servicediscovery(string registryaddress) {
    this.registryaddress = registryaddress;

    zookeeper zk = connectserver();
    if (zk != null) {
      watchnode(zk);
    }
  }

  public string discover() {
    string data = null;
    int size = datalist.size();
    if (size > 0) {
      if (size == 1) {
        data = datalist.get(0);
        logger.debug("using only data: {}", data);
      } else {
        data = datalist.get(threadlocalrandom.current().nextint(size));
        logger.debug("using random data: {}", data);
      }
    }
    return data;
  }

  private zookeeper connectserver() {
    zookeeper zk = null;
    try {
      zk = new zookeeper(registryaddress, constant.zk_session_timeout, new watcher() {
        @override
        public void process(watchedevent event) {
          if (event.getstate() == event.keeperstate.syncconnected) {
            latch.countdown();
          }
        }
      });
      latch.await();
    } catch (ioexception | interruptedexception e) {
      logger.error("", e);
    }
    return zk;
  }

  private void watchnode(final zookeeper zk) {
    try {
      list<string> nodelist = zk.getchildren(constant.zk_registry_path, new watcher() {
        @override
        public void process(watchedevent event) {
          if (event.gettype() == event.eventtype.nodechildrenchanged) {
            watchnode(zk);
          }
        }
      });
      list<string> datalist = new arraylist<>();
      for (string node : nodelist) {
        byte[] bytes = zk.getdata(constant.zk_registry_path + "/" + node, false, null);
        datalist.add(new string(bytes));
      }
      logger.debug("node data: {}", datalist);
      this.datalist = datalist;
    } catch (keeperexception | interruptedexception e) {
      logger.error("", e);
    }
  }
}

第九步:实现 rpc 代理

这里使用 java 提供的动态代理技术实现 rpc 代理(当然也可以使用 cglib 来实现),具体代码如下:

public class rpcproxy {

  private string serveraddress;
  private servicediscovery servicediscovery;

  public rpcproxy(string serveraddress) {
    this.serveraddress = serveraddress;
  }

  public rpcproxy(servicediscovery servicediscovery) {
    this.servicediscovery = servicediscovery;
  }

  @suppresswarnings("unchecked")
  public <t> t create(class<?> interfaceclass) {
    return (t) proxy.newproxyinstance(
      interfaceclass.getclassloader(),
      new class<?>[]{interfaceclass},
      new invocationhandler() {
        @override
        public object invoke(object proxy, method method, object[] args) throws throwable {
          rpcrequest request = new rpcrequest(); // 创建并初始化 rpc 请求
          request.setrequestid(uuid.randomuuid().tostring());
          request.setclassname(method.getdeclaringclass().getname());
          request.setmethodname(method.getname());
          request.setparametertypes(method.getparametertypes());
          request.setparameters(args);

          if (servicediscovery != null) {
            serveraddress = servicediscovery.discover(); // 发现服务
          }

          string[] array = serveraddress.split(":");
          string host = array[0];
          int port = integer.parseint(array[1]);

          rpcclient client = new rpcclient(host, port); // 初始化 rpc 客户端
          rpcresponse response = client.send(request); // 通过 rpc 客户端发送 rpc 请求并获取 rpc 响应

          if (response.iserror()) {
            throw response.geterror();
          } else {
            return response.getresult();
          }
        }
      }
    );
  }
}

使用rpcclient类实现 rpc 客户端,只需扩展 netty 提供的simplechannelinboundhandler抽象类即可,代码如下:

public class rpcclient extends simplechannelinboundhandler<rpcresponse> {

  private static final logger logger = loggerfactory.getlogger(rpcclient.class);

  private string host;
  private int port;

  private rpcresponse response;

  private final object obj = new object();

  public rpcclient(string host, int port) {
    this.host = host;
    this.port = port;
  }

  @override
  public void channelread0(channelhandlercontext ctx, rpcresponse response) throws exception {
    this.response = response;

    synchronized (obj) {
      obj.notifyall(); // 收到响应,唤醒线程
    }
  }

  @override
  public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception {
    logger.error("client caught exception", cause);
    ctx.close();
  }

  public rpcresponse send(rpcrequest request) throws exception {
    eventloopgroup group = new nioeventloopgroup();
    try {
      bootstrap bootstrap = new bootstrap();
      bootstrap.group(group).channel(niosocketchannel.class)
        .handler(new channelinitializer<socketchannel>() {
          @override
          public void initchannel(socketchannel channel) throws exception {
            channel.pipeline()
              .addlast(new rpcencoder(rpcrequest.class)) // 将 rpc 请求进行编码(为了发送请求)
              .addlast(new rpcdecoder(rpcresponse.class)) // 将 rpc 响应进行解码(为了处理响应)
              .addlast(rpcclient.this); // 使用 rpcclient 发送 rpc 请求
          }
        })
        .option(channeloption.so_keepalive, true);

      channelfuture future = bootstrap.connect(host, port).sync();
      future.channel().writeandflush(request).sync();

      synchronized (obj) {
        obj.wait(); // 未收到响应,使线程等待
      }

      if (response != null) {
        future.channel().closefuture().sync();
      }
      return response;
    } finally {
      group.shutdowngracefully();
    }
  }
}

第十步:发送 rpc 请求

使用 junit 结合 spring 编写一个单元测试,代码如下:

@runwith(springjunit4classrunner.class)
@contextconfiguration(locations = "classpath:spring.xml")
public class helloservicetest {

  @autowired
  private rpcproxy rpcproxy;

  @test
  public void hellotest() {
    helloservice helloservice = rpcproxy.create(helloservice.class);
    string result = helloservice.hello("world");
    assert.assertequals("hello! world", result);
  }
}

运行以上单元测试,如果不出意外的话,您应该会看到绿条。

总结

本文通过 spring + netty + protostuff + zookeeper 实现了一个轻量级 rpc 框架,使用 spring 提供依赖注入与参数配置,使用 netty 实现 nio 方式的数据传输,使用 protostuff 实现对象序列化,使用 zookeeper 实现服务注册与发现。使用该框架,可将服务部署到分布式环境中的任意节点上,客户端通过远程接口来调用服务端的具体实现,让服务端与客户端的开发完全分离,为实现大规模分布式应用提供了基础支持。

附录:maven 依赖

<!-- junit -->
<dependency>
  <groupid>junit</groupid>
  <artifactid>junit</artifactid>
  <version>4.11</version>
  <scope>test</scope>
</dependency>

<!-- slf4j -->
<dependency>
  <groupid>org.slf4j</groupid>
  <artifactid>slf4j-log4j12</artifactid>
  <version>1.7.7</version>
</dependency>

<!-- spring -->
<dependency>
  <groupid>org.springframework</groupid>
  <artifactid>spring-context</artifactid>
  <version>3.2.12.release</version>
</dependency>
<dependency>
  <groupid>org.springframework</groupid>
  <artifactid>spring-test</artifactid>
  <version>3.2.12.release</version>
  <scope>test</scope>
</dependency>

<!-- netty -->
<dependency>
  <groupid>io.netty</groupid>
  <artifactid>netty-all</artifactid>
  <version>4.0.24.final</version>
</dependency>

<!-- protostuff -->
<dependency>
  <groupid>com.dyuproject.protostuff</groupid>
  <artifactid>protostuff-core</artifactid>
  <version>1.0.8</version>
</dependency>
<dependency>
  <groupid>com.dyuproject.protostuff</groupid>
  <artifactid>protostuff-runtime</artifactid>
  <version>1.0.8</version>
</dependency>

<!-- zookeeper -->
<dependency>
  <groupid>org.apache.zookeeper</groupid>
  <artifactid>zookeeper</artifactid>
  <version>3.4.6</version>
</dependency>

<!-- apache commons collections -->
<dependency>
  <groupid>org.apache.commons</groupid>
  <artifactid>commons-collections4</artifactid>
  <version>4.0</version>
</dependency>

<!-- objenesis -->
<dependency>
  <groupid>org.objenesis</groupid>
  <artifactid>objenesis</artifactid>
  <version>2.1</version>
</dependency>

<!-- cglib -->
<dependency>
  <groupid>cglib</groupid>
  <artifactid>cglib</artifactid>
  <version>3.1</version>
</dependency>