Sumk之RPC

目前项目的所有模块基于 Sumk 框架实现模块间 PRC 的相互调用。于是,花了周末的时间阅读了 Sumk 框架中的 RPC 模块。Sumk 框架体积小,但其中拥有许多项目所必备的功能,通过它暴露出的 http 服务性能非常强悍。

RPC 原理

RPC(Remote Procedure Call Protocol) 也叫远程调用协议。在 OSI 网络通信模型中,RPC 跨越了传输层和应用层。调用方可以以本地调用的方式调用远程方法。调用双方在三次握手后通过 socket 进行数据传输,双方的数据格式、协议都可自己定制(网络通信框架有提供相应 API)。

RPC 模块

通过 github 下来源码后,通过 rpc 下的目录和代码调试研究可以看出共有几部分组成:Client、Server、Zookeeper、Mina、ASM 等。 pom 文件下关于 rpc 功能有两个依赖,分别是 mina 核心包和 zk 的开源客户端:

<dependency>
    <groupId>org.apache.mina</groupId>
    <artifactId>mina-core</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.11</version>
</dependency>
  • Client
  1. 通过 Sumk 启动的服务既可以做调用方也可以做接收方
  2. 对 Mina 进行了封装,对参数透传做了处理
  3. 提供 RpcFuture 做了请求的异步化
  4. 解析 zk 上指定的目录下的所有子节点获取所有接口信息,并对接口信息做实时监听
  • Server
  1. 对 Mina 进行了封装,请求实时监听
  2. 解析 RPC 注解,通过 ASM 生成所有 RPC 接口的实体
  3. 提供本地调用方式

Mina

Mina 是一个网络通信应用框架,提供了事件驱动、异步操作(基于 JAVA NIO)的编程模型。Mina 可以帮助我们实现高性能、高扩展的网络应用。 同时提供客户端和服务端,两端的网络通信结构都相同。工作流程如下图: 其中包括几个常用组件:

  • IoService

最底层的组件,Connector 和 Acceptor 的公共父类,负责具体 IO 工作。对上提供统一的基于事件的异步 IO 接口。当有数据时,IoService 会调用底层的 IO 接口并封装成 IoBuffer,以事件的方式通知上层。将 JAVA NIO 从同步转为异步(IoService 将数据从内核拷到了用户空间,对于上层来说,数据已经在用户空间),但 IoService 只用于连接建立,真正 IO 读写在 IoProcessor 组件。

  • IoAcceptor

用于接收客户端的连接,每监听一个端口就启用一个线程,然后创建 IoSession。

  • IoConnector

用于与服务器端建立连接,每连接一个服务器端就启动一个线程,然后创建 IoSession。

  • IoProcessor

使用与 IoService 不同的 Selector(Java NIO)。负责调用注册在 IoService 的的过滤器,并在过滤链之后调用 IoHandler。由于 IO 读写操作比较耗费 CPU,所以一般线程数设置为 CPU 的核数+1。所有 IoProcessor 组成一个线程池,在 IoAcceptor 或者 IoConnector 默认会关联一个 IoProcessor 线程池。

  • IoFilter

定义一组拦截器。包括日志打印、数据编码(write)与解码(read)。

  • IoHandler

负责编写业务逻辑,是 Mina 处理流程的终点,每个 IoService 都需要指定一个 IoHandler。

  • IoSession

对底层连接的封装,贯穿整个通信。包含当前连接相关的上下文信息。通过 IoSession 发送数据也是异步的,当经过 IoFilter 到达 IoService 后,IoService 不会立即调用底层 IO 发送数据,而是将调用封装成 WriteRequest,放入 writeRequestQueue 队列,再由 IoProcessor 线程统一 flush。

  • 额外的线程池

IoProcessor 线程负责调用的整个流程。为减轻负担,IoAcceptor 、IoConnector 还提供创建额外线程池,使 IoHandler 中某些事件方法可以独立在额外线程池中分配的线程运行,不需要运行在 IoProcessor 上。

Mina 完整工作流程

  1. 当 IoService(IoAcceptor、IoConnector)被创建后,同时关联创建一个用于连接的线程池,一个 IoProcessor 线程池、一个额外线程池。
  2. 当 IoService 建立套接字(IoAcceptor 的 bind() 或 IoConnector 的 connect())时,从连接线程池中取出一个线程,进行监听或连接。
  3. 当 IoService 监听到套接字上请求时,建立一个 IoSession 对象,从 IoProcessor 线程池中取出一个线程执行会话通道中的拦截器和 IoHandler。
  4. 当 IoSession 处于空闲或者关闭状态时,IoProcessor 线程回收。
  5. 为了减轻 IoProcessor 负担(执行一个会话上的所有拦截器、IoHandler),通过额外线程池将 IoHandler 中的某些事件在该线程池中分配线程执行。

Server 端接口初始化

SoaPlugin 组件通过实现 Plugin 接口,在服务启动后进行 Soa 接口初始化,其中 startAsync 方法可以保证有且仅执行一次。

@Bean
public class SoaPlugin implements Plugin {
    @Override
    public void startAsync() {
        // 解析服务中所有@Soa 注解
        resolveSoaAnnotation(StartContext.inst().getBeans());
        // Rpc 调用链初始化
        RpcHandler.init();
        // 初始化 Mina
        int port = StartContext.soaPort();
        if (port > -1) {
            String clzName = AppInfo.get("sumk.rpc.starter.class", "org.yx.rpc.server.start.SoaServer");
            Class<?> clz = Class.forName(clzName);
            Constructor<?> c = clz.getConstructor(int.class);
            server = (Lifecycle) c.newInstance(port);
        }
    }
}

resolveSoaAnnotation 方法会调用 SoaAnnotationResolver 类中的 resolve 方法,resolve 方法会解析所有带有@Bean 注解的类(@Bean 提供 IOC 功能,类似 Spring 的@Bean),

    public void resolve(Object bean) throws Exception {
        // 从 Bean 中获取 Class 对象,如果是 Box 的子类,说明是 ASM 生成的,可以通过 targetRawClass 获取原始类
        Class<?> clz = IOC.getTargetClassOfBean(bean);
        // 获取所有 Soa 的方法
        Method[] methods = clz.getDeclaredMethods();
        List<Method> actMethods = new ArrayList<>();
        for (final Method m : methods) {
            if (m.getAnnotation(Soa.class) == null) {
                continue;
            }
            actMethods.add(m);
        }
        List<MethodParamInfo> mpInfos = AsmUtils.buildMethodInfos(actMethods);
        for (MethodParamInfo info : mpInfos) {
            Method m = info.getMethod();
            Soa act = m.getAnnotation(Soa.class);
            // 在每一个 Soa 接口前加上 appId 和 groupId,拼接成完整的地址
            List<String> soaNames = nameResolver.solve(clz, m, act);
            // 通过 ASM 为每个方法创建一个 ArgPojo 子类(生成 params、invoke 等方法 invoke 通过反射调用 Class 的 soa 方法),然后再构建一个 RpcActionNode 实体,
            // 后面接口反射调用,接口权重,限流都是依赖于每个接口的 RpcActionNode 实体
            RpcActionNode node = new RpcActionNode(bean, m, ArgPojos.create(info), info.getArgNames(),
                    ParamFactory.create(m), act);
            // Soa 注解可以配置多个值也就是多个 RPC 接口,每个 RPC 接口对应一个 RpcActionNode,
            // 通过 RpcActions 整合,key 是 apiId+groupId+Soa 的一个接口路径,value 是 RpcActionNode
            for (String soaName : soaNames) {
                RpcActions.putActNode(soaName, node);
            }
        }
    }
}

初始化方法中另一个方法 RpcHandler.init 负责所有 Rpc 调用链 RpcFilter 的初始化,其中使用了责任链模式和访问者模式,默认初始化责任链终点,也就是最后一个过滤器只调用访问者的 visit 方法(访问者模式一般将实现写在具体的访问者中),不继续往下调用(访问者只支持一个,之前的过滤器可以对访问者里的请求参数进行修饰),调用类是一个扩展点。

public final class RpcHandler {

    private static RpcFilter filter;
    private static final RpcFilter LAST = new RpcFilter() {
        @Override
        public Object doFilter(RpcActionNode node, RpcVisitor visitor) throws Throwable {
            return visitor.visit(node);
        }
    };

    public static synchronized void init() {
        // 获取所有的 RpcFilter
        List<RpcFilter> list = IOC.getBeans(RpcFilter.class);
        if (list == null  list.isEmpty()) {
            filter = LAST;
            return;
        }
        // 设置每个 RpcFilter 的调用顺序
        final int size = list.size();
        for (int i = 0; i < size; i++) {
            RpcFilter current = list.get(i);
            if (i == size - 1) {
                current.setNext(LAST);
                break;
            }
            current.setNext(list.get(i + 1));
        }
        filter = list.get(0);
    }

    public static Object handle(RpcActionNode node, RpcVisitor visitor) throws Throwable {
        return filter.doFilter(node, visitor);
    }
}

而这个 RpcHandler 调用链一般在两个地方被调用,一个是本地调用,另一个是通过 Mina 监听请求后进行调用,两个地方的调用方式是一样的。如下: 如果调用双方在一个服务下,则直接本地调用,省去网络传输编解码的开销

public class LocalRequestHandler {
    // ....
    public Response handler(Request request, RpcActionNode action) {
        request.protocol(request.paramProtocol());
        Response resp = new Response();
        for (RequestHandler h : this.handlers) {
            if (h.handle(request, resp)) {
                resp.serviceInvokeMilTime(System.currentTimeMillis() - request.getStartInServer());
                return resp;
            }
        }
        return resp;
    }
}

远程调用的服务端通过实现 Mina 的 IoHandler 接口,然后在 messageReceived 方法中编写接收的业务逻辑,前面有提到 IoHandler 是 Mina 调用链路的终点。

public class ServerHandler implements IoHandler {
    //...
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        Response resp = new Response();
        Request req = null;
        Object obj = this.deserializer.deserialize(message);
        if (Request.class.isInstance(obj)) {
            req = (Request) obj;
            ActionContext.rpcContext(req, req.isTest());
            for (RequestHandler h : handlers) {
                if (h.handle(req, resp)) {
                    resp.serviceInvokeMilTime(System.currentTimeMillis() - req.getStartInServer());
                    session.write(resp);
                    return;
                }
            }
        }
    }
}

可以看到两种方式的调用过程类似,都是遍历所有的 RequestHandler 然后进行处理,调用时通过一个 RequestHandler 进行间接调用的,RequestHandler 有两种实现,分别是类似表单参数传递和 json 参数传递。比如下面贴出表单参数传递方式。在 RequestHandler 实现类中才是初始化对应参数方式的访问者,然后执行调用链路。 可以发现,这里又一个扩展点,如果以后请求不仅有 json 和表单类型还想支持其他类型,可以自己实现 RequestHandler 接口做相应处理,和 OrderedParamReqHandler 类似,由于一种参数类型请求只能对应一种类型,所有在 handle 调用前需要先对请求的协议进行判断。并且扩展后本地调用和 Mina 远程调用都不需要修改。

@Bean
public class OrderedParamReqHandler implements RequestHandler {

    @Override
    public boolean handle(Request req, Response resp) {
        // 无论是本地调用还是 Mina 监听方式调用都要判断请求的类型
        if (!Protocols.hasFeature(req.protocol(), Protocols.REQ_PARAM_ORDER)) {
            return false;
        }
        resp.sn(req.getSn());
        try {
            String api = req.getApi();
            RpcActionNode node = RpcActions.getActionNode(api);
            // 限流校验
            BizExcutor.checkNode(api, node);
            // 调用刚才的 rpc 处理链
            Object ret = RpcHandler.handle(node, new OrderedRpcVisitor(req));
            resp.json(RpcGson.toJson(ret));
            resp.exception(null);
        } catch (Throwable e) {
            ServerExceptionHandler.handle(req, resp, e);
        } finally {
            ActionContext.remove();
        }
        return true;
    }

    // RpcFilter 调用链处理过程中的访问者在这定义,也分表单和 json 方式
    private static final class OrderedRpcVisitor extends RpcVisitor {
        @Override
        public Object visit(RpcActionNode info) throws Throwable {
            // 根据调用方式不同进行参数校验和调用,具体的调用对象使用之前初始化好的 RpcActionNode
            return info.invokeByOrder(req.getParamArray());
        }
    };
}

public Object invokeByOrder(String... args) throws Throwable {
        // ...校验逻辑
       // 加载通过 ASM 生成的代理类并对这次请求生成一个对象,每个接口都有一个 ArgPojo 代理
        ArgPojo pojo = Loader.newInstance(this.argClz);
       // 将 RpcActionNode 中的字段值设置到 ArgPojo 对象中
        for (int i = 0; i < fields.length; i++) {
            if (i >= args.length  args[i] == null) {
                continue;
            }
            Field f = fields[i];
            f.set(pojo, RpcGson.fromJson(args[i], f.getGenericType()));
        }
        // 具体调用 this.owner 是在 Sumk 的 IOC 容器定义的包含@soa 接口的对象
       //  pojo 中有通过 ASM 添加的 invoke 方法,可以进行具体的反射调用
        return BizExcutor.exec(pojo, this.owner, pojo.params(), this.paramInfos);
}


public final class BizExcutor {

    public static Object exec(ArgPojo argObj, Object owner, Object[] params, ParamInfo[] infos) throws Throwable {
        try {
            // ....
            // argObj 每个接口都有一个
            // 调用 asm 生成的 invoke 方法,在 invoke 方法中使用反射调用 owner 对象的相应方法。
            return argObj.invoke(owner);
        } catch (Throwable e) {
            // 省略
            return null;
        }
    }
}

Server 端的 Mina 部分

上文提到 Mina 为 RPC 远程调用提供了 Client 端和 Server 端的封装,Server 端需要实现的部分有 IoAcceptor、IoHandler、IoFilter(可选,Mina 有默认的编解码器)、协议解析等。 上文提到在初始化接口时涉及一个类 SoaPlugin,这个类会在容器启动后自动调用。现在我们来看这个类中 satrtAsync 方法的部分 Mina 相关逻辑。

@Override
public void startAsync() {
    // 获取配置定义的端口
    int port = StartContext.soaPort();
    if (port > -1) {
        String clzName = AppInfo.get("sumk.rpc.starter.class", "org.yx.rpc.server.start.SoaServer");
        Class<?> clz = Class.forName(clzName);
        Constructor<?> c = clz.getConstructor(int.class);
        // 使用反射生成一个 SoaServer 实例
        server = (Lifecycle) c.newInstance(port);
    }
}

这段代码目的是生成一个 SoaServer 实例。SoaServer 类中包含两部分,启动 Mina 的 Server 端服务注册所有之前组装好的接口到 zk 上,并注册监听。先看第一部分:

public class SoaServer implements Lifecycle {
    public SoaServer(int port) {
        this.init(port);
    }

    protected void init(int port) {
        port = startServer(ip, port);
        // ...
    }

    protected int startServer(String ip, int port) throws Exception {
        List<RequestHandler> handlers = IOC.getBeans(RequestHandler.class);
        // 初始化一个 MinaServer 实例,并调用 run
        server = new MinaServer(ip, port, handlers);
        server.run();
        return server.getPort();
    }


    @Override
    public synchronized void start() {
        // 注册 AppInfo 监听,当 AppInfo 配置有变更时实时通知
        AppInfo.addObserver(info -> {
            if (!SoaServer.this.started) {
                return;
            }
            // ...
        });
        started = true;

    }
}

上面代码主要做了配置监听并将用户初始化 Mina 服务的线程手动跑起来。下面重点就是 MinaServer 中的 run 方法:

public class MinaServer implements Runnable {

    private IoHandler handler;
    private int acceptors;
    private SocketAcceptor acceptor;

    public synchronized void run() {
        // 每监听一个端口(每调用一次 bind() 方法),都启用一个接收者线程,否则只有一个接收者线程,用于监听连接,不做实际 IO
        acceptor = acceptors > 0 ? new NioSocketAcceptor(acceptors) : new NioSocketAcceptor();
        DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();
        //设置过滤器
        // 定义 Sumk 框架自带的编解码工厂
        chain.addLast("codec", new ProtocolCodecFilter(IOC.get(SumkCodecFactory.class)));
        // 定义一个额外线程池,用于分担 IoProcessor 线程的压力,有关 Server 端的操作都使用这个线程池,与 Client 端分开
        chain.addLast("threadpool", new ExecutorFilter(SoaExcutors.getServerThreadPool()));
        // 设置处理 handler
        acceptor.setHandler(handler);

        // 每隔一段时间调用一次 handler 的 sessionIdle 方法
        acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, AppInfo.getInt(SOA_SESSION_IDLE, 600));
        if (SocketSessionConfig.class.isInstance(acceptor.getSessionConfig())) {
            SocketSessionConfig conf = (SocketSessionConfig) acceptor.getSessionConfig();
            conf.setKeepAlive(true);
            conf.setReceiveBufferSize(100);
            conf.setSendBufferSize(8192);

        }
        // 获取随机端口并通过 bind 绑定该端口
        boolean randomPort = this.port < 1;
        for (int i = 0; i < 50; i++) {
            try {
                InetSocketAddress addr = listenAddr(randomPort);
                acceptor.bind(addr);
                Log.get("sumk.rpc.server").info("rpc listening on " + addr);
                break;
            } catch (IOException e) {
                if (randomPort) {
                    continue;
                }
                // 线程休息指定时间后重试
                LockSupport.parkUntil(System.currentTimeMillis() + time);
            }
        }
    }
}

再来看看自定义的 IoHandler,上文中有提到过:

public class ServerHandler implements IoHandler {

    private final ProtocolDeserializer deserializer;


    // 间隔一段时间调用一次,与之前创建 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, AppInfo.getInt(SOA_SESSION_IDLE, 600));关联
    public void sessionIdle(IoSession session, IdleStatus status) {
        long time = System.currentTimeMillis() - session.getLastIoTime();
        if (time > AppInfo.getLong(MinaServer.SOA_SESSION_IDLE, 60) * 1000) {
            session.closeOnFlush();
        }
    }

    // 请求错误时调用
    public void exceptionCaught(IoSession session, Throwable cause) {
        session.closeOnFlush();
        if (cause == null) {
            return;
        }
        if (cause.getClass() == IOException.class && AppInfo.getBoolean("rpc.session.print_simple_error", true)) {
            String msg = cause.getMessage();
            if (msg != null && (msg.contains("连接")  msg.contains("connection"))) {
                return;
            }
        }

    }

    // 当连接建立后,客户端发送消息这边的监听者可以马上收到
    public void messageReceived(IoSession session, Object message) throws Exception {
        Response resp = new Response();
        Object obj = this.deserializer.deserialize(message);
        if (Request.class.isInstance(obj)) {
            Request req = (Request) obj;
            ActionContext.rpcContext(req, req.isTest());
            for (RequestHandler h : handlers) {
                if (h.handle(req, resp)) {
                    resp.serviceInvokeMilTime(System.currentTimeMillis() - req.getStartInServer());
                    session.write(resp);
                    return;
                }
            }
        }
    }

    // session 关闭方法
    public void inputClosed(IoSession session) throws Exception {
        session.closeNow();
    }
}

另外,在 acceptor 的自定义编解码工厂 SumkCodecFactory 有点复杂,还需要找时间研究一下。

Server 端 zk 的应用

在 SoaServer 实例初始化后,会向 zk 发送 Server 端的所有接口信息。我们知道 zk 可以作为一个轻量级的注册中心,结构和 linux 的文件系统类似,都是树形的。只不过 zk 的结构是存储在内存中的。我们可以专门设定一个节点目录用于存储所有 Server 暴露的接口。这样 Client 要调用前直接去 zk 上拉取就行。而且 Server 端和 Client 端都可以对这些接口信息节点做监听,将最新的接口信息拉到自己服务本地。 Sumk 的做法是在 zk 上注册一个 sumk_soa 节点,sumk_soa 节点下的子节点存储的是一组 Server Ip 列表。由 ip + 端口号组成,每个子节点存储对应 Server 下的所有接口信息。接下来进入代码部分,去掉之前关于 Mina 初始化部分的代码后:

public class SoaServer implements Lifecycle {

    private volatile boolean started = false;
    private MinaServer server;
    private String zkUrl;
    private Host host;
    private boolean enable;
    // Sumk 在 zk 上的根节点
    private final String SOA_ROOT = AppInfo.get("sumk.rpc.server.route", "sumk.rpc.server.zk.route",
            ZKConst.SUMK_SOA_ROOT);

    // 节点信息变更监听,也就是每个 Ip 子节点中的接口信息变更时触发
    private final IZkStateListener stateListener = new IZkStateListener() {
        @Override
        public void handleStateChanged(KeeperState state) throws Exception {}

        @Override
        public void handleNewSession() throws Exception {
            byte[] data = createZkPathData();
            ZkClientHelper.getZkClient(zkUrl).createEphemeral(fullPath(), data);
        }

        @Override
        public void handleSessionEstablishmentError(Throwable error) throws Exception {}

    };

    // 获取 zk 上某个 Server 的完整路径
    private String fullPath() {
        StringBuilder sb = new StringBuilder().append(SOA_ROOT).append('/')
                .append(ZkDataOperators.inst().getName(host));
        return sb.toString();
    }

    // 去掉某个节点,并注销对该节点的变更监听
    private final Runnable zkUnRegister = () -> {
        ZkClient client = ZkClientHelper.getZkClient(zkUrl);
        client.unsubscribeStateChanges(stateListener);
        client.delete(fullPath());
    };

    // 创建某个节点,并创建对该节点的变更监听
    private final Runnable zkRegister = () -> {
        ZkClient client = ZkClientHelper.getZkClient(zkUrl);
        byte[] data = null;
        try {
            data = createZkPathData();
        } catch (Exception e) {
            return;
        }
        zkUnRegister.run();
        // 接口信息设置
        client.createEphemeral(fullPath(), data);
        // 注册监听
        client.subscribeStateChanges(stateListener);
    };

    // 对之前封装的 RpcActions 获取所有已发布(默认发布)接口列表,然后进行序列化
    private byte[] createZkPathData() throws Exception {
        List<String> methods = RpcActions.publishSoaSet();
        final Map<String, String> map = new HashMap<>();
        for (String method : methods) {
            map.put(ZKConst.METHODS + "." + method, AppInfo.get("sumk.rpc.method." + method));
        }
        map.put(ZKConst.FEATURE, Profile.featureInHex());
        map.put(ZKConst.START, String.valueOf(System.currentTimeMillis()));
        map.put(ZKConst.WEIGHT, AppInfo.get("sumk.rpc.weight", "100"));
        // ZkDataOperators 提供 zk 上数据的序列化的反序列化
        return ZkDataOperators.inst().serialize(host, map);
    }

    @Override
    public synchronized void start() {
        if (this.enable) {
            this.zkRegister.run();
        } else {
            this.zkUnRegister.run();
        }
        // 监听 AppInfo 配置,并进行实时变更
        AppInfo.addObserver(info -> {
            if (!SoaServer.this.started) {
                return;
            }
            // 判断是否是否可用通过配置判断,可用就注册,不可用就注销
            if (serverEnable) {
                SoaServer.this.zkRegister.run();
            } else {
                SoaServer.this.zkUnRegister.run();
            }
            enable = serverEnable;
        });
        started = true;
    }

    protected void init(int port) {
        // 获取 zk 地址
        String ip_zk = StartContext.soaHostInzk();
        int port_zk = StartContext.soaPortInZk();
        this.host = Host.create(ip_zk, port_zk);
        zkUrl = AppInfo.getServerZKUrl();
        // 获取并初始化 zk 客户端
        ZkClient client = ZkClientHelper.getZkClient(zkUrl);
        // 递归创建节点目录
        ZkClientHelper.makeSure(client, SOA_ROOT);
    }
}

Client 端的 zk 的应用

Client 调用一般的写法是这样的:

Object result = Rpc.call("appId.groupId.interfaceName", args...);

我们以 Rpc.call 为出发点来看看内部代码,首先是初始化部分:

public final class Rpc {

    private static volatile boolean strated;

    private static String appId;

    public static synchronized void init() {
        appId = AppInfo.appId("sumk");
        DEFAULT_TIMEOUT = AppInfo.getInt("sumk.rpc.timeout", 30000);
        String zkUrl = AppInfo.getClinetZKUrl();
        // 获取 zk 上的接口信息,并进行解析
        ZkRouteParser.get(zkUrl).readRouteAndListen();
        // Mina 发送端初始化
        ReqSession.init();
        strated = true;
    }

可以看到 zk 相关的 ZkRouteParser 类,ReqSession 等等再介绍。 ZkRouteParser 类主要做的功能是对 zk 关于接口的根目录节点(和 Server 端一样)做监听,根目录下的的所有 ip+端口节点(所有接口的提供方地址)做变更监听,然后对某个子节点下的接口信息做内容变更监听。当发现有变化时,以事件的方式发送到一个处理 zk 信息变更的队列,然后再由某个线程将事件一一取出并做相应的变更。

public final class ZkRouteParser {
    private final String zkUrl;
    private Set<String> childs = Collections.emptySet();
    // 和发送端一样的根节点目录
    private final String SOA_ROOT = AppInfo.get("sumk.rpc.zk.route", "sumk.rpc.client.zk.route", ZKConst.SUMK_SOA_ROOT);
    private final ThreadPoolExecutor executor;
    // 用于处理 zk 信息变更事件的队列
    private final BlockingQueue<RouteEvent> queue = new LinkedBlockingQueue<>();

    private ZkRouteParser(String zkUrl) {
        this.zkUrl = zkUrl;
        // 用于处理 zk 信息变更事件的线程池
        executor = new ThreadPoolExecutor(1, 1, 5000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10000),
                SumkThreadPool.createThreadFactory("rpc-client-"), new ThreadPoolExecutor.DiscardPolicy());
        executor.allowCoreThreadTimeOut(true);
    }

    public static ZkRouteParser get(String zkUrl) {
        return new ZkRouteParser(zkUrl);
    }

    public void readRouteAndListen() throws IOException {
        Map<Host, RouteInfo> datas = new HashMap<>();
        ZkClient zk = ZkClientHelper.getZkClient(zkUrl);
        // 递归创建根节点目录
        ZkClientHelper.makeSure(zk, SOA_ROOT);

        // 监听根节点下子节点(ip + 端口 地址信息)中的接口信息变更
        final IZkDataListener nodeListener = new IZkDataListener() {
            ZkRouteParser parser = ZkRouteParser.this;

            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
                int index = dataPath.lastIndexOf("/");
                if (index > 0) {
                    dataPath = dataPath.substring(index + 1);
                }
                // 读取 zk 上的接口信息并进行反序列化
                RouteInfo d = ZkDataOperators.inst().deserialize(new ZKPathData(dataPath, (byte[]) data));
                if (d == null  d.intfs().isEmpty()) {
                    // 将变更封装为删除事件放入待处理队列
                    parser.handle(RouteEvent.deleteEvent(Host.create(dataPath)));
                    return;
                }
                // 将变更封装为修改事件放入待处理队列
                parser.handle(RouteEvent.modifyEvent(d));
            }

            @Override
            public void handleDataDeleted(String dataPath) {}

        };

        // 为根节点下的所有子节点注册监听,当新增或删除一个接口服务提供者时回调
        List<String> paths = zk.subscribeChildChanges(SOA_ROOT, new IZkChildListener() {
            ZkRouteParser parser = ZkRouteParser.this;

            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                List<String> ips = filter(currentChilds);

                List<String> createChilds = new ArrayList<>();
                Set<String> deleteChilds = new HashSet<>(parser.childs);
                for (String zkChild : ips) {
                    boolean exist = deleteChilds.remove(zkChild);
                    // 如果之前存在则不需要重复注册
                    if (!exist) {
                        createChilds.add(zkChild);
                    }
                }
                ZkClient zkClient = ZkClientHelper.getZkClient(zkUrl);
                // 存储所有的提供者地址信息
                parser.childs = new HashSet<>(ips);
                for (String create : createChilds) {
                    // 读取子节点下的接口信息
                    RouteInfo d = parser.getZkNodeData(zkClient, create);
                    // 将变更封装为创建事件放入待处理队列
                    parser.handle(RouteEvent.createEvent(d));
                    // 注册某个服务提供者对应 zk 节点的信息变更监听
                    zk.subscribeDataChanges(parentPath + "/" + create, nodeListener);
                }
                for (String delete : deleteChilds) {
                    // 将变更封装为删除事件放入待处理队列
                    parser.handle(RouteEvent.deleteEvent(Host.create(delete)));
                    // 注销某个服务提供者对应 zk 节点的信息变更监听
                    zk.unsubscribeDataChanges(parentPath + "/" + delete, nodeListener);
                }
            }

        });
        this.childs = new HashSet<>(paths);
        for (String path : paths) {
            // 获取 zk 上对应路径下节点的信息
            RouteInfo d = getZkNodeData(zk, path);
            if (d == null) {
                continue;
            }
            // 注册当前子节点接口信息变更监听
            zk.subscribeDataChanges(SOA_ROOT + "/" + path, nodeListener);
            datas.put(d.host(), d);
        }
        Routes.refresh(datas.values());
    }

    // 将所有关于 zk 信息变更的事件放入队列
    public void handle(RouteEvent event) {
        queue.offer(event);
        this.executor.execute(() -> {
            synchronized (ZkRouteParser.this) {
                List<RouteEvent> list = new ArrayList<>();
                // 拉取所有待处理的事件
                queue.drainTo(list);
                // 获取当前的 zk 路由信息数据,每个服务提供方对应一个 RouteInfo,在调用时使用
                List<RouteInfo> data = Routes.currentDatas();
                Map<Host, RouteInfo> map = new HashMap<>();
                for (RouteInfo r : data) {
                    map.put(r.host(), r);
                }
                if (handleData(map, list) > 0) {
                    // 刷新目前服务本地存储的接口信息
                    Routes.refresh(map.values());
                }
            }
        });
    }

    // data 参数是目前服务中存储的所有接口数据,list 是所有待处理的事件
    private int handleData(Map<Host, RouteInfo> data, List<RouteEvent> list) {
        int count = 0;
        for (RouteEvent event : list) {
            switch (event.getType()) {
            case CREATE:
            case MODIFY:
                // 新增 zk 提供方和对应的接口
                data.put(event.getUrl(), event.getRoute());
                count++;
                break;
            case DELETE:
                // 移除 zk 提供方
                if (data.remove(event.getUrl()) != null) {
                    count++;
                }
                break;
            default:
                break;
            }
        }
        return count;
    }
}

从上面的代码可以看出,真正存储 zk 数据的是 Routes 类,接下来看看这个类。

public final class Routes {
    private final Map<String, RpcRoute> rpcRoutes;
    private final List<RouteInfo> zkDatas;

    private Routes(List<RouteInfo> zkDatas, Map<String, RpcRoute> routes) {
        this.zkDatas = Objects.requireNonNull(zkDatas);
        this.rpcRoutes = Objects.requireNonNull(routes);
    }

    // 初始化,为了其他线程能立即看到初始化线程的对 Routes 的初始化,所以声明为 volatile
    private static volatile Routes ROUTE = new Routes(Collections.emptyList(), Collections.emptyMap());

    public static List<RouteInfo> currentDatas() {
        return Collections.unmodifiableList(ROUTE.zkDatas);
    }

    public static RpcRoute getRoute(String api) {
        return ROUTE.rpcRoutes.get(api);
    }

    private static void _refresh(Collection<RouteInfo> rawData, Map<String, RpcRoute> route) {
        // rawData 为所用的提供方信息,每个 RouteInfo 包含该服务在 zk 上存储的所有接口信息
        // route 的 key 为接口完整路由,value 为提供该接口的所有服务地址所封装成的实体,之所有封装成 RpcRoute 是因为按照该接口在不同的服务提供方的权重可能不同、是否宕机等等因素,调用时可以依据这些特点进行调用
        List<RouteInfo> data = new ArrayList<>(rawData);
        Routes r = new Routes(data, route);
        // 将所有信息封装成 Routes,供 rpc 提供方调用接口时使用
        Routes.ROUTE = r;
    }

    public static synchronized void refresh(Collection<RouteInfo> datas) {
        // 存储某个接口所有提供方的信息,key 为接口完整路由,value 为所有提供该接口的服务地址集合
        Map<String, Set<ServerMachine>> map = new HashMap<>();
        for (RouteInfo r : datas) {
            Map<String, ServerMachine> ms = createServerMachine(r);
            ms.forEach((m, serverMachine) -> {
                Set<ServerMachine> server = map.get(m);
                if (server == null) {
                    server = new HashSet<>();
                    map.put(m, server);
                }
                server.add(serverMachine);
            });
        }
        // 将接口的所有提供方机器信息封装成 RpcRoute,并创建新的集合
        Map<String, RpcRoute> routes = new HashMap<>();
        map.forEach((method, servers) -> {
            if (servers == null  servers.isEmpty()) {
                return;
            }
            routes.put(method, new RpcRoute(servers));
        });
        _refresh(datas, routes);
    }

    // 传入从某个 zk 提供者拉下来的数据,返回接口信息集合,key 为接口的完整路由 (appId+groupId+接口名称),value 为提供该接口的服务的地址信息
    private static Map<String, ServerMachine> createServerMachine(RouteInfo data) {
        Map<String, ServerMachine> servers = new HashMap<>();
        int weight = data.weight() > 0 ? data.weight() : 100;
        data.intfs().forEach(intf -> {
            ServerMachine server = new ServerMachine(data.host(), weight);
            servers.put(intf.getName(), server);
        });
        return servers;
    }
}

Client 端的 Mina 部分

通过拉取所有 zk 上的所有接口信息进行本地存储后,现在就有接口可以调用了,接下来需要初始化 Mina 的客户端部分,还是 Rpc 这个类:

public final class Rpc {

    public static synchronized void init() {
        // Mina 发送端初始化
        ReqSession.init();
        strated = true;
    }
}

接下来来看一下 ReqSession 的初始化逻辑和 Mina session 部分,ReqSession 主要是对 Mina 中 session 的封装,通过上文我们知道,每创建好一个连接后就创建一个 session,session 随机连接的创建而创建,随着连接的关闭而销毁:

public final class ReqSession {

    protected volatile IoSession session;

    // SocketConnector 提供者,SocketConnectorSupplier 里是 SocketConnector 的初始化部分
    private static Supplier<SocketConnector> connectorSupplier = new SocketConnectorSupplier();
    private Host addr;
    // 加把锁,当多多个线程调用同一个 session 进程连接时,session 不至于被赋值多次
    private final Lock lock = new ReentrantLock();

    private boolean ensureSession() {
        if (session != null && !session.isClosing()) {
            return true;
        }
        // 获取 SocketConnector 进行连接建立
        SocketConnector connector = this.getConnector();
        // 加把锁,当多多个线程调用同一个 session 进程连接时,session 不至于被赋值多次
        // 与 jdk 内置锁不同,当指定时间还未获取到锁时,直接放弃
        if (lock.tryLock(connector.getConnectTimeoutMillis() + 2000, TimeUnit.MILLISECONDS)) {
            try {
                if (session != null && !session.isClosing()) {
                    return true;
                }
                connect(connector);
            } finally {
                lock.unlock();
            }
        }

        return true;
    }

    // 连接建立部分
    private void connect(SocketConnector connector) throws InterruptedException {
        // 通过上文我们知道,由于 IoService 也就是 SocketConnector,对数据进行了同步转异步处理
        ConnectFuture cf = connector.connect(addr.toInetSocketAddress());
        // 所有我们无法得知连接创建完成时间,和 jdk 内置 Future 类似,通过 await 我们可以异步转同步
        cf.await(connector.getConnectTimeoutMillis() + 1);

        // 连接成功后获取会话对象。如果没有上面的等待,由于 connect() 方法是异步的,session 可能会无法获取
        IoSession se = cf.getSession();
        if (se != null) {
            this.session = se;
            return;
        }
        cf.cancel();
    }

    // 数据发送
    public WriteFuture write(Req req) {
        return this.session.write(req);
    }

    // 连接关闭
    public void close() {
        IoSession s = this.session;
        if (s != null && s.isConnected()) {
            this.session.closeNow();
        }
    }

    // SocketConnector 初始化
    public static void init() {
        connectorSupplier.get();
    }
}

通过上面代码,可以看出 SocketConnectorSupplier 负责 SocketConnector 初始化和获取:

public class SocketConnectorSupplier implements Supplier<SocketConnector> {

    private volatile SocketConnector connector;

    @Override
    public SocketConnector get() {
        SocketConnector con = this.connector;
        if (con != null && !con.isDisposing() && !con.isDisposed()) {
            return con;
        }
        return this.create();
    }

    private synchronized SocketConnector create() {
        if (connector != null && !connector.isDisposing() && !connector.isDisposed()) {
            return connector;
        }
        // 参数为 IoProcessor 线程池的大小,默认为 cpu 核数 + 1,可配置
        NioSocketConnector con = new NioSocketConnector(
                AppInfo.getInt("sumk.rpc.client.poolsize", Runtime.getRuntime().availableProcessors() + 1));
        con.setConnectTimeoutMillis(AppInfo.getInt("sumk.rpc.connect.timeout", 5000));
        // 设置固定空闲时间回调,类似上文中 Mina 的服务端初始化
        con.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, AppInfo.getInt(MinaServer.SOA_SESSION_IDLE, 600));
        // 设置客户端处理 IoHandler
        con.setHandler(new ClientHandler());
        // 设置编解码工厂和 IoAcceptor 端类型
        con.getFilterChain().addLast("codec", new ProtocolCodecFilter(IOC.get(SumkCodecFactory.class)));
        if (AppInfo.getBoolean("sumk.rpc.client.threadpool.enable", true)) {
            // 设置额外线程池,当前服务的所有客户端连接都使用该线程池
            con.getFilterChain().addLast("threadpool", new ExecutorFilter(SoaExcutors.getClientThreadPool()));
        }
        this.connector = con;
        return con;
    }
}

接下来 Mina 客户端剩下最后一部分,就是 IoHandler 的实现。

public class ClientHandler implements IoHandler {

    private final ProtocolDeserializer deserializer;

    public ClientHandler() {
        // 获取自定义的协议序列处理
        deserializer = IOC.get(ProtocolDeserializer.class);
    }

    // 当 session 空闲指定时间后调用该方法,如果发现超过允许的空闲时间则关闭 session 并将 session 未发送的数据 flush 出去
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        long time = System.currentTimeMillis() - session.getLastIoTime();
        if (time > AppInfo.getLong(MinaServer.SOA_SESSION_IDLE, 60) * 1000) {
            Log.get("sumk.rpc.client").info("rpc session {} {} for {}ms,closed by this client", session.getId(), status,
                    session.getLastIoTime(), time);
            session.closeOnFlush();
        }
    }

    // 调用异常时记录日志
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        Log.get("sumk.rpc.client").error(session + " throw exception", cause);
    }

    // 服务端响应数据时该方法被触发,并通过指定解析类对数据进行解析
    public void messageReceived(IoSession session, Object message) throws Exception {
        // 这个和 IoAcceptor、IoConnector 初始化时定义的解析工具不同,IoAcceptor、IoConnector 定义的解析工具都在 Mina 调用链末尾,也就是 write 的最后一次处理,和 read 的第一次处理
        // 这里的解析时在 IoAcceptor、IoConnector 初始化定义的解析工具解析完之后对数据的类型进行指定格式的解析
        Object obj = this.deserializer.deserialize(message);
        if (obj == null) {
            return;
        }
        if (Response.class.isInstance(obj)) {
            Response resp = (Response) obj;
            // 这里有一个连接阻塞唤醒操作,在下文讲 RpcFuture 的时候会说明
            LockHolder.unLockAndSetResult(resp);
            return;
        }
    }
}
@Bean
public class ProtocolDeserializerImpl implements ProtocolDeserializer {

    @Inject
    private SumkMinaDeserializer<?>[] decoders;

    @Override
    public Object deserialize(Object message) throws Exception {
        ProtocolObject obj = ProtocolObject.class.cast(message);
        int protocol = obj.getProtocol();
        for (SumkMinaDeserializer<?> decoder : this.decoders) {
            // 解析处理需要根据相应的接口类型进行相应的解析处理,这里同样有个扩展点,
            // 可以新增请求的类型,而不需要修改代码,这里的类型,和服务端的表单、json 类型常量相对应
            if (decoder.accept(protocol)) {
                return decoder.decode(protocol, obj.getData());
            }
        }
        throw new ProtocolDecoderException("no sumk decoder:" + Integer.toHexString(protocol));
    }
}

Client 端的调用

Client 端初始化部分大致是上文这些,接下去是 Client 的调用部分:

public final class Rpc {

    // 每次 Client 执行 execute 时调用,记录每次请求的所有请求信息,然后进行调用透传
    static Req req(String method) {
        Req req = new Req();
        ActionContext context = ActionContext.get();
        req.setStart(System.currentTimeMillis());
        String sn = UUIDSeed.seq18();
        req.setFullSn(sn, context.traceId(), context.nextSpanId());
        req.setUserId(context.userId());
        req.setApi(method);
        req.setSrc(appId);

        req.setAttachments(context.attachmentView());
        return req;
    }

    // 表单方式调用,execute 方法返回 RpcFuture,通过 getOrException 对 future 进行阻塞,实现异步转同步
    public static String call(String method, Object... args) {
        return new Client(method).paramInArray(args).timeout(DEFAULT_TIMEOUT).execute().getOrException();
    }

    // json 方式调用,execute 方法返回 RpcFuture,通过 getOrException 对 future 进行阻塞,实现异步转同步
    public static String callInJson(String method, String json) {
        return new Client(method).paramInJson(json).timeout(DEFAULT_TIMEOUT).execute().getOrException();
    }

    // 表单方式调用,execute 方法返回 RpcFuture,异步调用
    public static RpcFuture callAsync(String method, Object... args) {
        return new Client(method).paramInArray(args).execute();
    }

    // json 方式调用,execute 方法返回 RpcFuture,异步调用
    public static RpcFuture callInJsonAsync(String method, String json) {
        return new Client(method).paramInJson(json).execute();
    }
}

Client 类主要有几个功能: 1. 可以设置请求接口响应后的回调 2. 对请求进行判断调用路径(本地调用或远程调用) 3. 通 execute 发送请求 4. 对请求未响应的线程进行阻塞 5. 如果请求迟迟未响应则作为错误处理,并唤醒阻塞的请求线程 6. 为 rpc 透传提供封装参数 接下看一下 Client 类:

public final class Client {

    private static enum ParamType {
        JSONARRAY, JSON
    }

    private final String api;

    private long totalStart;

    private Consumer<RpcResult> callback;

    Client(String api) {
        this.api = api;
    }

    // 设置超时时间
    public Client timeout(int timeout) {
        this.totalTimeout = timeout;
        return this;
    }

    // 设置请求完成后的回调事件
    public Client callback(Consumer<RpcResult> callback) {
        this.callback = callback;
        return this;
    }

    /**
     * 本方法调用之后,不允许再调用本对象的任何方法 <BR>
     * 
     * @return 用无论是否成功,都会返回 future。如果失败的话,异常包含在 future 中。<BR>
     *         通信异常是 SoaException;如果是业务类异常,则是 BizException
     */
    public RpcFuture execute() {
        // 获取当前时间,后面对轮询超时的请求,并做相应的错误处理
        this.totalStart = System.currentTimeMillis();
        // 封装 rpc 透传参数
        Req req = Rpc.req(this.api);
        // 根据调用参数类型选择合适的参数类型
        if (this.paramType == ParamType.JSONARRAY) {
            req.setParamArray((String[]) this.params);
        } else {
            req.setJsonedParam((String) this.params);
        }
        // 发起异步调用,并传递调用结束的截止时间
        RpcFuture f = sendAsync(req, this.totalStart + this.totalTimeout);
        // 当发送错误或者调用超时时
        if (f.getClass() == ErrorRpcFuture.class) {
            ErrorRpcFuture errorFuture = ErrorRpcFuture.class.cast(f);
            RpcLocker locker = errorFuture.locker;
            // 和 jdk 线程阻塞类似,管程模型,都需要锁,移除用于该请求阻塞的锁
            LockHolder.remove(locker.req.getSn());
            // 将阻塞线程唤醒
            locker.wakeup(errorFuture.rpcResult());
        }
        return f;
    }

    private RpcFuture sendAsync(Req req, long endTime) {
        // 封装 RpcLocker,RpcLocker 类下文解析,将回调事件和透传参数传入
        final RpcLocker locker = new RpcLocker(req, callback);
        Host url = null;
        // 当地址为空时(没有定义直接路由时),尝试进行本地调用
        if (url == null) {
            // 根据接口路由获取之前封装的 Routes,RpcRoute 包含所有提供该接口服务的地址列表
            RpcRoute route = Routes.getRoute(api);
            // 进行本地调用
            RpcFuture future = this.tryLocalHandler(req, locker, route);
            if (future != null) {
                return future;
            }

            url = route.getUrl();
        }
        locker.url(url);
        WriteFuture f = null;
        // 通过请求 url 获取 session
        ReqSession session = ReqSessionHolder.getSession(url);
        // 注册锁信息,LockHolder 可用于超时请求的监听处理,下文解析
        LockHolder.register(locker, endTime);
        f = session.write(req);
        if (f == null) {
            // 如果 session 返回接口为空则封装错误 RpcFuture
            return new ErrorRpcFuture(ex, locker);
        }
        // 添加请求对应的 RpcLocker 监听,当有响应时立即回调
        f.addListener(locker);
        // 封装接口 future 返回
        return new RpcFutureImpl(locker);
    }

    private RpcFuture tryLocalHandler(Req req, RpcLocker locker, RpcRoute route) {
        // 本地调用就不需要 zk 上的接口信息了,所以 route 没有使用
        // 获取之前基于 asm 代理类再封装的类,每个 api 接口对应一个 RpcActionNode
        RpcActionNode node = RpcActions.getActionNode(api);
        // 当获取不到 api 对应 RpcActionNode 时,说明该接口的初始化不在这个服务内,这时只能进行远程调用
        if (node == null) {
            return null;
        }

        JsonElement json = RpcGson.getGson().toJsonTree(req);
        Request request = RpcGson.getGson().fromJson(json, Request.class);

        request.setJsonedParam(req.getJsonedParam());
        request.setParamArray(req.getParamArray());

        // 以下是透传功能,不区分远程调用还是本地调用
        // ActionContext 容器由 ThreadLocal 存储,信息存储在每一个请求线程中,
        // 当当前请求线程需要再次发起新请求时,为了保存上一次请求的参数信息,所以先进行 ActionContext 克隆
        ActionContext context = ActionContext.get().clone();
        try {
            // 基于当前请求创建新的请求透传参数信息
            ActionContext.rpcContext(request, context.isTest());
            locker.url(LOCAL);
            // 本地调用,上文有解释过
            Response resp = LocalRequestHandler.inst.handler(request, node);
            // 恢复上一次请求参数信息
            ActionContext.recover(context);
            // 唤醒之前堵塞线程
            locker.wakeup(new RpcResult(resp.json(), resp.exception(), request.getSn()));
        } finally {
            // 恢复上一次请求参数信息
            ActionContext.recover(context);
        }
        // 封装接口 future 返回
        return new RpcFutureImpl(locker);
    }
}

接下来先来看看 RpcLocker 类,该类提供了类似锁的功能,可以阻塞当前请求线程(类似管程模型,在锁内调用阻塞后将线程移入阻塞队列) 在 Client 类中我们提到,可以对 Mina 中 session 返回的 future 添加监听f.addListener(locker);,其中IoFuture addListener(IoFutureListener<?> listener)方法用于添加一个监听器,在异步执行的结果返回时监听器中的回调方法 operationComplete(IoFuture future)。 而 RpcLocker 类就继承自 IoFutureListener,实现了回调方法 operationComplete。

public final class RpcLocker implements IoFutureListener<WriteFuture> {

    // 使用原子类更新响应结果
    private final AtomicReference<RpcResult> result = new AtomicReference<>();

    final Req req;
    private Host url;
    final Consumer<RpcResult> callback;

    // 使用原子类更新等待线程
    private final AtomicReference<Thread> awaitThread = new AtomicReference<>();

    RpcLocker(Req req, Consumer<RpcResult> callback) {
        this.req = req;
        this.callback = callback;
    }

    public Host url() {
        return url;
    }

    // 判断是否阻塞,如果已经有结果了就不阻塞
    public boolean isWaked() {
        return this.result.get() != null;
    }

    // 使用 rpc 返回结果唤醒阻塞线程
    public void wakeup(RpcResult result) {
        long receiveTime = System.currentTimeMillis();
        if (this.isWaked()) {
            return;
        }
        // 更新该请求的响应结果
        if (!this.result.compareAndSet(null, result)) {
            return;
        }
        // 更新等待信息
        Thread thread = awaitThread.getAndSet(null);
        if (thread != null) {
            // 将阻塞线程唤醒
            LockSupport.unpark(thread);
        }
        // 如果有设置回调,响应接口设置后进行回调
        if (this.callback != null) {
            callback.accept(result);
        }
    }

    //异步执行的结果返回时监听器的回调方法
    @Override
    public void operationComplete(final WriteFuture future) {
        if (future.getException() == null) {
            return;
        }
        SoaExcutors.getClientThreadPool().execute(() -> {
            // 移除请求对应锁对象,如果已经被移除过,说明该请求已经超出最大等待时间并进行了错误的处理,直接返回放弃对该结果的处理
            if (LockHolder.remove(req.getSn()) == null) {
                return;
            }
            if (url != null) {
                // 如果 url 不为空,将服务提供方加入 down 机集合内
                HostChecker.get().addDownUrl(url);
            }
            // 封装响应接口或异常信息然后进行唤醒处理
            wakeup(RpcResult.sendFailed(req, future.getException()));
        });
    }

    // Rpc 类获取 RpcFuture 并调用 execute 方法后,间接调用该方法
    public RpcResult awaitForResponse() {
        // 获取当前请求结果
        RpcResult rpcResult = this.result.get();
        Thread currentThread = Thread.currentThread();
        // 如果结果为空,则阻塞该调用线程,
        // while 为另一个编程范式,为保证线程不会莫名其妙被唤醒,从而导致程序逻辑出错,所以需要循环判断,等待真正的唤醒
        while (result.get() == null) {
            LockSupport.parkUntil(System.currentTimeMillis() + 10000);
        }

        // 获取相应结果
        rpcResult = this.result.get();
        // 如果结果为空,说明该请求已经达到了最大允许时间,已经进行了错误处理,并由 ErrorRpcFuture 进行的唤醒。
        if (rpcResult == null) {
            // 封装超时的 RpcResult
            rpcResult = RpcResult.timeout(req);
        }
        return rpcResult;
    }
}

接下来看看正常响应的 future,错误的响应类似

public class RpcFutureImpl extends AbstractRpcFuture {
    private RpcLocker locker;

    private RpcResult result;

    // Client 间接调用类,触发调用线程阻塞等待
    public RpcResult awaitForRpcResult() {
        if (this.result != null) {
            return this.result;
        }
        this.result = locker.awaitForResponse();
        return this.result;
    }

    public RpcResult rpcResult() {
        return this.result;
    }
}

上文提到如果每个请求都对应一把锁 RpcLocker,而所有锁都缓存在持有锁的 LockHolder 中。LockHolder 中有一个存放请求的 DelayQueue 队列,与每个请求的 RpcLocker 关联。LockHolder 引用一个 Sumk 统一线程池,通过统一线程池的线程调度 LockHolder 关于请求超时处理的线程任务。

public final class LockHolder {
    private static final ConcurrentHashMap<String, RpcLocker> locks = new ConcurrentHashMap<>();

    // 由 sumk 统一线程池中的线程调度
    static final LockTimeoutMonitor monitor = new LockTimeoutMonitor();
    static {
        // 将任务丢入统一线程池中设置延迟执行时间和执行间隔时间,默认核心线程数为 2
        SumkThreadPool.scheduledExecutor().scheduleWithFixedDelay(monitor, 1000, 500, TimeUnit.MILLISECONDS);
    }

    static void register(RpcLocker r, long endTime) {
        Req req = r.req;
        monitor.add(req.getSn(), endTime);
    }

    // 某个请求获得相应后移除缓存中对应的锁,并唤醒相应的阻塞线程
    static void unLockAndSetResult(Response resp) {
        RpcLocker r = locks.remove(resp.sn());
        RpcResult result = new RpcResult(resp.json(), resp.exception());
        r.wakeup(result);
    }

    private static class LockTimeoutMonitor implements Runnable {
        // 存储待处理的请求任务队列
        private static final DelayQueue<DelayedObject> QUEUE = new DelayQueue<>();

        @Override
        public void run() {
            DelayedObject delay;
            // 从队列中获取延迟对象,当队列中有元素可以取出时,说明该元素对应的请求已经超过了最大允许处理时间,进行超时处理
            while ((delay = QUEUE.poll()) != null) {
                // 获取缓存的锁对象
                RpcLocker locker = LockHolder.remove(delay.sn);
                // 如果锁对象不为空,说明请求还未相应
                if (locker != null) {
                    // 进行超时的错误处理,并唤醒阻塞线程
                    locker.wakeup(RpcResult.timeout(locker.req));
                }
            }
        }

        void add(String reqId, long endTime) {
            QUEUE.add(new DelayedObject(reqId, endTime));
        }
    }

    private static class DelayedObject implements Delayed {

        private long endTime;
        final String sn;

        public DelayedObject(String sn, long endTime) {
            this.endTime = endTime;
            this.sn = sn;
        }

        // 转换剩余时间
        public long getDelay(TimeUnit unit) {
            return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        // 为队列中的元素排序,剩余时间少的先优先
        public int compareTo(Delayed other) {
            long d = other instanceof DelayedObject ? this.endTime - DelayedObject.class.cast(other).endTime
                    : this.getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS);
            return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
        }
    }
}

以上就是 Sumk 中 rpc 模块的大致部分。主要的模块有:Client、Server、Zookeeper、Mina 等。

  • Client 端和 Server 端基于 Mina 编写,通过 Mina 实现 Server 端请求监听和 Client 端响应监听。并自定义编解码规则,协议序列化规则。
  • 通过 zk 实现调用双方的接口信息传输,并对接口信息做实时监听。支持 zk 和相同接口的服务调用方集群。Client 端初始化动作就行将所有暴露该接口的 Server 封装到 Routes 中,供 Client 端使用。
  • 通过 map 和 threadlocal 组合 + clone 实现接口透传。
  • 自定义 RpcFture 实现调用线程的阻塞唤醒。
  • 通过延迟队列对超时请求做错误处理。
  • 通过 ASM 做 Server 端接口初始化。通过 ASM 代理类做接口反射调用(json、表单等方式),参数校验,限流校验等操作。
  • 使用了访问者、责任链、单例、代理等设计模式,预留的扩展点多。
  • Client 对 Server 方进行实时监听。

参考


 上一篇
Mybatis与tk插件 Mybatis与tk插件
项目整合 这周项目的存储层进行了一些改造,支持读写分离、整合 Sumk 实现数据库准实时缓存、orm 的操作更加灵活(某些 Mybatis 无法支持人大金仓的查询 Sumk 的 DB 可以直接支持)等。在部门的架构师改造后进行了一次技术分
2020-03-05
下一篇 
Sumk之分布式ID生成器 Sumk之分布式ID生成器
ID 生成器应用 公司的项目正慢慢的朝着分布式方向的改造。其中经常需要使用的小功能,就是 ID 生成。之前的做法比较简单,通过 UUID 来实现,在改造后,部分子模块开始使用长整型来代替 UUID 字符串。 为什么改成长整型能提升性能呢?
2020-02-22
  目录