March 25, 2021

Overvew gRPC (Remote Procedure Call)

Introduction

"gRPC is a modern open source high performance Remote Procedure Call (RPC) framework that can run in any environment. It can efficiently connect services in and across data centers with pluggable support for load balancing, tracing, health checking and authentication. It is also applicable in last mile of distributed computing to connect devices, mobile applications and browsers to backend services." https://grpc.io/

4 Types of API in gRPC

https://medium.com/@yangli907/grpc-learning-part-1-cdcf59e52707

API protobuf

src/main/proto/example_grpc/hello_service.proto

syntax = "proto3";

// "Package name should be in lowercase, and should correspond 
// to the directory hierarchy. e.g., if a file is in my/package/, 
// then the package name should be my.package."
// [https://developers.google.com/protocol-buffers/docs/style]
package example_grpc;

service HelloService {

    // Unary RPC
    rpc Hello(HelloRequest) returns (HelloResponse) {};

    // Server Streaming RPC
    rpc HelloServerStreaming(HelloServerStreamingRequest) returns (stream HelloServerStreamingResponse) {};

    // Client Streaming RPC
    rpc HelloClientStreaming(stream HelloClientStreamingRequest) returns (HelloClientStreamingResponse) {};

    // Bidirectional Streaming RPC
    rpc HelloBiDirectionalStreaming(stream HelloBiDirectionalStreamingRequest) returns (stream HelloBiDirectionalStreamingResponse) {};

}

message HelloRequest {
  string name = 1;
}

message HelloResponse {
  string message = 1;
}

message HelloServerStreamingRequest {
  string name = 1;
}

message HelloServerStreamingResponse {
  string message = 1;
}

message HelloClientStreamingRequest {
  string name = 1;
}

message HelloClientStreamingResponse {
  string message = 1;
}

message HelloBiDirectionalStreamingRequest {
  string name = 1;
}

message HelloBiDirectionalStreamingResponse {
  string message = 1;
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>se.magnuskkarlsson</groupId>
    <artifactId>example-grpc</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.build.outputEncoding>UTF-8</project.build.outputEncoding>
        <failOnMissingWebXml>false</failOnMissingWebXml>
    </properties>

    <dependencies>
        <!-- https://github.com/grpc/grpc-java Chapter: Download -->
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty-shaded</artifactId>
            <version>1.36.0</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>1.36.0</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>1.36.0</version>
        </dependency>
        <dependency> <!-- necessary for Java 9+ -->
            <groupId>org.apache.tomcat</groupId>
            <artifactId>annotations-api</artifactId>
            <version>6.0.53</version>
            <scope>provided</scope>
        </dependency>

        <!-- Test Support -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <version>1.10.19</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <!-- https://github.com/grpc/grpc-java Chapter: Generated Code -->
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.6.2</version>
            </extension>
        </extensions>
        <finalName>${project.artifactId}</finalName>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.1</version>
                    <configuration>
                        <release>11</release>
                        <showDeprecation>true</showDeprecation>
                        <showWarnings>true</showWarnings>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>

            <!-- https://github.com/grpc/grpc-java Chapter: Generated Code -->
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.6.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:3.12.0:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.36.0:exe:${os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> <version>0.6.1</version> 
                <configuration> <protocExecutable>/usr/bin/protoc</protocExecutable> </configuration> <executions> <execution> <goals> <goal>compile</goal> 
                </goals> </execution> </executions> </plugin> -->
        </plugins>
    </build>
</project> 

gRPC Server

A typical gRPC Server is created and started as a standalone server.

Below is the boilerplate code for a standalone server.

package se.magnuskkarlsson.example_grpc.server;

import java.io.IOException;

import io.grpc.Server;
import io.grpc.ServerBuilder;

public class HelloServer {

    public static void main(String[] args) throws IOException, InterruptedException {
        System.out.println("Starting Server ...");

        final Server server = ServerBuilder //
                .forPort(50051) //
                .addService(new HelloServiceImpl()) //
                .build();

        server.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("Shutting down Server ....");
            server.shutdown();
            System.out.println("Successfully stopped Server");
        }));

        server.awaitTermination();
    }

}

gRPC Client

Below is the boilerplate code for a standalone client.

package se.magnuskkarlsson.example_grpc.client;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

// https://github.com/grpc/grpc-java/blob/master/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideClient.java
public class HelloClient {

    public static void main(String[] args) throws Exception {
        System.out.println("Starting Client ...");
        ManagedChannel channel = ManagedChannelBuilder //
                .forAddress("localhost", 50051) //
                .usePlaintext() //
                .build();

        HelloServiceBlockingStub syncClient = HelloServiceGrpc.newBlockingStub(channel);

        System.out.println("Shutting down Client");
        channel.shutdown();
    }

Unary RPC

Unary RPC is the most common call, it is the classical Request and Response pattern.

Server

package se.magnuskkarlsson.example_grpc.server;

import java.util.ArrayList;
import java.util.List;

import example_grpc.HelloServiceGrpc.HelloServiceImplBase;
import example_grpc.HelloServiceOuterClass.HelloRequest;
import example_grpc.HelloServiceOuterClass.HelloResponse;
import io.grpc.stub.StreamObserver;

public class HelloServiceImpl extends HelloServiceImplBase {

    // Unary RPC
    @Override
    public void hello(HelloRequest request, StreamObserver<HelloResponse> responseObserver) {
        String name = request.getName();
        System.out.println("[Unary] Hello name=" + name + " ...");

        HelloResponse resp = HelloResponse.newBuilder() //
                .setMessage("Hello from Async " + name) //
                .build();

        // send the response
        responseObserver.onNext(resp);

        responseObserver.onCompleted();
    }

}

Client - Blocking Client; HelloServiceBlockingStub syncClient = HelloServiceGrpc.newBlockingStub(channel);

package se.magnuskkarlsson.example_grpc.client;

import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import example_grpc.HelloServiceGrpc;
import example_grpc.HelloServiceGrpc.HelloServiceBlockingStub;
import example_grpc.HelloServiceGrpc.HelloServiceStub;
import example_grpc.HelloServiceOuterClass.HelloRequest;
import example_grpc.HelloServiceOuterClass.HelloResponse;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

// https://github.com/grpc/grpc-java/blob/master/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideClient.java
public class HelloClient {

    public static void main(String[] args) throws Exception {
        System.out.println("Starting Client ...");
        ManagedChannel channel = ManagedChannelBuilder //
                .forAddress("localhost", 50051) //
                .usePlaintext() //
                .build();

        new HelloClient().unary(channel);

        System.out.println("Shutting down Client");
        channel.shutdown();
    }

    private void unary(ManagedChannel channel) {
        HelloServiceBlockingStub syncClient = HelloServiceGrpc.newBlockingStub(channel);
//        HelloServiceStub asyncClient = HelloServiceGrpc.newStub(channel);

        HelloRequest req1 = HelloRequest.newBuilder().setName("Magnus").build();
        HelloResponse resp1 = syncClient.hello(req1);
        System.out.println("[Async] " + resp1);

        HelloRequest req2 = HelloRequest.newBuilder().setName("John Doe").build();
        HelloResponse resp2 = syncClient.hello(req2);
        System.out.println("[Async] " + resp2);
    }

}

Server Streaming RPC

Streaming Server is well suited for when Server needs to PUSH data to client. This is especially true for Big Data.

Server

    // Server Streaming RPC
    @Override
    public void helloServerStreaming(HelloServerStreamingRequest request,
            StreamObserver<HelloServerStreamingResponse> responseObserver) {

        String name = request.getName();
        System.out.println("[Server Streaming] Hello name=" + name + " ...");

        try {
            for (int i = 0; i < 10; ++i) {

                HelloServerStreamingResponse resp = HelloServerStreamingResponse.newBuilder() //
                        .setMessage("Hello from Server Streaming " + name + " " + i) //
                        .build();
                responseObserver.onNext(resp);

                Thread.sleep(1000L);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            responseObserver.onCompleted();
        }
    }

Blocking Client

    private void serverStreaming(ManagedChannel channel) {
        HelloServiceBlockingStub syncClient = HelloServiceGrpc.newBlockingStub(channel);
//        HelloServiceStub asyncClient = HelloServiceGrpc.newStub(channel);

        HelloServerStreamingRequest req1 = HelloServerStreamingRequest.newBuilder().setName("Magnus").build();
        Iterator<HelloServerStreamingResponse> respIter = syncClient.helloServerStreaming(req1);
        respIter.forEachRemaining(resp -> {
            System.out.println("[Server Streaming]" + resp.getMessage());
        });
    }

Client Streaming RPC

Client Streaming is suited when the client wants to send Big Data. Or when server processing is possible to do in parallell, then can client split jobs.

Server

    // Client Streaming RPC
    @Override
    public StreamObserver<HelloClientStreamingRequest> helloClientStreaming(
            StreamObserver<HelloClientStreamingResponse> responseObserver) {

        StreamObserver<HelloClientStreamingRequest> requestObserver = new StreamObserver<HelloClientStreamingRequest>() {

            private List<String> names = new ArrayList<String>();

            @Override
            public void onNext(HelloClientStreamingRequest request) {
                // client sends a message
                String name = request.getName();
                names.add(name);
            }

            @Override
            public void onError(Throwable t) {
                // client sends a error
            }

            @Override
            public void onCompleted() {
                // client is done
                HelloClientStreamingResponse response = HelloClientStreamingResponse.newBuilder() //
                        .setMessage("[Client Streaming] Hello " + names.toString()) //
                        .build();
                responseObserver.onNext(response);
                responseObserver.onCompleted();
            }

        };

        return requestObserver;
    }

Asynch Client

    private void clientStreaming(ManagedChannel channel) throws InterruptedException {
        HelloServiceStub asyncClient = HelloServiceGrpc.newStub(channel);

        CountDownLatch latch = new CountDownLatch(1);

        StreamObserver<HelloClientStreamingResponse> responseObserver = new StreamObserver<HelloClientStreamingResponse>() {

            @Override
            public void onNext(HelloClientStreamingResponse response) {
                // we get a response from the server
                String message = response.getMessage();
                System.out.println("[Client Streaming] Recieved message from server " + message);

                // onNext will be called only once
            }

            @Override
            public void onError(Throwable t) {
                // we get an error from server
                latch.countDown();
            }

            @Override
            public void onCompleted() {
                // the server is done sending us data

                // onComplete will be called right after onNext()
                System.out.println("[Client Streaming] Server has completed sending us data/stream");
                latch.countDown();
            }

        };

        StreamObserver<HelloClientStreamingRequest> requestObserver = asyncClient
                .helloClientStreaming(responseObserver);

        HelloClientStreamingRequest request1 = HelloClientStreamingRequest.newBuilder() //
                .setName("John") //
                .build();

        requestObserver.onNext(request1);

        HelloClientStreamingRequest request2 = HelloClientStreamingRequest.newBuilder() //
                .setName("Nisse") //
                .build();

        requestObserver.onNext(request2);

        HelloClientStreamingRequest request3 = HelloClientStreamingRequest.newBuilder() //
                .setName("Klara") //
                .build();

        requestObserver.onNext(request3);

        // tell the server, the client is done sending data/streaming
        requestObserver.onCompleted();

        latch.await(5, TimeUnit.SECONDS);
    }

Bidirectional Streaming RPC

Bidirectional Streaming is like a chat protocol and have typical a long running connection.

Server

    // Bidirectional streaming RPC
    @Override
    public StreamObserver<HelloBiDirectionalStreamingRequest> helloBiDirectionalStreaming(
            StreamObserver<HelloBiDirectionalStreamingResponse> responseObserver) {

        StreamObserver<HelloBiDirectionalStreamingRequest> requestObserver = new StreamObserver<HelloBiDirectionalStreamingRequest>() {

            @Override
            public void onNext(HelloBiDirectionalStreamingRequest request) {
                // client sends a message
                String name = request.getName();

                HelloBiDirectionalStreamingResponse response = HelloBiDirectionalStreamingResponse.newBuilder() //
                        .setMessage("[Bi-Directional] Hello " + name) //
                        .build();
                responseObserver.onNext(response);
            }

            @Override
            public void onError(Throwable t) {
                // client sends a error
            }

            @Override
            public void onCompleted() {
                // client is done sending data/stream
                responseObserver.onCompleted();
            }

        };

        return requestObserver;
    }

Asynch Client

    private void biDirectionalStreaming(ManagedChannel channel) throws InterruptedException {
        HelloServiceStub asyncClient = HelloServiceGrpc.newStub(channel);

        CountDownLatch latch = new CountDownLatch(1);

        StreamObserver<HelloBiDirectionalStreamingResponse> responseObserver = new StreamObserver<HelloBiDirectionalStreamingResponse>() {

            @Override
            public void onNext(HelloBiDirectionalStreamingResponse response) {
                // we get a response from the server
                String message = response.getMessage();
                System.out.println("[Bi-Directional Streaming] Recieved message from server " + message);
            }

            @Override
            public void onError(Throwable t) {
                // we get an error from server
                latch.countDown();
            }

            @Override
            public void onCompleted() {
                // the server is done sending/streaming us data
                System.out.println("[Bi-Directional Streaming] Server has completed sending us data/stream");
                latch.countDown();
            }

        };

        StreamObserver<HelloBiDirectionalStreamingRequest> requestObserver = asyncClient
                .helloBiDirectionalStreaming(responseObserver);

        HelloBiDirectionalStreamingRequest request1 = HelloBiDirectionalStreamingRequest.newBuilder() //
                .setName("John") //
                .build();
        requestObserver.onNext(request1);
        System.out.println("[Bi-Directional Streaming] Sending request1");

        HelloBiDirectionalStreamingRequest request2 = HelloBiDirectionalStreamingRequest.newBuilder() //
                .setName("Nisse") //
                .build();
        requestObserver.onNext(request2);
        System.out.println("[Bi-Directional Streaming] Sending request2");

        HelloBiDirectionalStreamingRequest request3 = HelloBiDirectionalStreamingRequest.newBuilder() //
                .setName("Bertil") //
                .build();
        requestObserver.onNext(request3);
        System.out.println("[Bi-Directional Streaming] Sending request3");

        // tell the server, the client is done sending data/streaming
        requestObserver.onCompleted();

        latch.await(5, TimeUnit.SECONDS);
    }

No comments: