注册中心提供功能有服务注册、服务发现、服务订阅、服务通知等。在 2.7.8 版本中注册中心还存储了元数据,用于减轻服务变更时下发到订阅者的压力。

1. RegistryFactory

注册中心的工厂接口,根据 URL 的参数动态选择对应工厂(url.protocol),默认是 Dubbo。

@SPI("dubbo")
public interface RegistryFactory {

    @Adaptive({"protocol"})
    Registry getRegistry(URL url);
}

1.1 AbstractRegistryFactory

实现了 RegistryFactory 接口,进行 Registry 对象的管理。

比较重要的几个方法:

1.1.1 createRegistry

protected abstract Registry createRegistry(URL url);

创建 Registry 由不同子类实现,如:ZookeeperRegistryFactory、NacosRegistryFactory

1.1.2 getRegistry

 @Override
public Registry getRegistry(URL url) {
    if (destroyed.get()) {
        LOGGER.warn("All registry instances have been destroyed, failed to fetch any instance. " +
                    "Usually, this means no need to try to do unnecessary redundant resource clearance, all registries has been taken care of.");
        return DEFAULT_NOP_REGISTRY;
    }

    url = URLBuilder.from(url)
        .setPath(RegistryService.class.getName())
        .addParameter(INTERFACE_KEY, RegistryService.class.getName())
        .removeParameters(EXPORT_KEY, REFER_KEY)
        .build();
    String key = createRegistryCacheKey(url);
    // Lock the registry access process to ensure a single instance of the registry
    LOCK.lock();
    try {
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        //create registry by spi/ioc
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}
  1. 生成注册对象 Registry 的 URL。
  2. 进行注册中心对象缓存,如果未创建则加 JVM 锁后创建。

1.1.3 destroyAll

通过 JVM 已经注册的钩子回调进行注册中心实例销毁

public static void destroyAll() {
    // 忽略,若已经销毁
    if (!destroyed.compareAndSet(false, true)) {
        return;
    }

    if (LOGGER.isInfoEnabled()) {
        LOGGER.info("Close all registries " + getRegistries());
    }
    // 等到服务消费,接收到注册中心通知到该服务提供者已经下线,加大了在不重试情况下优雅停机的成功率。
    // Lock up the registry shutdown process
    LOCK.lock();
    try {
        for (Registry registry : getRegistries()) {
            try {
                registry.destroy();
            } catch (Throwable e) {
                LOGGER.error(e.getMessage(), e);
            }
        }
        REGISTRIES.clear();
    } finally {
        // Release the lock
        LOCK.unlock();
    }
}

2 RegistryService

订阅了服务注册、服务取消注册、服务订阅、服务取消订阅和服务路由几个接口方法。

public interface RegistryService {

    void register(URL url);

    void unregister(URL url);

    void subscribe(URL url, NotifyListener listener);

    void unsubscribe(URL url, NotifyListener listener);

    List<URL> lookup(URL url);
}

2.1 Registry

Registry 继承了 Node, RegistryService 接口,定义了注册和取消注册重试接口,并拥有了 Node 接口的几个方法:isAvailable、getUrl、destroy

public interface Registry extends Node, RegistryService {
    default void reExportRegister(URL url) {
        register(url);
    }

    default void reExportUnregister(URL url) {
        unregister(url);
    }
}

2.2 AbstractRegistry

实现 Registry 接口,实现通用的注册、订阅、查询、通知方法,并将数据持久化冗余到文件来处理注册中心 down 后的高可用。

AbstractRegistry 相关属性和构造器:

/**
 *  本地磁盘缓存。记录注册中心列表和服务提供者列表
 */
 private final Properties properties = new Properties();
/**
 * 本地磁盘缓存执行器
*/
private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
/**
     * 是否同步保存文件
     */
private boolean syncSaveFile;
/**
     * 已注册 URL 集合 (服务提供者/服务消费者)。
     */
private final Set<URL> registered = new ConcurrentHashSet<>();
/**
     * 订阅 URL 的监听器集合
     */
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<>();

/**
* 被通知的 URL 集合
    key:消费者 URL
    value:
        key: 节点分类(providers、consumers、routes、configurators)
        value:分类下的 URL 集合
**/
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<>();

public AbstractRegistry(URL url) {
    setUrl(url);
    if (url.getParameter(REGISTRY__LOCAL_FILE_CACHE_ENABLED, true)) {
        // Start file save timer
        syncSaveFile = url.getParameter(REGISTRY_FILESAVE_SYNC_KEY, false);
        String defaultFilename = System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(APPLICATION_KEY) + "-" + url.getAddress().replaceAll(":", "-") + ".cache";
        String filename = url.getParameter(FILE_KEY, defaultFilename);
        File file = null;
        if (ConfigUtils.isNotEmpty(filename)) {
            file = new File(filename);
            if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
                if (!file.getParentFile().mkdirs()) {
                    throw new IllegalArgumentException("Invalid registry cache file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
                }
            }
        }
        this.file = file;
        // When starting the subscription center,
        // we need to read the local cache file for future Registry fault tolerance processing.
        // 加载本地磁盘缓存文件到内存缓存
        loadProperties();
        // 通知监听器,URL 变化结果
        notify(url.getBackupUrls());
    }
}

1. 构造函数中判断是否需要进行文件缓存,默认为 true。
2. 如果需要缓存则创建文件并将文件加载到缓存中。
3. 通知监听器

2.3 notify

在构造器中将 url 传入 notify 方法,如果当前有订阅的消费者并且 URL 和传入的 URL 匹配,则使用消费者订阅时注册的 Listener 进行后续处理。

protected void notify(List<URL> urls) {
    if (CollectionUtils.isEmpty(urls)) {
        return;
    }

    for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) {
        URL url = entry.getKey();

        if (!UrlUtils.isMatch(url, urls.get(0))) {
            continue;
        }

        Set<NotifyListener> listeners = entry.getValue();
        if (listeners != null) {
            for (NotifyListener listener : listeners) {
                try {
                    notify(url, listener, filterEmpty(url, urls));
                } catch (Throwable t) {
                    logger.error("Failed to notify registry event, urls: " + urls + ", cause: " + t.getMessage(), t);
                }
            }
        }
    }
}
  1. isMatch 方法主要用于比较两个 URL 是否相同,通过 group、version、classifier 三个参数进行比较。
  2. 匹配后进行重载方法处理

2.4 notify(URL url, NotifyListener listener, List urls)

  /**
     * Notify changes from the Provider side.
     * 数据流向 urls -> notified -> properties -> file
     *
     * @param url      consumer side url
     * @param listener listener
     * @param urls     provider latest urls
     */
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    if ((CollectionUtils.isEmpty(urls))
        && !ANY_VALUE.equals(url.getServiceInterface())) {
        logger.warn("Ignore empty notify urls for subscribe url " + url);
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
    }
    // keep every provider's category.
    // 将 `urls` 按照 `url.parameter.category` 分类,添加到集合
    Map<String, List<URL>> result = new HashMap<>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
            List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    // 获得消费者 URL 对应的在 `notified` 中,通知的 URL 变化结果(全量数据)
    Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
    // 处理通知的 URL 变化结果(全量数据)
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        // 覆盖到 `notified`
        // 当某个分类的数据为空时,会依然有 urls 。其中 `urls[0].protocol = empty` ,通过这样的方式,处理所有服务提供者为空的情况。
        categoryNotified.put(category, categoryList);
        // 通知监听器
        listener.notify(categoryList);
        // We will update our cache file after each notification.
        // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
        // 保存到文件
        saveProperties(url);
    }
}

1. 如果 provider 的最新 url list 为空,则直接返回。
2. 获取消费者 url 之前保存的最新 url list,按照 category 区分。
3. 如果某个分类数据为空,则放入 empty 的 protocol,用于处理服务提供者为空情况(下文的 toUrlsWithEmpty)。
4. 用最新的 urls 覆盖旧的 categoryNotified,保证 notified 记录的消费者对应的提供者列表都是最新的。
5. 通知监听器。
6. 缓存 consumer 的 url,防止注册中心由于网络原因 down 了。

2.5 RegistryDirectory.notify

接着 2.4,看一下收到最新的 url 后如何进行旧 Invoker 的 refresh 的。

public synchronized void notify(List<URL> urls) {
    // 根据 URL 的分类或协议,分组成三个集合
    Map<String, List<URL>> categoryUrls = urls.stream()
        .filter(Objects::nonNull)
        .filter(this::isValidCategory)
        .filter(this::isNotCompatibleFor26x)
        .collect(Collectors.groupingBy(this::judgeCategory));

    // 处理配置规则 URL 集合
    List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
    this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

    // 处理路由规则 URL 集合
    List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
    toRouters(routerURLs).ifPresent(this::addRouters);

    // providers
    // 合并配置规则,到 `directoryUrl` 中,形成 `overrideDirectoryUrl` 变量
    List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
    /**
         * 3.x added for extend URL address
         */
    ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
    List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
    if (supportedListeners != null && !supportedListeners.isEmpty()) {
        for (AddressListener addressListener : supportedListeners) {
            providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
        }
    }
    refreshOverrideAndInvoker(providerURLs);
}
  1. 将 URL 按分类分成三个集合(configurators、routers、providers)。
  2. 处理配置 URL 集合,配置覆盖到 this.configurators 中。
  3. 处理路由规则 URL 集合(后续文章说明 TagRouter 在灰度环境的使用)。
  4. 处理提供者 URL 集合,调用 refreshOverrideAndInvoker 重写配置和重新 consumer 中对应的 Invoker(provider)。

2.6 RegistryDirectory.refreshInvoker

// 处理服务提供者 URL 集合
private void refreshInvoker(List<URL> invokerUrls) {
    Assert.notNull(invokerUrls, "invokerUrls should not be null");

    if (invokerUrls.size() == 1
        && invokerUrls.get(0) != null
        && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        // 设置禁止访问
        this.forbidden = true; // Forbid to access
        // methodInvokerMap 置空
        this.invokers = Collections.emptyList();
        routerChain.setInvokers(this.invokers);
        // 销毁所有 Invoker 集合
        destroyAllInvokers(); // Close all invokers
    } else {
        // 设置允许访问
        this.forbidden = false; // Allow to access
        // 引用老的 urlInvokerMap
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        // 传入的 invokerUrls 为空,说明是路由规则或配置规则发生改变,此时 invokerUrls 是空的,直接使用 cachedInvokerUrls 。
        if (invokerUrls == Collections.<URL>emptyList()) {
            invokerUrls = new ArrayList<>();
        }
        if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
            invokerUrls.addAll(this.cachedInvokerUrls);
            // 传入的 invokerUrls 非空,更新 cachedInvokerUrls 。
        } else {
            this.cachedInvokerUrls = new HashSet<>();
            this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
        }
        // 忽略,若无 invokerUrls
        if (invokerUrls.isEmpty()) {
            return;
        }
        // 将传入的 invokerUrls ,转成新的 urlInvokerMap
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

        /**
             * If the calculation is wrong, it is not processed.
             *
             * 1. The protocol configured by the client is inconsistent with the protocol of the server.
             *    eg: consumer protocol = dubbo, provider only has other protocol services(rest).
             * 2. The registration center is not robust and pushes illegal specification data.
             *
             */
        if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
                                                   .toString()));
            return;
        }

        // 转换出新的 methodInvokerMap
        List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
        // pre-route and build cache, notice that route cache should build on original Invoker list.
        // toMergeMethodInvokerMap() will wrap some invokers having different groups, those wrapped invokers not should be routed.
        routerChain.setInvokers(newInvokers);
        // 若服务引用多 group ,则按照 method + group 聚合 Invoker 集合
        this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
        this.urlInvokerMap = newUrlInvokerMap;
        // 销毁不再使用的 Invoker 集合
        try {
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}
  1. 如果 invokerUrls size 为 1 但是为 empty 协议,则销毁当前所有 Invoker,并设置为禁止访问。
  2. 如果 invokerUrls 为空,说明配置发生变更,引用缓存的 cachedInvokerUrls。
  3. 通过 toInvokers 转换新的 urlInvokerMap。
  4. 若服务引用了多 group,则使用 method + group 聚合 Invoker。
  5. 销毁不再使用的 Invoker 集合。

2.6 RegistryDirectory.toInvokers

通过 toInvokers 方法转换新的 invoker 集合,如果转换后的 Invoker 已经被引用过则不再重新引用。

private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
    // 新的 `newUrlInvokerMap
    Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
    // 若为空,直接返回
    if (urls == null || urls.isEmpty()) {
        return newUrlInvokerMap;
    }
    // 已初始化的服务器提供 URL 集合
    Set<String> keys = new HashSet<>();
    // 获得引用服务的协议
    String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
    // 循环服务提供者 URL 集合,转成 Invoker 集合
    for (URL providerUrl : urls) {
        // If protocol is configured at the reference side, only the matching protocol is selected
        // 如果 reference 端配置了 protocol ,则只选择匹配的 protocol
        if (queryProtocols != null && queryProtocols.length() > 0) {
            boolean accept = false;
            String[] acceptProtocols = queryProtocols.split(",");// 可配置多个协议
            for (String acceptProtocol : acceptProtocols) {
                if (providerUrl.getProtocol().equals(acceptProtocol)) {
                    accept = true;
                    break;
                }
            }
            if (!accept) {
                continue;
            }
        }
        // 忽略,若为 `empty://` 协议
        if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
            continue;
        }
        // 忽略,若应用程序不支持该协议
        if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
            logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
                                                   " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
                                                   " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
                                                   ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
            continue;
        }
        // 合并 URL 参数
        URL url = mergeUrl(providerUrl);

        // 忽略,若已经初始化
        String key = url.toFullString(); // The parameter urls are sorted
        if (keys.contains(key)) { // Repeated url
            continue;
        }
        // 添加到 `keys` 中
        keys.add(key);
        // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
        Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
        Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
        // 已经引用过的不再重复引用
        if (invoker == null) { // Not in the cache, refer again
            try {
                boolean enabled = true;
                if (url.hasParameter(DISABLED_KEY)) {
                    enabled = !url.getParameter(DISABLED_KEY, false);
                } else {
                    enabled = url.getParameter(ENABLED_KEY, true);
                }
                // 若开启,创建 Invoker 对象
                if (enabled) {
                    invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
                }
            } catch (Throwable t) {
                logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
            }
            // 添加到 newUrlInvokerMap 中
            if (invoker != null) { // Put new invoker in cache
                newUrlInvokerMap.put(key, invoker);
            }
        } else {// 在缓存中,直接使用缓存的 Invoker 对象,添加到 newUrlInvokerMap 中
            newUrlInvokerMap.put(key, invoker);
        }
    }
    keys.clear();
    return newUrlInvokerMap;
}

1. 通过 queryMap.get(PROTOCOL_KEY) 进行协议匹配,如果匹配不上打印 error 不抛异常。
2. 通过 mergeUrl 方法覆盖消费者的配置到 provider,消费者配置就是 RegistryDirectory 中的 queryMap 属性。
3. 将没有进行服务引用的 URL 进行服务引用,并添加到 urlInvokerMap 中,key 为 URL 的格式化打印。
4. 如果之前没引用过,但是设置为禁用的的服务则直接跳过。
5. 通过 protocol 的 refer 进行服务引用,并将返回的 Invoker 进行 Wrapper 成 InvokerDelegate。

2.7 RegistryDirectory.toMergeInvokerList

如果当前 consumer 存在多个 group 分组,则按照 group 进行聚合。

private List<Invoker<T>> toMergeInvokerList(List<Invoker<T>> invokers) {
        // 创建新的 `methodInvokerMap`
    List<Invoker<T>> mergedInvokers = new ArrayList<>();
    Map<String, List<Invoker<T>>> groupMap = new HashMap<>();
    // 创建 Invoker 集合
    for (Invoker<T> invoker : invokers) {
        String group = invoker.getUrl().getParameter(GROUP_KEY, "");
        groupMap.computeIfAbsent(group, k -> new ArrayList<>());
        groupMap.get(group).add(invoker);
    }

    if (groupMap.size() == 1) {
        mergedInvokers.addAll(groupMap.values().iterator().next());
    } else if (groupMap.size() > 1) {
        for (List<Invoker<T>> groupList : groupMap.values()) {
            StaticDirectory<T> staticDirectory = new StaticDirectory<>(groupList);
            staticDirectory.buildRouterChain();
            mergedInvokers.add(CLUSTER.join(staticDirectory));
        }
    } else {
        mergedInvokers = invokers;
    }
    return mergedInvokers;
}
  1. 创建新的 methodInvokerMap 集合,并按照 group 进行分组
  2. 创建静态 StaticDirectory,并为其构建 Router(通过 url 中的 protocol)
  3. 创建对应的 Cluster。

2.8 AbstractRegistry.register

AbstractRegistry 的服务注册只是将 URL 记到内存中,做真正 URL 注册的是其子类。

public void register(URL url) {
    if (url == null) {
        throw new IllegalArgumentException("register url == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Register: " + url);
    }
    registered.add(url);
}

2.9 AbstractRegistry.unregister

AbstractRegistry 的服务取消注册只是将 URL 从内存中去掉,真正做移除的是其子类。

public void unregister(URL url) {
    if (url == null) {
        throw new IllegalArgumentException("unregister url == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Unregister: " + url);
    }
    registered.remove(url);
}

2.10 AbstractRegistry.subscribe AbstractRegistry.unsubscribe

AbstractRegistry 的服务订阅与取消负责将订阅的 consumer URL 和对应的监听器加入到内存的 subscribed 变量中和从内存中移除对应的变量,如下:

public void subscribe(URL url, NotifyListener listener) {
    if (url == null) {
        throw new IllegalArgumentException("subscribe url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("subscribe listener == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Subscribe: " + url);
    }
    Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>());
    listeners.add(listener);
}
unsubscribe....

3 FailbackRegistry

FailbackRegistry 实现 AbstractRegistry 抽象类,支持失败重试的 Registry 抽象类。FailbackRegistry 在 AbstractRegistry 的基础上增加了真正执行注册中心操作,并提供相应的失败重试。

/**
     * 失败发起注册失败的 URL 集合
     */
private final ConcurrentMap<URL, FailedRegisteredTask> failedRegistered = new ConcurrentHashMap<URL, FailedRegisteredTask>();

/**
     * 失败取消注册失败的 URL 集合
     */
private final ConcurrentMap<URL, FailedUnregisteredTask> failedUnregistered = new ConcurrentHashMap<URL, FailedUnregisteredTask>();

/**
     * 失败发起订阅失败的监听器集合
     */
private final ConcurrentMap<Holder, FailedSubscribedTask> failedSubscribed = new ConcurrentHashMap<Holder, FailedSubscribedTask>();

/**
     * 失败取消订阅失败的监听器集合
     */
private final ConcurrentMap<Holder, FailedUnsubscribedTask> failedUnsubscribed = new ConcurrentHashMap<Holder, FailedUnsubscribedTask>();

/**
     * 失败通知通知的 URL 集合
     */
private final ConcurrentMap<Holder, FailedNotifiedTask> failedNotified = new ConcurrentHashMap<Holder, FailedNotifiedTask>();

/**
     * The time in milliseconds the retryExecutor will wait
     */
private final int retryPeriod;

// Timer for failure retry, regular check if there is a request for failure, and if there is, an unlimited retry
private final HashedWheelTimer retryTimer;

public FailbackRegistry(URL url) {
    super(url);
    // 重试频率,单位:毫秒
    this.retryPeriod = url.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD);

    // since the retry task will not be very much. 128 ticks is enough.
    // 创建失败重试定时器
    retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
}
  1. 重试的逻辑也比较简单,将所有注册相关需要重试的任务都封装成 AbstractRetryTask。
  2. 将封装好的 AbstractRetryTask 进一步划分各种实现。然后交给同一个定时器执行,默认 5s 执行重试一次。

3.1 FailbackRegistry.register

FailbackRegistry 的 register、notify、unregister、subscribe、unsubscribe 这几个方法都比较类似。可以参考一个,如:register

public void register(URL url) {
    if (!acceptable(url)) {
        logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
        return;
    }
    super.register(url);
    removeFailedRegistered(url);
    removeFailedUnregistered(url);
    try {
        // Sending a registration request to the server side
        doRegister(url);
    } catch (Exception e) {
        Throwable t = e;

        // If the startup detection is opened, the Exception is thrown directly.
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
            && url.getParameter(Constants.CHECK_KEY, true)
            && !CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        // Record a failed registration request to a failed list, retry regularly
        addFailedRegistered(url);
    }
}
  1. 进行协议校验,否则该 consumer/provider url 无法被注册。
  2. 调用 AbstractRegister 父类方法进行内存标记。
  3. 移除曾经注册失败的 url,这样,当时间轮定时器下一次扫描时就会将这些任务剔除掉。
  4. doRegister 调用真正的注册中心进行注册,注意:调用实际注册中心的方法都是以 do 前缀。
  5. 如果出现异常则将任务包装测重试 Task 放入定时器中,并在内存中标记。

4 NotifyListener

当收到服务变更通知时回调的监听器,大部分逻辑都在 2 节提到过。

public interface NotifyListener {

    void notify(List<URL> urls);
}

5 Zookeeper 注册中心服务模型

先提下 Zookeeper 注册的服务模型。Zookeeper 模型有 4 级。从上到下分别为 ROOT、SERVICE、TYPE、URL。

流程说明:
服务提供者启动时: 向 /dubbo/com.foo.BarService/providers 目录下写入自己的 URL 地址
服务消费者启动时: 订阅 /dubbo/com.foo.BarService/providers 目录下的提供者 URL 地址。并向 /dubbo/com.foo.BarService/consumers 目录下写入自己的 URL 地址
监控中心启动时: 订阅 /dubbo/com.foo.BarService 目录下的所有提供者和消费者 URL 地址。

在图中,我们可以看到 Zookeeper 的节点层级,自上而下是:

  • Root 层:根目录,可通过 的 “group” 设置 Zookeeper 的根节点,缺省使用 “dubbo” 。
  • Service 层:服务接口全名。
  • Type 层:分类。目前除了我们在图中看到的 “providers”( 服务提供者列表 ) “consumers”( 服务消费者列表 ) 外,还有 “routes”( 路由规则列表 ) 和 “configurations”( 配置规则列表 )。
  • URL 层:URL ,根据不同 Type 目录,下面可以是服务提供者 URL 、服务消费者 URL 、路由规则 URL 、配置规则 URL 。

服务消费者启动后,不仅仅订阅了 “providers” 分类,也订阅了 “routes” “configurations” 分类。

5 ZookeeperRegistryFactory

ZookeeperRegistryFactory 实现了 1 节说的 AbstractRegistryFactory。提供了 Zookeeper 方式的实现。

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

    private ZookeeperTransporter zookeeperTransporter;

    /**
     * Invisible injection of zookeeper client via IOC/SPI
     * @param zookeeperTransporter
     */
    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }

    @Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

}
  1. zk 是 AbstractRegistryFactory 的其中一种实现,可通过 spi 机制进行加载。
  2. spi 机制加载对应实现 createRegistry。

6 ZookeeperRegistry

ZookeeperRegistry 实现 FailbackRegistry 抽象类。

/**
     * 默认 Zookeeper 根节点
     */
private final static String DEFAULT_ROOT = "dubbo";

/**
     * Zookeeper 根节点
     */
private final String root;

/**
     * Service 接口全名集合
     */
private final Set<String> anyServices = new ConcurrentHashSet<>();

/**
     * 监听器集合
     */
private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<>();

/**
     * Zookeeper 客户端
     */
private final ZookeeperClient zkClient;

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    // 获得 Zookeeper 根节点
    String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(PATH_SEPARATOR)) {
        group = PATH_SEPARATOR + group;
    }
    this.root = group;
    // 创建 Zookeeper Client
    zkClient = zookeeperTransporter.connect(url);
    // 添加 StateListener 对象。该监听器,在重连时,调用恢复方法。
    zkClient.addStateListener((state) -> {
        if (state == StateListener.RECONNECTED) {
            logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
                        " Since ephemeral ZNode will not get deleted for a connection lose, " +
                        "there's no need to re-register url of this instance.");
            ZookeeperRegistry.this.fetchLatestAddresses();
        } else if (state == StateListener.NEW_SESSION_CREATED) {
            logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
            try {
                ZookeeperRegistry.this.recover();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        } else if (state == StateListener.SESSION_LOST) {
            logger.warn("Url of this instance will be deleted from registry soon. " +
                        "Dubbo client will try to re-register once a new session is created.");
        } else if (state == StateListener.SUSPENDED) {

        } else if (state == StateListener.CONNECTED) {

        }
    });
}

1. zkListeners 属性为监听器集合,key 是 consumer 的 URL,value 中 key 为 subscribe 时携带的 listener 用于监听回调操作(如:刷新 Invoker),value 中的 value(ChildListener) 为 zk 中的子节点变化回调。所以 NotifyListener 和 ChildListener 为一对一关系。
2. 构造 ZookeeperRegistry 后,需要设置 Zookeeper Client、Zookeeper 根节点等一些初始化属性。
3. 通过 addStateListener 方法增加状态监听器,用于断开重连时能调用 recover 方法进行重新注册和订阅。

6.1 ZookeeperRegistry.doRegister

调用 zkClient 进行节点注册:

@Override
public void doRegister(URL url) {
    try {
        zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

通过 toUrlPath 方法返回完整路径 (Root + Service + Type + URL)

private String toUrlPath(URL url) {
    return toCategoryPath(url) + PATH_SEPARATOR + URL.encode(url.toFullString());
}

// root + service + type
private String toCategoryPath(URL url) {
    return toServicePath(url) + PATH_SEPARATOR + url.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
}

//  root + service
private String toServicePath(URL url) {
    String name = url.getServiceInterface();
    if (ANY_VALUE.equals(name)) {
        return toRootPath();
    }
    return toRootDir() + URL.encode(name);
}

6.2 ZookeeperRegistry.doUnregister

doUnregister 和 doRegister 方法类似

6.3 ZookeeperRegistry.doSubscribe

通过 doSubscribe 方法进行真正的节点订阅。

public void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        // 处理所有 Service 层的发起订阅,例如监控中心的订阅
        if (ANY_VALUE.equals(url.getServiceInterface())) {
            String root = toRootPath();
            // 获得 url 对应的监听器集合
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
            // 获得 ChildListener 对象
            ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
                for (String child : currentChilds) {
                    child = URL.decode(child);
                    // 新增 Service 接口全名时(即新增服务),发起该 Service 层的订阅
                    if (!anyServices.contains(child)) {
                        anyServices.add(child);
                        subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                                                   Constants.CHECK_KEY, String.valueOf(false)), k);
                    }
                }
            });
            // 创建 Service 节点。该节点为持久节点。
            zkClient.create(root, false);
            // 向 Zookeeper ,Service 节点,发起订阅
            List<String> services = zkClient.addChildListener(root, zkListener);
            // 首次全量数据获取完成时,循环 Service 接口全名数组,发起该 Service 层的订阅
            if (CollectionUtils.isNotEmpty(services)) {
                for (String service : services) {
                    service = URL.decode(service);
                    anyServices.add(service);
                    subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                                                                 Constants.CHECK_KEY, String.valueOf(false)), listener);
                }
            }
            // 处理指定 Service 层的发起订阅,例如服务消费者的订阅
        } else {
            // 子节点数据数组 (Service 层下的所有 URL)
            List<URL> urls = new ArrayList<>();
            // 循环分类数组
            for (String path : toCategoriesPath(url)) {
                // 获得 url 对应的监听器集合
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                // 获得 ChildListener 对象
                ChildListener zkListener = listeners.computeIfAbsent(listener,// 不存在 ChildListener 对象,进行创建 ChildListener 对象
                                                                     // 变更时,调用 `#notify(...)` 方法,回调 NotifyListener
                                                                     k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                // 创建 Type 节点。该节点为持久节点。
                zkClient.create(path, false);
                // 向 Zookeeper ,PATH 节点,发起订阅
                List<String> children = zkClient.addChildListener(path, zkListener);
                // 添加到 `urls` 中
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            // 首次全量数据获取完成时,调用 `#notify(...)` 方法,回调 NotifyListener
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
  1. 处理所有 Service 层的发起订阅,例如监控中心的订阅。
  2. 如果是监控中心订阅则通过 addChildListener 将订阅节点放在 root 上,在节点变更后(新增服务后)进行订阅。
  3. 如果是监控中心订阅首次需要将发起订阅监听后返回的所有 service 节点进行逐一订阅。
  4. 如果有指定 interface 则表示为消费者发起订阅。
  5. 如果是消费者订阅则需如上图,分别向四个 Type 类别分别发起订阅。
  6. 这里的 NotifyListener、 ChildListener 是没有区分类别的,所有一个 consumer url 虽然目前有 4 个 type(proviers、consumers…..) 但是对应的 NotifyListener、 ChildListener 只有有一个。
  7. 这里有个小细节,toUrlsWithEmpty 用于获取所有已注册的 url 中与 consumer url 匹配的 list,但如果已注册的 url 为空 (对某个 Type 下完整 path 发起订阅后通过返回的 children 发现该节点下没有数据),则需要添加一个 protocol 为 empty 的节点,在 refresh consumer 的 Invoker 时用于判断是否是服务下线进而将 Invoker 都删除掉。
  8. 如果是首次的订阅则需要将所有的 urls(不区分 Type) 下发给对应 listener 进行相应的回调(如:consumer 通过 Type:provider 的 url 列表创建所有用于调用的 Invoker)。

6.4 ZookeeperRegistry.doUnsubscribe

doUnsubscribe 与 doSubscribe 类似。

6.4 ZookeeperRegistry.lookup

public List<URL> lookup(URL url) {
    if (url == null) {
        throw new IllegalArgumentException("lookup url == null");
    }
    try {
        // 循环分类数组,获得所有的 URL 数组
        List<String> providers = new ArrayList<>();
        for (String path : toCategoriesPath(url)) {
            List<String> children = zkClient.getChildren(path);
            if (children != null) {
                providers.addAll(children);
            }
        }
        // 匹配
        return toUrlsWithoutEmpty(url, providers);
    } catch (Throwable e) {
        throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
  1. 通过 lookup 方法可拉取所有已注册且符合传入 url 条件的 URL 列表。
  2. 这里和订阅推送不同,是主动拉模式。

7 服务提供者注册

通过 RegistryProtocol 类进行服务暴露时调用注册中心 Registry 对象进行服务注册。

@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {

    // 向注册中心注册服务提供者(自己)
    if (register) {
        register(registryUrl, registeredProviderUrl);
    }
}

// 向注册中心注册服务提供者(自己)
private void register(URL registryUrl, URL registeredProviderUrl) {
    Registry registry = registryFactory.getRegistry(registryUrl);
    registry.register(registeredProviderUrl);
}

8 服务消费者注册+订阅

这里简略描述服务消费者注册和订阅场景。

8.1 服务消费者注册

通过 RegistryProtocol 类进行服务引用时调用注册中心 Registry 对象进行服务注册。

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // 向注册中心注册自己(服务消费者)
    if (directory.isShouldRegister()) {
        directory.setRegisteredConsumerUrl(subscribeUrl);
        registry.register(directory.getRegisteredConsumerUrl());
    }
    .....
}

8.2 服务消费者订阅

通过 RegistryProtocol 类进行服务引用时调用注册中心 Registry 对象进行服务订阅。通过 cluster.join 创建每个 interface 的 Invoker。

 private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        // 向注册中心订阅服务提供者
        directory.subscribe(toSubscribeUrl(subscribeUrl));
        // 创建 Invoker 对象,基于 RegistryDirectory 创建 invoker 对象
        Invoker<T> invoker = cluster.join(directory);
     ....
 }

上面是一个 interface 的订阅,在 ReferenceConfig 创建代理对象的时候,通过循环调用 RegistryProtocol 可以实现所有 interface 的订阅和相应 invoker 的创建。

private T createProxy(Map<String, String> map) {
    //....
    List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
    URL registryURL = null;
    for (URL url : urls) {
        // 引用服务
        invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
        // 使用最后一个注册中心的 URL
        if (UrlUtils.isRegistry(url)) {
            registryURL = url; // use last registry url
        }
    }
    //....
}
最后修改日期: 2021年4月11日

作者

留言

撰写回覆或留言

发布留言必须填写的电子邮件地址不会公开。