Introduction
"gRPC is a modern open source high performance Remote Procedure Call (RPC) framework that can run in any environment. It can efficiently connect services in and across data centers with pluggable support for load balancing, tracing, health checking and authentication. It is also applicable in last mile of distributed computing to connect devices, mobile applications and browsers to backend services." https://grpc.io/
4 Types of API in gRPC
https://medium.com/@yangli907/grpc-learning-part-1-cdcf59e52707
API protobuf
src/main/proto/example_grpc/hello_service.proto
syntax = "proto3";
// "Package name should be in lowercase, and should correspond
// to the directory hierarchy. e.g., if a file is in my/package/,
// then the package name should be my.package."
// [https://developers.google.com/protocol-buffers/docs/style]
package example_grpc;
service HelloService {
// Unary RPC
rpc Hello(HelloRequest) returns (HelloResponse) {};
// Server Streaming RPC
rpc HelloServerStreaming(HelloServerStreamingRequest) returns (stream HelloServerStreamingResponse) {};
// Client Streaming RPC
rpc HelloClientStreaming(stream HelloClientStreamingRequest) returns (HelloClientStreamingResponse) {};
// Bidirectional Streaming RPC
rpc HelloBiDirectionalStreaming(stream HelloBiDirectionalStreamingRequest) returns (stream HelloBiDirectionalStreamingResponse) {};
}
message HelloRequest {
string name = 1;
}
message HelloResponse {
string message = 1;
}
message HelloServerStreamingRequest {
string name = 1;
}
message HelloServerStreamingResponse {
string message = 1;
}
message HelloClientStreamingRequest {
string name = 1;
}
message HelloClientStreamingResponse {
string message = 1;
}
message HelloBiDirectionalStreamingRequest {
string name = 1;
}
message HelloBiDirectionalStreamingResponse {
string message = 1;
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>se.magnuskkarlsson</groupId>
<artifactId>example-grpc</artifactId>
<version>1.0.0-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.outputEncoding>UTF-8</project.build.outputEncoding>
<failOnMissingWebXml>false</failOnMissingWebXml>
</properties>
<dependencies>
<!-- https://github.com/grpc/grpc-java Chapter: Download -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.36.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.36.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.36.0</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
<groupId>org.apache.tomcat</groupId>
<artifactId>annotations-api</artifactId>
<version>6.0.53</version>
<scope>provided</scope>
</dependency>
<!-- Test Support -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.10.19</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<!-- https://github.com/grpc/grpc-java Chapter: Generated Code -->
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<finalName>${project.artifactId}</finalName>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<release>11</release>
<showDeprecation>true</showDeprecation>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<!-- https://github.com/grpc/grpc-java Chapter: Generated Code -->
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.12.0:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.36.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> <version>0.6.1</version>
<configuration> <protocExecutable>/usr/bin/protoc</protocExecutable> </configuration> <executions> <execution> <goals> <goal>compile</goal>
</goals> </execution> </executions> </plugin> -->
</plugins>
</build>
</project>
gRPC Server
A typical gRPC Server is created and started as a standalone server.
Below is the boilerplate code for a standalone server.
package se.magnuskkarlsson.example_grpc.server;
import java.io.IOException;
import io.grpc.Server;
import io.grpc.ServerBuilder;
public class HelloServer {
public static void main(String[] args) throws IOException, InterruptedException {
System.out.println("Starting Server ...");
final Server server = ServerBuilder //
.forPort(50051) //
.addService(new HelloServiceImpl()) //
.build();
server.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Shutting down Server ....");
server.shutdown();
System.out.println("Successfully stopped Server");
}));
server.awaitTermination();
}
}
gRPC Client
Below is the boilerplate code for a standalone client.
package se.magnuskkarlsson.example_grpc.client;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
// https://github.com/grpc/grpc-java/blob/master/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideClient.java
public class HelloClient {
public static void main(String[] args) throws Exception {
System.out.println("Starting Client ...");
ManagedChannel channel = ManagedChannelBuilder //
.forAddress("localhost", 50051) //
.usePlaintext() //
.build();
HelloServiceBlockingStub syncClient = HelloServiceGrpc.newBlockingStub(channel);
System.out.println("Shutting down Client");
channel.shutdown();
}
Unary RPC
Unary RPC is the most common call, it is the classical Request and Response pattern.
Server
package se.magnuskkarlsson.example_grpc.server;
import java.util.ArrayList;
import java.util.List;
import example_grpc.HelloServiceGrpc.HelloServiceImplBase;
import example_grpc.HelloServiceOuterClass.HelloRequest;
import example_grpc.HelloServiceOuterClass.HelloResponse;
import io.grpc.stub.StreamObserver;
public class HelloServiceImpl extends HelloServiceImplBase {
// Unary RPC
@Override
public void hello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
String name = request.getName();
System.out.println("[Unary] Hello name=" + name + " ...");
HelloResponse resp = HelloResponse.newBuilder() //
.setMessage("Hello from Async " + name) //
.build();
// send the response
responseObserver.onNext(resp);
responseObserver.onCompleted();
}
}
Client - Blocking Client; HelloServiceBlockingStub syncClient = HelloServiceGrpc.newBlockingStub(channel);
package se.magnuskkarlsson.example_grpc.client;
import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import example_grpc.HelloServiceGrpc;
import example_grpc.HelloServiceGrpc.HelloServiceBlockingStub;
import example_grpc.HelloServiceGrpc.HelloServiceStub;
import example_grpc.HelloServiceOuterClass.HelloRequest;
import example_grpc.HelloServiceOuterClass.HelloResponse;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
// https://github.com/grpc/grpc-java/blob/master/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideClient.java
public class HelloClient {
public static void main(String[] args) throws Exception {
System.out.println("Starting Client ...");
ManagedChannel channel = ManagedChannelBuilder //
.forAddress("localhost", 50051) //
.usePlaintext() //
.build();
new HelloClient().unary(channel);
System.out.println("Shutting down Client");
channel.shutdown();
}
private void unary(ManagedChannel channel) {
HelloServiceBlockingStub syncClient = HelloServiceGrpc.newBlockingStub(channel);
// HelloServiceStub asyncClient = HelloServiceGrpc.newStub(channel);
HelloRequest req1 = HelloRequest.newBuilder().setName("Magnus").build();
HelloResponse resp1 = syncClient.hello(req1);
System.out.println("[Async] " + resp1);
HelloRequest req2 = HelloRequest.newBuilder().setName("John Doe").build();
HelloResponse resp2 = syncClient.hello(req2);
System.out.println("[Async] " + resp2);
}
}
Server Streaming RPC
Streaming Server is well suited for when Server needs to PUSH data to client. This is especially true for Big Data.
Server
// Server Streaming RPC
@Override
public void helloServerStreaming(HelloServerStreamingRequest request,
StreamObserver<HelloServerStreamingResponse> responseObserver) {
String name = request.getName();
System.out.println("[Server Streaming] Hello name=" + name + " ...");
try {
for (int i = 0; i < 10; ++i) {
HelloServerStreamingResponse resp = HelloServerStreamingResponse.newBuilder() //
.setMessage("Hello from Server Streaming " + name + " " + i) //
.build();
responseObserver.onNext(resp);
Thread.sleep(1000L);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
responseObserver.onCompleted();
}
}
Blocking Client
private void serverStreaming(ManagedChannel channel) {
HelloServiceBlockingStub syncClient = HelloServiceGrpc.newBlockingStub(channel);
// HelloServiceStub asyncClient = HelloServiceGrpc.newStub(channel);
HelloServerStreamingRequest req1 = HelloServerStreamingRequest.newBuilder().setName("Magnus").build();
Iterator<HelloServerStreamingResponse> respIter = syncClient.helloServerStreaming(req1);
respIter.forEachRemaining(resp -> {
System.out.println("[Server Streaming]" + resp.getMessage());
});
}
Client Streaming RPC
Client Streaming is suited when the client wants to send Big Data. Or when server processing is possible to do in parallell, then can client split jobs.
Server
// Client Streaming RPC
@Override
public StreamObserver<HelloClientStreamingRequest> helloClientStreaming(
StreamObserver<HelloClientStreamingResponse> responseObserver) {
StreamObserver<HelloClientStreamingRequest> requestObserver = new StreamObserver<HelloClientStreamingRequest>() {
private List<String> names = new ArrayList<String>();
@Override
public void onNext(HelloClientStreamingRequest request) {
// client sends a message
String name = request.getName();
names.add(name);
}
@Override
public void onError(Throwable t) {
// client sends a error
}
@Override
public void onCompleted() {
// client is done
HelloClientStreamingResponse response = HelloClientStreamingResponse.newBuilder() //
.setMessage("[Client Streaming] Hello " + names.toString()) //
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
};
return requestObserver;
}
Asynch Client
private void clientStreaming(ManagedChannel channel) throws InterruptedException {
HelloServiceStub asyncClient = HelloServiceGrpc.newStub(channel);
CountDownLatch latch = new CountDownLatch(1);
StreamObserver<HelloClientStreamingResponse> responseObserver = new StreamObserver<HelloClientStreamingResponse>() {
@Override
public void onNext(HelloClientStreamingResponse response) {
// we get a response from the server
String message = response.getMessage();
System.out.println("[Client Streaming] Recieved message from server " + message);
// onNext will be called only once
}
@Override
public void onError(Throwable t) {
// we get an error from server
latch.countDown();
}
@Override
public void onCompleted() {
// the server is done sending us data
// onComplete will be called right after onNext()
System.out.println("[Client Streaming] Server has completed sending us data/stream");
latch.countDown();
}
};
StreamObserver<HelloClientStreamingRequest> requestObserver = asyncClient
.helloClientStreaming(responseObserver);
HelloClientStreamingRequest request1 = HelloClientStreamingRequest.newBuilder() //
.setName("John") //
.build();
requestObserver.onNext(request1);
HelloClientStreamingRequest request2 = HelloClientStreamingRequest.newBuilder() //
.setName("Nisse") //
.build();
requestObserver.onNext(request2);
HelloClientStreamingRequest request3 = HelloClientStreamingRequest.newBuilder() //
.setName("Klara") //
.build();
requestObserver.onNext(request3);
// tell the server, the client is done sending data/streaming
requestObserver.onCompleted();
latch.await(5, TimeUnit.SECONDS);
}
Bidirectional Streaming RPC
Bidirectional Streaming is like a chat protocol and have typical a long running connection.
Server
// Bidirectional streaming RPC
@Override
public StreamObserver<HelloBiDirectionalStreamingRequest> helloBiDirectionalStreaming(
StreamObserver<HelloBiDirectionalStreamingResponse> responseObserver) {
StreamObserver<HelloBiDirectionalStreamingRequest> requestObserver = new StreamObserver<HelloBiDirectionalStreamingRequest>() {
@Override
public void onNext(HelloBiDirectionalStreamingRequest request) {
// client sends a message
String name = request.getName();
HelloBiDirectionalStreamingResponse response = HelloBiDirectionalStreamingResponse.newBuilder() //
.setMessage("[Bi-Directional] Hello " + name) //
.build();
responseObserver.onNext(response);
}
@Override
public void onError(Throwable t) {
// client sends a error
}
@Override
public void onCompleted() {
// client is done sending data/stream
responseObserver.onCompleted();
}
};
return requestObserver;
}
Asynch Client
private void biDirectionalStreaming(ManagedChannel channel) throws InterruptedException {
HelloServiceStub asyncClient = HelloServiceGrpc.newStub(channel);
CountDownLatch latch = new CountDownLatch(1);
StreamObserver<HelloBiDirectionalStreamingResponse> responseObserver = new StreamObserver<HelloBiDirectionalStreamingResponse>() {
@Override
public void onNext(HelloBiDirectionalStreamingResponse response) {
// we get a response from the server
String message = response.getMessage();
System.out.println("[Bi-Directional Streaming] Recieved message from server " + message);
}
@Override
public void onError(Throwable t) {
// we get an error from server
latch.countDown();
}
@Override
public void onCompleted() {
// the server is done sending/streaming us data
System.out.println("[Bi-Directional Streaming] Server has completed sending us data/stream");
latch.countDown();
}
};
StreamObserver<HelloBiDirectionalStreamingRequest> requestObserver = asyncClient
.helloBiDirectionalStreaming(responseObserver);
HelloBiDirectionalStreamingRequest request1 = HelloBiDirectionalStreamingRequest.newBuilder() //
.setName("John") //
.build();
requestObserver.onNext(request1);
System.out.println("[Bi-Directional Streaming] Sending request1");
HelloBiDirectionalStreamingRequest request2 = HelloBiDirectionalStreamingRequest.newBuilder() //
.setName("Nisse") //
.build();
requestObserver.onNext(request2);
System.out.println("[Bi-Directional Streaming] Sending request2");
HelloBiDirectionalStreamingRequest request3 = HelloBiDirectionalStreamingRequest.newBuilder() //
.setName("Bertil") //
.build();
requestObserver.onNext(request3);
System.out.println("[Bi-Directional Streaming] Sending request3");
// tell the server, the client is done sending data/streaming
requestObserver.onCompleted();
latch.await(5, TimeUnit.SECONDS);
}
No comments:
Post a Comment