gRPC demo(参考官网说明看demo)
程序员文章站
2024-03-14 14:17:34
...
1.gradle配置文件(官网上有gradle及maven的配置说明)
// Build script for the gRPC demo: applies the Java and protobuf plugins,
// declares the gRPC runtime dependencies, and configures protoc together
// with the gRPC Java code generator.
plugins {
    id 'java'
    id 'com.google.protobuf' version '0.8.8'
}

group 'com.hgl.dp.io.grpc'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
    maven {
        // Use HTTPS: artifacts fetched over plain HTTP can be tampered with in
        // transit, and newer Gradle versions refuse insecure repository URLs
        // unless they are explicitly allowed.
        url "https://maven.aliyun.com/nexus/content/groups/public/"
    }
    mavenCentral()
}

dependencies {
    // 'implementation'/'testImplementation' replace the deprecated
    // 'compile'/'testCompile' configurations (removed in Gradle 7).
    testImplementation group: 'junit', name: 'junit', version: '4.12'
    implementation 'io.grpc:grpc-netty-shaded:1.20.0'
    implementation 'io.grpc:grpc-protobuf:1.20.0'
    implementation 'io.grpc:grpc-stub:1.20.0'
}

tasks.withType(JavaCompile) {
    // The sources contain non-ASCII string literals, so pin the compiler encoding.
    options.encoding = "UTF-8"
}

protobuf {
    // Root directory for the generated Java classes.
    generatedFilesBaseDir = "$projectDir/src/"
    protoc {
        artifact = "com.google.protobuf:protoc:3.7.1"
    }
    plugins {
        grpc {
            artifact = 'io.grpc:protoc-gen-grpc-java:1.20.0'
        }
    }
    generateProtoTasks {
        all()*.plugins {
            grpc {
                // Emit the gRPC stubs into the 'java' sub-directory instead of
                // the plugin-named default one.
                outputSubDir = 'java'
            }
        }
    }
}
2.proto文件
syntax = "proto3";

// Generate one Java file per message/service inside com.hgldp.grpc.
option java_multiple_files = true;
option java_package = "com.hgldp.grpc";
option java_outer_classname = "SimPleProto";

// Demo service exposing one RPC for each of the four gRPC call shapes.
service ExampleGrpc {
    // Unary: a single request answered by a single response.
    rpc sendSimpleMessage (HelloRequest) returns (HelloResponse) {}
    // Server streaming: a single request answered by a stream of students.
    rpc getListMessage (HelloRequest) returns (stream StudentResponse) {}
    // Client streaming: a stream of requests answered by a single student list.
    rpc getStudentList (stream HelloRequest) returns (StudentResponseList) {}
    // Bidirectional streaming: both sides send a stream of messages.
    rpc BiTalk (stream StreamRequest) returns (stream StreamResponse) {}
}

// Request carrying a single string field "no".
message HelloRequest {
    string no = 1;
}

// Response carrying a single string field "name".
message HelloResponse {
    string name = 1;
}

// One student record.
message StudentResponse {
    string name = 1;
    int32 age = 2;
    string city = 3;
}

// Wrapper holding zero or more student records.
message StudentResponseList {
    repeated StudentResponse studentResponse = 1;
}

// Message sent by the client on the bidirectional stream.
message StreamRequest {
    string request_info = 1;
}

// Message sent by the server on the bidirectional stream.
message StreamResponse {
    string response_info = 1;
}
3.服务端代码(服务端接口实现类和服务端启动类可以分开,也可以放在一起,看自己喜好)
3.1 自定义服务端实现类
package com.hgldp.server;
import com.hgldp.grpc.*;
import io.grpc.stub.StreamObserver;
import java.util.UUID;
/**
 * Server-side implementation of the {@code ExampleGrpc} service. Each method
 * demonstrates one gRPC call shape: unary, server streaming, client streaming
 * and bidirectional streaming.
 */
public class GrpcServerImpl extends ExampleGrpcGrpc.ExampleGrpcImplBase {

    /** Names of the demo students, in the order they are sent. */
    private static final String[] STUDENT_NAMES = {"张三", "李四", "王五", "马六", "田七"};

    /** Cities of the demo students, aligned by index with {@link #STUDENT_NAMES}. */
    private static final String[] STUDENT_CITIES = {"北京", "上海", "广州", "武汉", "深圳"};

    /** Age of the first demo student; each following student is one year older. */
    private static final int FIRST_STUDENT_AGE = 12;

    /** Builds the demo student stored at the given index of the tables above. */
    private static StudentResponse studentAt(int index) {
        return StudentResponse.newBuilder()
                .setName(STUDENT_NAMES[index])
                .setAge(FIRST_STUDENT_AGE + index)
                .setCity(STUDENT_CITIES[index])
                .build();
    }

    /**
     * Unary call: logs the request number and answers with one fixed response.
     *
     * @param request          the incoming request
     * @param responseObserver receives the single response and the completion signal
     */
    @Override
    public void sendSimpleMessage(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
        System.out.println("服务器端收到消息:" + request.getNo());
        HelloResponse reply = HelloResponse.newBuilder().setName("小明").build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    /**
     * Server streaming: the client sends one message and the server streams
     * back all five demo students before completing.
     *
     * @param request          the incoming request
     * @param responseObserver receives each student and the completion signal
     */
    @Override
    public void getListMessage(HelloRequest request, StreamObserver<StudentResponse> responseObserver) {
        System.out.println("服务端接受到消息:" + request.getNo());
        for (int i = 0; i < STUDENT_NAMES.length; i++) {
            responseObserver.onNext(studentAt(i));
        }
        responseObserver.onCompleted();
    }

    /**
     * Client streaming: the client sends a stream of requests and the server
     * answers with a single student list once the client has finished.
     *
     * @param responseObserver receives the single list response
     * @return the observer that consumes the client's request stream
     */
    @Override
    public StreamObserver<HelloRequest> getStudentList(StreamObserver<StudentResponseList> responseObserver) {
        return new StudentListRequestObserver(responseObserver);
    }

    /**
     * Bidirectional streaming: every client message is answered with one
     * server message.
     *
     * @param responseObserver receives the server's response stream
     * @return the observer that consumes the client's request stream
     */
    @Override
    public StreamObserver<StreamRequest> biTalk(StreamObserver<StreamResponse> responseObserver) {
        return new BiTalkRequestObserver(responseObserver);
    }

    /** Consumes the client stream of {@code getStudentList} and replies on completion. */
    private static final class StudentListRequestObserver implements StreamObserver<HelloRequest> {
        private final StreamObserver<StudentResponseList> downstream;

        StudentListRequestObserver(StreamObserver<StudentResponseList> downstream) {
            this.downstream = downstream;
        }

        /** Invoked once for every message received from the client. */
        @Override
        public void onNext(HelloRequest value) {
            System.out.println("onNext:" + value.getNo());
        }

        @Override
        public void onError(Throwable t) {
            System.out.println(t.getMessage());
        }

        /** Invoked after the client has sent all of its messages. */
        @Override
        public void onCompleted() {
            StudentResponseList.Builder list = StudentResponseList.newBuilder();
            list.addStudentResponse(studentAt(0));
            list.addStudentResponse(studentAt(1));
            downstream.onNext(list.build());
            downstream.onCompleted();
        }
    }

    /** Consumes the client stream of {@code biTalk}, answering each message with a random UUID. */
    private static final class BiTalkRequestObserver implements StreamObserver<StreamRequest> {
        private final StreamObserver<StreamResponse> downstream;

        BiTalkRequestObserver(StreamObserver<StreamResponse> downstream) {
            this.downstream = downstream;
        }

        @Override
        public void onNext(StreamRequest value) {
            System.out.println(value.getRequestInfo());
            String token = UUID.randomUUID().toString();
            downstream.onNext(StreamResponse.newBuilder().setResponseInfo(token).build());
        }

        @Override
        public void onError(Throwable t) {
            System.out.println(t.getMessage());
        }

        @Override
        public void onCompleted() {
            // The client closed its side; close the server side as well, since
            // keeping a one-directional stream open serves no purpose.
            downstream.onCompleted();
        }
    }
}
3.2 服务端启动类
package com.hgldp.server;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;
/**
 * Starts the demo gRPC server on port 8086 and keeps the JVM alive until the
 * server terminates.
 */
public class GrpcCustomServer {

    /** How long the shutdown hook waits for in-flight calls to finish. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 30;

    private Server server;

    /**
     * Builds and starts the server, then registers a JVM shutdown hook that
     * stops it gracefully.
     *
     * @throws IOException if the server cannot bind to the port
     */
    private void start() throws IOException {
        int port = 8086;
        server = ServerBuilder.forPort(port)
                .addService(new GrpcServerImpl())
                .build()
                .start();
        System.out.println("server start....");
        // Register a JVM shutdown hook so the server is stopped when the process exits.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("jvm将要关闭");
            try {
                stop();
            } catch (InterruptedException e) {
                // Preserve the interrupt status; the JVM is exiting anyway.
                Thread.currentThread().interrupt();
            }
            System.out.println("jvm关闭...");
        }));
    }

    /**
     * Stops the server and waits for it to terminate.
     *
     * <p>{@code Server.shutdown()} only initiates the shutdown and returns
     * immediately. Without waiting here, the shutdown hook would return at
     * once and the JVM could exit while calls are still being processed.
     * If the server has not terminated within the timeout it is shut down
     * forcefully.
     *
     * @throws InterruptedException if interrupted while waiting for termination
     */
    private void stop() throws InterruptedException {
        if (server == null) {
            return;
        }
        boolean terminated = server.shutdown()
                .awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS);
        if (!terminated) {
            server.shutdownNow();
        }
    }

    /**
     * Blocks the calling thread until the server has terminated.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    private void blockServer() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        GrpcCustomServer grpcCustomServer = new GrpcCustomServer();
        grpcCustomServer.start();
        // start() returns as soon as the server is running, so block here;
        // otherwise the main thread would finish right away.
        grpcCustomServer.blockServer();
    }
}
4.客户端代码实现
package com.hgldp.client;
import com.hgldp.grpc.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
/**
 * Demo client that exercises the four call shapes of the {@code ExampleGrpc}
 * service against a server on 127.0.0.1:8086.
 */
public class GrpcCustomClient {
    private ManagedChannel channel;
    private ExampleGrpcGrpc.ExampleGrpcBlockingStub blockingStub;
    private ExampleGrpcGrpc.ExampleGrpcStub asyStub;

    /**
     * Shuts the channel down and waits up to five seconds for active calls to
     * finish; if they have not finished by then the channel is closed forcefully.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    private void shutdown() throws InterruptedException {
        if (channel == null) {
            return;
        }
        if (!channel.shutdown().awaitTermination(5, TimeUnit.SECONDS)) {
            channel.shutdownNow();
        }
    }

    /** Case 1 - unary call through the blocking stub. */
    private void callSendSimpleMessage() {
        HelloResponse helloResponse =
                blockingStub.sendSimpleMessage(HelloRequest.newBuilder().setNo("10").build());
        System.out.println("客户端收到消息:" + helloResponse.getName());
    }

    /** Case 2 - server streaming through the blocking stub; prints every student received. */
    private void callGetListMessage() {
        Iterator<StudentResponse> listMessage =
                blockingStub.getListMessage(HelloRequest.newBuilder().setNo("11").build());
        while (listMessage.hasNext()) {
            StudentResponse studentResponse = listMessage.next();
            System.out.println(studentResponse.getName() + "," + studentResponse.getAge() + "," + studentResponse.getCity());
        }
    }

    /** Case 3 - client streaming through the async stub; the single reply is printed by the observer. */
    private void callGetStudentList() {
        StreamObserver<StudentResponseList> studentResponseListStreamObserver = new StreamObserver<StudentResponseList>() {
            @Override
            public void onNext(StudentResponseList value) {
                value.getStudentResponseList().forEach(studentResponse -> {
                    System.out.println(studentResponse.getName());
                    System.out.println(studentResponse.getAge());
                    System.out.println(studentResponse.getCity());
                    System.out.println("*************");
                });
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("completed");
            }
        };
        StreamObserver<HelloRequest> studentList = asyStub.getStudentList(studentResponseListStreamObserver);
        // Sending is asynchronous: if the client exits too quickly it may end
        // before the reply has been received.
        for (int no = 15; no <= 18; no++) {
            studentList.onNext(HelloRequest.newBuilder().setNo(String.valueOf(no)).build());
        }
        studentList.onCompleted();
    }

    /**
     * Case 4 - bidirectional streaming through the async stub: sends ten
     * timestamped messages one second apart, then half-closes the stream.
     *
     * @throws InterruptedException if interrupted while pausing between messages
     */
    private void callBiTalk() throws InterruptedException {
        StreamObserver<StreamRequest> requestStreamObserver = asyStub.biTalk(new StreamObserver<StreamResponse>() {
            @Override
            public void onNext(StreamResponse value) {
                System.out.println(value.getResponseInfo());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("completed");
            }
        });
        for (int i = 0; i < 10; i++) {
            requestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDateTime.now().toString()).build());
            TimeUnit.SECONDS.sleep(1);
        }
        // Tell the server that the client has finished sending.
        requestStreamObserver.onCompleted();
    }

    public static void main(String[] args) throws InterruptedException {
        GrpcCustomClient grpcCustomClient = new GrpcCustomClient();
        // Create the channel.
        grpcCustomClient.channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8086)
                .usePlaintext()
                .build();
        try {
            // Obtain the stubs from the channel: a blocking one and an asynchronous one.
            grpcCustomClient.blockingStub = ExampleGrpcGrpc.newBlockingStub(grpcCustomClient.channel);
            grpcCustomClient.asyStub = ExampleGrpcGrpc.newStub(grpcCustomClient.channel);

            grpcCustomClient.callSendSimpleMessage();
            System.out.println("----------------------------------------------------------");
            grpcCustomClient.callGetListMessage();
            System.out.println("----------------------------------------------------------");
            grpcCustomClient.callGetStudentList();
            System.out.println("---------------------------------------------------------------------------");
            grpcCustomClient.callBiTalk();
        } finally {
            // Always release the channel, even when one of the calls above
            // fails (for example because the server is not reachable).
            grpcCustomClient.shutdown();
        }
    }
}