rpc框架

之前介绍的sumk、或者是常用的dubbo,但是单语言的rpc框架,由于go语言慢慢抢占了一部分java的市场份额,越来越多的公司使用go语言。所以语言中立的grpc也开始慢慢流行起来。

gRPC 简介

gRPC 是一个高性能、开源和通用的 RPC 框架,面向服务端和移动端,基于 HTTP/2 设计。其中很多特性都需要基于http2,而对于http2的使用grpc只需要引入Netty作为NIO框架即可,netty通过配置codec即可适配http2。

特点:

  • 可支持多语言
  • 基于 IDL 文件定义服务,通过 proto3 工具生成指定语言的数据结构、服务端接口以及客户端 Stub;(与其他rpc框架不同,provider收到请求后直接调用,虽然反射多次调用后出现拐点,但直接调用的性能还是比反射好很多的)
  • 通信协议基于标准的 HTTP/2 设计,支持双向流、消息头压缩、单 TCP 的多路复用、服务端推送等特性(使用http2的特性,利用tcp多路复用,数据包分块传输);
  • 序列化支持 Protocol Buffer和 JSON,PB 是一种语言无关的高性能序列化框架,按照定义的固定消息类型序号进行传输过程中二进制的编解码。

demo

定义proto文件(helloworld.proto)

syntax = "proto3";

opt
n java_multiple_files = true;
option java_package = "com.wjiajun.grpc.hello";
option java_outer_classname = "HelloWorldProto";

package helloworld;


service Greeter {
  // 简单的rpc
  rpc SayHello (HelloRequest) returns (HelloReply) {} ;
}



message HelloRequest {
  string name = 1;
}


message HelloReply {
  string message = 1;
}

provider 代码:

/**
 * Server that manages startup/shutdown of a {@code Greeter} server.
 */
public class HelloWorldServer {

    private static final Logger logger = Logger.getLogger(HelloWorldServer.class.getName());

    private Server server;

    /**
     * Main launches the server from the command line.
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        final HelloWorldServer server = new HelloWorldServer();
        server.start();
        server.blockUntilShutdown();
    }

    private void start() throws IOException {
    /* The port on which the server should run */
        int port = 50051;
        server = ServerBuilder.forPort(port)
                .addService(new GreeterImpl())
                .build()
                .start();

        logger.info("Server started, listening on " + port);
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // Use stderr here since the logger may have been reset by its JVM shutdown hook.
                System.err.println("*** shutting down gRPC server since JVM is shutting down");
                HelloWorldServer.this.stop();
                System.err.println("*** server shut down");
            }
        });
    }

    private void stop() {
        if (server != null) {
            server.shutdown();
        }
    }

    /**
     * Await termination on the main thread since the grpc library uses daemon threads.
     */
    private void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
        @Override
        public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
            HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
            responseObserver.onNext(reply);
            responseObserver.onCompleted();
        }
    }
}

client 代码:

public class HelloWorldClient {

    private static final Logger logger = Logger.getLogger(HelloWorldClient.class.getName());

    private final ManagedChannel channel;

    //阻塞模型
    private final GreeterGrpc.GreeterBlockingStub blockingStub;

    /**
     * Construct client connecting to HelloWorld server at {@code host:port}.
     */
    public HelloWorldClient(String host, int port) {
        this(ManagedChannelBuilder.forAddress(host, port)
                // Channels are secure by default (via SSL/TLS). For the example we disable TLS to avoid
                // needing certificates.
                /**
                 *
                 * 此处将设为文本连接 , 只用于测试
                 *
                 */
                .usePlaintext(true)
                .build());
    }

    /**
     * Construct client for accessing RouteGuide server using the existing channel.
     */
    HelloWorldClient(ManagedChannel channel) {
        this.channel = channel;
        blockingStub = GreeterGrpc.newBlockingStub(channel);
    }

    /**
     * Greet server. If provided, the first element of {@code args} is the name to use in the
     * greeting.
     */
    public static void main(String[] args) throws Exception {

        HelloWorldClient client = new HelloWorldClient("localhost", 50051);
        try {
      /* Access a service running on the local machine on port 50051 */
            String user = "world";
            if (args.length > 0) {
                user = args[0]; /* Use the arg as the name to greet if provided */
            }
            client.greet(user);
        } finally {
            client.shutdown();
        }
    }

    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }

    /**
     * Say hello to server.
     */
    public void greet(String name) {
        logger.info("Will try to greet " + name + " ...");
        HelloRequest request = HelloRequest.newBuilder().setName(name).build();
        HelloReply response;
        try {
            response = blockingStub.sayHello(request);
        } catch (StatusRuntimeException e) {
            logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
            return;
        }
        logger.info("Greeting: " + response.getMessage());
    }
}

通过maven插件根据pb文件生成相应的代码:

     <plugins>
            <!-- pb 文件编译工具,若修改需要重新编译 -->
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.5.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:3.3.0:exe:{os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.4.0:exe:{os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>

grpc线程模型

类型dubbo线程池,默认dubbo线程池线程数200(虽然可通过继承dubbo ThreadPool返回getExecutor方式用系统线程池/spring线程池替代,但官方不建议)。grpc除了NIO框架线程池外,也有自带的线程池,这部分线程池与spring或一些其他框架自带线程池隔离,负责消息的序列化和反序列化。

类比tomcat,NIO线程和业务线程间一般都通过在内存中初始化一个队列的方式来交互。如下图(业务线程为grpc自带线程池, IO线程由netty提供)

类比tomcat,监听网络连接的线程数一般有多个(2 * cpu数),进行网络读写的NIO线程也有多个,由于IO线程和业务线程之前是通过内存队列来通信的,这就会使用经典的java block queue 中wait notify场景。基于管程模型,这其中必须涉及到锁。为了防止IO线程将网络数据包写入or读取完成后notify业务线程,导致大量业务线程进行锁竞争。所以,一般情况下都是拆分成多个queue,类别netty的boss group / work group,每个NioEventloop都携带一个taskqueue,一个channel只与其中一个NioEvenloop绑定。grpc也基于一致性hash将锁粒度减小并分散锁竞争压力,如下图:

通过线程绑定技术(例如采用一致性 hash 做映射), 将 Netty 的 I/O 线程与后端的服务调度线程做绑定,1 个 I/O 线程绑定一个或者多个服务调用线程,降低锁竞争,提升性能。

服务调用方式

常用的服务调用方式有两种:

  • 同步服务调用:大部分业务需要及时得到服务响应结果进行后续操作。
  • 异步服务调用:客户端发起服务调用之后,不同步等待响应,而是注册监听器或者回调函数,待接收到响应之后发起异步回调,驱动业务流程继续执行。
  • 基于 HTTP/2.0 的 streaming 调用

同步调用

客户端发起 RPC 调用,将请求消息路由到 I/O 线程,无论 I/O 线程是同步还是异步发送消息,发起调用的业务线程都会同步阻塞,等待服务端的应答,由 I/O 线程唤醒同步等待的业务线程,获取应答,然后业务流程继续执行。(有些框架例如sumk rpc、grpc 是通过在异步基础上增加wait来实现的)

异步调用

类似之前的sumk rpc,在sumk内部线程池(业务线程)基于mina发起调用后就返回了,不阻塞。由mina线程池阻塞等待返回结果,等返回结果由mina NIO线程set future后,再唤醒mina线程池进而回调业务线程之前注册的callback,在这整个流程中业务线程时不阻塞的。(如果基于grpc的Reactive编写异步调用,连grpc线程池都不需要阻塞,Reactive 风格的调用异步化更彻底一些)

注意:
1 异步后,业务线程肯定是异步直接返回的(后续再通过回调来使用),但NIO线程是否异步取决于编写风格,可以同步,也可以异步(Reactive 风格)。
2 当然,业务线程也可以选择同步一小段时间,比如dubbo,可以通过sent参数设置业务线程等待NIO线程发出消息后才从阻塞状态返回。

如下代码:

private RpcFuture sendAsync(Req req, long endTime) {
        // ...
        WriteFuture f = null;
        // 1
        f = session.write(req);
        // 2
        f.addListener(locker);
        return new RpcFutureImpl(locker);
    }

public final class RpcLocker implements IoFutureListener<WriteFuture> {

    // ...

    @Override
    // 3
    public void operationComplete(final WriteFuture future) {
        if (future.getException() == null) {
            return;
        }
        // 4
        SoaExcutors.getClientThreadPool().execute(() -> {
            if (LockHolder.remove(req.getSn()) == null) {
                return;
            }
            if (url != null) {
                HostChecker.get().addDownUrl(url);
            }
            // 5
            wakeup(RpcResult.sendFailed(req, future.getException()));
        });
    }
    // ...

1 业务线程调用mina进行rpc请求发送后,不阻塞。
2 业务线程继续注册callback。
3 mina线程池接收到影响后回调
4 将回调逻辑交给sumk系统线程池,不占用mina线程池处理其他请求。
5 sumk系统线程池进行具体业务处理(这里wakeup实际上不唤醒阻塞线程,和同步调用共用mina实现类)

类比上面sumk的异步处理逻辑,来看看一般rpc框架异步处理流程,如下图

可以看到,流程大致类似,另外,异步服务调用相比于同步服务调用有几个优点:

  • 化串行为并行,提升服务调用效率,减少业务线程阻塞时间。
  • 化同步为异步,避免业务线程阻塞。
  • 对于一些调用第三方接口(响应时间偶尔不太稳定),较耗时的定时任务,可以通过异步的方式

基于 HTTP/2.0 的 streaming 调用(特性均基于http2)

gRPC 提供了三种 streaming 模式:

  • 服务端 streaming
  • 客户端 streaming服务端
  • 客户端双向 streaming

服务端 streaming

服务端 streaming 模式指客户端 1 个请求,服务端返回 N 个响应,每个响应可以单独的返回,适用的场景主要是客户端发送单个请求,但是服务端可能返回的是一个响应列表,服务端不想等到所有的响应列表都组装完成才返回应答给客户端,而是处理完成一个就返回一个响应,直到服务端关闭 stream,通知客户端响应全部发送完成。

客户端 streaming

客户端发送多个请求,服务端返回一个响应,多用于汇聚和汇总计算场景。

双向 streaming

客户端发送 N 个请求,服务端返回 N 个或者 M 个响应,利用该特性,可以充分利用 HTTP/2.0 的多路复用功能,在某个时刻,HTTP/2.0 链路上可以既有请求也有响应,实现了全双工通信。

grpc 可通过定义proto文件实现基于http2的三种stream传输:
如下:

service User {

    // 服务端 to 客户端  , 服务端接收客户端UserRequest请求 返回 UserReply 流 (多次)
    rpc QueryUserByIds (UserRequest) returns (stream UserReply) {};

    // 客户端 to 服务端 流 ,客户端发送 UserRequest 流 (多次),服务端接收后返回 UserReply
    rpc QueryLastUsers (stream UserRequest) returns (UserReply) {};

        //双向流 (流是可以多次)
    rpc QueryUserByName (stream UserRequest) returns (stream UserReply) {};
}

序列化

grpc 使用Protocol Buffers 框架进行序列化与反序列化(也支持json)。

对于Protocol Buffers 的使用主要包括:

  • IDL 文件定义(*.proto), 包含数据结构定义,以及可选的服务接口定义(gRPC);
  • 各种语言的代码生成(含数据结构定义、以及序列化和反序列化接口);
  • 使用 Protocol Buffers 的 API 进行序列化和反序列化。

如上文例子,可通过 Maven 等构建工具,配置 protoc 命令,在打包 / 构建时生成代码。netty也通过了对Protocol Buffers的支持,只需要将handler加入对应的 ChannelPipeline即可。

由于序列化和反序列化都是cpu密集型操作。为了不占用nio线程,影响连接的channel读写,序列化和反序列化由grpc线程池自己实现。

spring boot适配

spring boot也提供了gRPC Spring Boot Starter支持通过@GrpcService自动配置并运行一个嵌入式的 gRPC 服务。
通过 @GrpcClient 自动创建和管理您的 gRPC Channels 和 stubs。

并且支持Spring Cloud注册中心体系,向 Consul 或 Eureka 或 Nacos 注册服务并获取 gRPC 服务端信息。

具体可dump gitee开源项目 grpc-spring-boot-starter

参考

  • 《深入浅出gGRPC》
最后修改日期: 2021年2月6日

作者

留言

撰写回覆或留言

发布留言必须填写的电子邮件地址不会公开。