欢迎您访问程序员文章站!本站旨在为大家提供和分享程序员计算机编程知识。
您现在的位置是: 首页

grpc--demo(参考官网说明看demo)

程序员文章站 2024-03-14 14:17:34
...

1.gradle配置文件(官网上有gradle及maven的配置说明)

plugins {
    // Standard Java compilation and packaging tasks.
    id 'java'
    // Protobuf Gradle plugin; it provides the `protobuf { ... }` extension configured further down.
    id 'com.google.protobuf' version '0.8.8'
}

// Maven coordinates (group and version) of this project.
group 'com.hgl.dp.io.grpc'
version '1.0-SNAPSHOT'

// Accept Java 8 source syntax and emit Java 8 bytecode.
sourceCompatibility = 1.8
targetCompatibility = 1.8
repositories {

    // Aliyun mirror of the public Maven repositories, consulted first.
    // Served over HTTPS: plain-HTTP repositories are open to tampering and are
    // rejected by Gradle 7+ unless explicitly allowed.
    maven{
        url "https://maven.aliyun.com/nexus/content/groups/public/"
    }
    // Fallback for anything the mirror does not serve.
    mavenCentral()
}

dependencies {
    // `compile` / `testCompile` are deprecated and removed in Gradle 7;
    // `implementation` / `testImplementation` are their replacements.
    testImplementation group: 'junit', name: 'junit', version: '4.12'
    // gRPC runtime: shaded Netty transport, protobuf marshalling and the stub API.
    // All three artifacts are kept on the same gRPC version.
    implementation 'io.grpc:grpc-netty-shaded:1.20.0'
    implementation 'io.grpc:grpc-protobuf:1.20.0'
    implementation 'io.grpc:grpc-stub:1.20.0'

}


// Compile every Java source set as UTF-8 so the non-ASCII string literals in the
// sources do not depend on the platform default charset.
tasks.withType(JavaCompile) {
    options.encoding = "UTF-8"
}

protobuf {
// Root directory for the generated Java classes
    generatedFilesBaseDir = "$projectDir/src/"
    protoc {
        // protoc compiler binary, resolved as a Maven artifact.
        artifact = "com.google.protobuf:protoc:3.7.1"
    }
    plugins {
        // Registers the gRPC Java code generator under the name `grpc`.
        grpc {
            artifact = 'io.grpc:protoc-gen-grpc-java:1.20.0'
        }
    }
    generateProtoTasks {
        // Apply the `grpc` plugin to every proto generation task.
        all()*.plugins {
            grpc {

                // Write the gRPC stubs into the 'java' sub-directory instead of the
                // plugin's default one (see the protobuf-gradle-plugin docs for the layout).
                outputSubDir = 'java'
            }
        }
    }
}

2.proto文件

// Uses the proto3 language version.
syntax = "proto3";

// Generate one top-level .java file per message/service instead of nesting
// everything inside the outer class.
option java_multiple_files = true;
// Java package of the generated classes.
option java_package = "com.hgldp.grpc";
// Name of the generated outer (file descriptor) class.
option java_outer_classname = "SimPleProto";


// Demo service with one rpc for each of the four gRPC call shapes.
service ExampleGrpc{
    // Unary: a single HelloRequest yields a single HelloResponse.
    rpc sendSimpleMessage(HelloRequest) returns(HelloResponse){}
    // Server streaming: a single HelloRequest yields a stream of StudentResponse.
    rpc getListMessage(HelloRequest) returns(stream StudentResponse){}

    // Client streaming: a stream of HelloRequest yields a single StudentResponseList.
    rpc getStudentList(stream HelloRequest) returns(StudentResponseList){}

    // Bidirectional streaming: both sides exchange a stream of messages.
    rpc BiTalk(stream StreamRequest) returns(stream StreamResponse){}
}


// Request carrying a single string value.
message HelloRequest{

    // Free-form string sent by the client (presumably a number/identifier — confirm with callers).
    string no = 1;
}

// Response carrying a single name.
message HelloResponse{
    string name=1;
}


// One student record: name, age and city.
message StudentResponse{

    string name = 1;
    int32 age = 2;
    string city = 3;
}

// A batch of student records returned in one message.
message StudentResponseList{
    repeated StudentResponse studentResponse = 1;
}


// Client-to-server payload of the BiTalk stream.
message StreamRequest{
    string request_info = 1;
}

// Server-to-client payload of the BiTalk stream.
message StreamResponse{
    string response_info = 1;
}

3.服务端代码(服务端接口实现类和服务端启动类可以分开,也可以放在一起,看自己喜好)
3.1 自定义服务端实现类

package com.hgldp.server;

import com.hgldp.grpc.*;
import io.grpc.stub.StreamObserver;

import java.util.UUID;

public class GrpcServerImpl extends ExampleGrpcGrpc.ExampleGrpcImplBase {

    /**
     * Assembles a {@link StudentResponse} from its three fields.
     */
    private static StudentResponse newStudent(String name, int age, String city) {
        return StudentResponse.newBuilder()
                .setName(name)
                .setAge(age)
                .setCity(city)
                .build();
    }

    /**
     * Unary call: logs the request and answers with one fixed response.
     *
     * @param request          the incoming request
     * @param responseObserver sink for the single response
     */
    @Override
    public void sendSimpleMessage(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
        System.out.println("服务器端收到消息:" + request.getNo());

        HelloResponse reply = HelloResponse.newBuilder()
                .setName("小明")
                .build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    /**
     * Server-streaming call: logs the request, then streams five fixed student
     * records before completing.
     *
     * @param request          the incoming request
     * @param responseObserver sink for the streamed responses
     */
    @Override
    public void getListMessage(HelloRequest request, StreamObserver<StudentResponse> responseObserver) {
        System.out.println("服务端接受到消息:" + request.getNo());

        StudentResponse[] roster = {
                newStudent("张三", 12, "北京"),
                newStudent("李四", 13, "上海"),
                newStudent("王五", 14, "广州"),
                newStudent("马六", 15, "武汉"),
                newStudent("田七", 16, "深圳"),
        };
        for (StudentResponse entry : roster) {
            responseObserver.onNext(entry);
        }

        responseObserver.onCompleted();
    }

    /**
     * Client-streaming call: logs each request as it arrives and, once the client
     * has finished sending, answers with a single list of two student records.
     *
     * @param responseObserver sink for the single response
     * @return observer that receives the client's request stream
     */
    @Override
    public StreamObserver<HelloRequest> getStudentList(StreamObserver<StudentResponseList> responseObserver) {
        return new StreamObserver<HelloRequest>() {

            // Invoked once per message received from the client.
            @Override
            public void onNext(HelloRequest incoming) {
                System.out.println("onNext:" + incoming.getNo());
            }

            @Override
            public void onError(Throwable failure) {
                System.out.println(failure.getMessage());
            }

            // Invoked after the client has sent all of its messages.
            @Override
            public void onCompleted() {
                StudentResponseList summary = StudentResponseList.newBuilder()
                        .addStudentResponse(newStudent("张三", 12, "北京"))
                        .addStudentResponse(newStudent("李四", 13, "上海"))
                        .build();

                responseObserver.onNext(summary);
                responseObserver.onCompleted();
            }
        };
    }

    /**
     * Bidirectional-streaming call: prints every request and immediately replies
     * with a freshly generated random UUID.
     *
     * @param responseObserver sink for the streamed responses
     * @return observer that receives the client's request stream
     */
    @Override
    public StreamObserver<StreamRequest> biTalk(StreamObserver<StreamResponse> responseObserver) {
        return new StreamObserver<StreamRequest>() {

            @Override
            public void onNext(StreamRequest incoming) {
                System.out.println(incoming.getRequestInfo());

                String token = UUID.randomUUID().toString();
                StreamResponse reply = StreamResponse.newBuilder()
                        .setResponseInfo(token)
                        .build();
                responseObserver.onNext(reply);
            }

            @Override
            public void onError(Throwable failure) {
                System.out.println(failure.getMessage());
            }

            @Override
            public void onCompleted() {
                // The client closed its side; close ours too, since keeping a
                // one-directional stream open would serve no purpose.
                responseObserver.onCompleted();
            }
        };
    }
}

3.2 服务端启动类

package com.hgldp.server;

import io.grpc.Server;
import io.grpc.ServerBuilder;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * Boots the demo gRPC server on a fixed port and keeps it running until the JVM
 * is asked to exit.
 */
public class GrpcCustomServer {

    private Server server;

    /**
     * Builds and starts the server on port 8086 and registers a JVM shutdown hook
     * that stops it.
     *
     * @throws IOException if the server cannot bind to the port
     */
    private void start() throws IOException {
        int port = 8086;
        server = ServerBuilder.forPort(port)
                .addService(new GrpcServerImpl())
                .build()
                .start();
        System.out.println("server start....");

        // JVM shutdown hook: stop the gRPC server when the process is terminated.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("jvm将要关闭");
                stop();
                System.out.println("jvm关闭...");
            }
        }));


    }

    /**
     * Stops the server gracefully.
     *
     * <p>{@code Server.shutdown()} only initiates the shutdown and returns
     * immediately. Because this method runs inside the shutdown hook, returning
     * right away would let the JVM exit while calls are still in flight, so we
     * wait (bounded) for them to drain and force termination if they do not.
     */
    private void stop(){
        if(server == null){
            return;
        }
        server.shutdown();
        try {
            if(!server.awaitTermination(30, TimeUnit.SECONDS)){
                // In-flight calls did not finish in time; cancel them.
                server.shutdownNow();
            }
        } catch (InterruptedException e) {
            server.shutdownNow();
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Blocks the calling thread until the server has terminated.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private void blockServer() throws InterruptedException {
        if(server != null){
            server.awaitTermination();
        }
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        GrpcCustomServer grpcCustomServer = new GrpcCustomServer();
        grpcCustomServer.start();
        // start() returns without blocking, so park the main thread here;
        // otherwise main would return right after startup.
        grpcCustomServer.blockServer();

    }
}

4.客户端代码实现

package com.hgldp.client;

import com.hgldp.grpc.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;

/**
 * Demo client that exercises the four call shapes of the example service
 * (unary, server streaming, client streaming, bidirectional streaming) against
 * a server on 127.0.0.1:8086.
 */
public class GrpcCustomClient {

    private  ManagedChannel channel;

    private ExampleGrpcGrpc.ExampleGrpcBlockingStub blockingStub;

    private ExampleGrpcGrpc.ExampleGrpcStub asyStub;


    /**
     * Shuts the channel down and waits up to five seconds for it to terminate.
     *
     * @throws InterruptedException if interrupted while waiting
     */
    private void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        GrpcCustomClient grpcCustomClient = new GrpcCustomClient();

        // Create the channel (plaintext, no TLS).
        grpcCustomClient.channel = ManagedChannelBuilder.forAddress("127.0.0.1",8086)
                .usePlaintext()
                .build();

        // Everything that uses the channel runs inside try/finally so the channel
        // is released even when an RPC throws (e.g. the server is unreachable).
        try {
            // Case 1: unary call through the blocking stub.
            grpcCustomClient.blockingStub = ExampleGrpcGrpc.newBlockingStub(grpcCustomClient.channel);
            HelloResponse helloResponse = grpcCustomClient.blockingStub.sendSimpleMessage(HelloRequest.newBuilder().setNo("10").build());
            System.out.println("客户端收到消息:"+helloResponse.getName());
            System.out.println("----------------------------------------------------------");

            // Case 2: server streaming; the blocking stub exposes the stream as an Iterator.
            Iterator<StudentResponse> listMessage = grpcCustomClient.blockingStub.getListMessage(HelloRequest.newBuilder().setNo("11").build());

            while (listMessage.hasNext()){
                StudentResponse studentResponse = listMessage.next();
                System.out.println(studentResponse.getName()+ ","+ studentResponse.getAge() + ","+ studentResponse.getCity());
            }
            System.out.println("----------------------------------------------------------");

            // Case 3: client streaming; the single response arrives on this observer.
            StreamObserver<StudentResponseList> studentResponseListStreamObserver = new StreamObserver<StudentResponseList>() {
                @Override
                public void onNext(StudentResponseList value) {
                    value.getStudentResponseList().forEach(studentResponse -> {
                        System.out.println(studentResponse.getName());
                        System.out.println(studentResponse.getAge());
                        System.out.println(studentResponse.getCity());
                        System.out.println("*************");
                    });
                }


                @Override
                public void onError(Throwable t) {

                    System.out.println(t.getMessage());
                }

                @Override
                public void onCompleted() {

                    System.out.println("completed");
                }
            };


            // Asynchronous stub, required for client-side streaming.
            grpcCustomClient.asyStub = ExampleGrpcGrpc.newStub(grpcCustomClient.channel);
            StreamObserver<HelloRequest> studentList = grpcCustomClient.asyStub.getStudentList(studentResponseListStreamObserver);

            // Sending is asynchronous: if the client exited right away it could
            // terminate before the response is received.
            studentList.onNext(HelloRequest.newBuilder().setNo("15").build());
            studentList.onNext(HelloRequest.newBuilder().setNo("16").build());
            studentList.onNext(HelloRequest.newBuilder().setNo("17").build());
            studentList.onNext(HelloRequest.newBuilder().setNo("18").build());

            studentList.onCompleted();

            System.out.println("---------------------------------------------------------------------------");

            // Case 4: bidirectional streaming.
            StreamObserver<StreamRequest> requestStreamObserver = grpcCustomClient.asyStub.biTalk(new StreamObserver<StreamResponse>() {
                @Override
                public void onNext(StreamResponse value) {
                    System.out.println(value.getResponseInfo());
                }

                @Override
                public void onError(Throwable t) {

                    System.out.println(t.getMessage());
                }

                @Override
                public void onCompleted() {

                    System.out.println("completed");
                }
            });

            // Send one timestamped request per second, ten in total.
            for(int i = 0;i< 10 ;i++){
                requestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDateTime.now().toString()).build());
                TimeUnit.SECONDS.sleep(1);
            }
            // Tell the server that the client has finished sending.
            requestStreamObserver.onCompleted();
        } finally {
            // Always close the client channel, on success and on failure alike.
            grpcCustomClient.shutdown();
        }


    }

}

相关标签: grpc