上次简单介绍了grpc的使用方法,并创建了一个方法调用,在grpc中有四种服务类型,下面分别进行介绍
简单rpc
这就是一般的rpc调用,一个请求对象对应一个返回对象
proto语法:
rpc simpleHello(Person) returns (Result) {}service代码
@Override
public void simpleHello(ProtoObj.Person request,io.grpc.stub.StreamObserver<ProtoObj.Result> responseObserver) {//返回结果responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello, "+request.getMyName()).build());responseObserver.onCompleted();
}client代码
@Test
public void simple() throws InterruptedException {final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();//定义同步阻塞的stubHelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel);ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();//simpleSystem.out.println("---simple rpc---");System.out.println(blockingStub.simpleHello(person).getString());channel.shutdown();
}输出
---simple rpc---
hello, World
服务端流式rpc
一个请求对象,服务端可以传回多个结果对象
proto语法
rpc serverStreamHello(Person) returns (stream Result) {}service代码
@Override
public void serverStreamHello(ProtoObj.Person request,io.grpc.stub.StreamObserver<ProtoObj.Result> responseObserver) {//返回多个结果responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello, "+request.getMyName()).build());responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello2, "+request.getMyName()).build());responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello3, "+request.getMyName()).build());responseObserver.onCompleted();
}client代码
@Test
public void serverStream(){final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();//定义同步阻塞的stubHelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel);ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();//server sideSystem.out.println("---server stream rpc---");//返回结果是IteratorIterator<ProtoObj.Result> it = blockingStub.serverStreamHello(person);while (it.hasNext()) {System.out.print(it.next());}channel.shutdown();
}输出
---server stream rpc---
string: "hello, World"
string: "hello2, World"
string: "hello3, World"
客户端流式rpc
客户端传入多个请求对象,服务端返回一个响应结果
proto语法
rpc clientStreamHello(stream Person) returns (Result) {}service代码
@Override
public io.grpc.stub.StreamObserver<ProtoObj.Person> clientStreamHello(final io.grpc.stub.StreamObserver<ProtoObj.Result> responseObserver) {//返回observer应对多个请求对象return new StreamObserver<ProtoObj.Person>(){private ProtoObj.Result.Builder builder=ProtoObj.Result.newBuilder();@Overridepublic void onNext(ProtoObj.Person value) {builder.setString(builder.getString() +"," + value.getMyName());}@Overridepublic void onError(Throwable t) {}@Overridepublic void onCompleted() {builder.setString("hello"+builder.getString());responseObserver.onNext(builder.build());responseObserver.onCompleted();}};
}client代码
@Test
public void clientStream() throws InterruptedException {final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();//定义异步的stubHelloServiceGrpc.HelloServiceStub asyncStub = HelloServiceGrpc.newStub(channel);ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();//client sideSystem.out.println("---client stream rpc---");StreamObserver<ProtoObj.Result> responseObserver = new StreamObserver<ProtoObj.Result>() {@Overridepublic void onNext(ProtoObj.Result result) {System.out.println("client stream--" + result.getString());}@Overridepublic void onError(Throwable t) {}@Overridepublic void onCompleted() {//关闭channelchannel.shutdown();}};StreamObserver<ProtoObj.Person> clientStreamObserver = asyncStub.clientStreamHello(responseObserver);clientStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World").build());clientStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World2").build());clientStreamObserver.onCompleted();//由于是异步获得结果,所以sleep一秒Thread.sleep(1000);
}输出
---client stream rpc---
client stream--hello,World,World2
双向流式rpc
结合客户端流式rpc和服务端流式rpc,可以传入多个对象,返回多个响应对象
proto语法
rpc biStreamHello(stream Person) returns (stream Result) {}service代码
@Override
public io.grpc.stub.StreamObserver<ProtoObj.Person> biStreamHello(final io.grpc.stub.StreamObserver<ProtoObj.Result> responseObserver) {//返回observer应对多个请求对象return new StreamObserver<ProtoObj.Person>(){private ProtoObj.Result.Builder builder=ProtoObj.Result.newBuilder();@Overridepublic void onNext(ProtoObj.Person value) {responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello2, "+value.getMyName()).build());responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello3, "+value.getMyName()).build());}@Overridepublic void onError(Throwable t) {}@Overridepublic void onCompleted() {responseObserver.onCompleted();}};
}client代码
@Test
public void bidirectStream() throws InterruptedException {final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();//定义异步的stubHelloServiceGrpc.HelloServiceStub asyncStub = HelloServiceGrpc.newStub(channel);ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();//bi streamSystem.out.println("---bidirectional stream rpc---");StreamObserver<ProtoObj.Result> responseObserver = new StreamObserver<ProtoObj.Result>() {@Overridepublic void onNext(ProtoObj.Result result) {System.out.println("bidirectional stream--"+result.getString());}@Overridepublic void onError(Throwable t) {}@Overridepublic void onCompleted() {channel.shutdown();}};StreamObserver<ProtoObj.Person> biStreamObserver=asyncStub.biStreamHello(responseObserver);biStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World").build());biStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World2").build());biStreamObserver.onCompleted();//由于是异步获得结果,所以sleep一秒Thread.sleep(1000);}输出
---bidirectional stream rpc---
bidirectional stream--hello2, World
bidirectional stream--hello3, World
bidirectional stream--hello2, World2
bidirectional stream--hello3, World2
总结
grpc通过使用流式的方式,返回/接受多个实例可以用于类似不定长数组的入参和出参