成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專(zhuān)欄INFORMATION COLUMN

dubbo源碼解析(六)注冊(cè)中心——redis

plus2047 / 2119人閱讀

摘要:延長(zhǎng)過(guò)期時(shí)間就是重新注冊(cè)一次。關(guān)閉過(guò)期執(zhí)行器關(guān)閉通知器銷(xiāo)毀連接池關(guān)閉任務(wù)調(diào)度器這是銷(xiāo)毀的方法,邏輯很清晰,方法在源碼解析四注冊(cè)中心中已經(jīng)講到。

注冊(cè)中心——redis
目標(biāo):解釋以為redis實(shí)現(xiàn)的注冊(cè)中心原理,解讀duubo-registry-redis的源碼

Redis是一個(gè)key-value存儲(chǔ)系統(tǒng),交換數(shù)據(jù)非常快,redis以內(nèi)存作為數(shù)據(jù)存儲(chǔ)的介質(zhì),所以讀寫(xiě)數(shù)據(jù)的效率極高,遠(yuǎn)遠(yuǎn)超過(guò)數(shù)據(jù)庫(kù)。redis支持豐富的數(shù)據(jù)類(lèi)型,dubbo就利用了redis的value支持map的數(shù)據(jù)類(lèi)型。redis的key為服務(wù)名稱(chēng)和服務(wù)的類(lèi)型。map中的key為URL地址,map中的value為過(guò)期時(shí)間,用于判斷臟數(shù)據(jù),臟數(shù)據(jù)由監(jiān)控中心刪除。

dubbo利用JRedis來(lái)連接到Redis分布式哈希鍵-值數(shù)據(jù)庫(kù),因?yàn)镴edis實(shí)例不是線程安全的,所以不可以多個(gè)線程共用一個(gè)Jedis實(shí)例,但是創(chuàng)建太多的實(shí)現(xiàn)也不好因?yàn)檫@意味著會(huì)建立很多sokcet連接。 所以dubbo又用了JedisPool,JedisPool是一個(gè)線程安全的網(wǎng)絡(luò)連接池??梢杂肑edisPool創(chuàng)建一些可靠Jedis實(shí)例,可以從池中獲取Jedis實(shí)例,使用完后再把Jedis實(shí)例還回JedisPool。這種方式可以避免創(chuàng)建大量socket連接并且會(huì)實(shí)現(xiàn)高效的性能。

上述稍微介紹了dubbo用redis實(shí)現(xiàn)注冊(cè)中心的依賴,接下來(lái)讓我們來(lái)看看具體的實(shí)現(xiàn)邏輯。下圖是包的結(jié)構(gòu):

包結(jié)構(gòu)非常類(lèi)似。接下來(lái)我們就來(lái)解讀一下這兩個(gè)類(lèi)。

(一)RedisRegistry

該類(lèi)繼承了FailbackRegistry類(lèi),該類(lèi)就是針對(duì)注冊(cè)中心核心的功能注冊(cè)、訂閱、取消注冊(cè)、取消訂閱,查詢注冊(cè)列表進(jìn)行展開(kāi),基于redis來(lái)實(shí)現(xiàn)。

1.屬性
// 日志記錄
private static final Logger logger = LoggerFactory.getLogger(RedisRegistry.class);

// 默認(rèn)的redis連接端口
private static final int DEFAULT_REDIS_PORT = 6379;

// 默認(rèn) Redis 根節(jié)點(diǎn),涉及到的是dubbo的分組配置
private final static String DEFAULT_ROOT = "dubbo";

// 任務(wù)調(diào)度器
private final ScheduledExecutorService expireExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryExpireTimer", true));

//Redis Key 過(guò)期機(jī)制執(zhí)行器
private final ScheduledFuture expireFuture;

// Redis 根節(jié)點(diǎn)
private final String root;

// JedisPool集合,map 的key為 "ip:port"的形式
private final Map jedisPools = new ConcurrentHashMap();

// 通知器集合,key為 Root + Service的形式
// 例如 /dubbo/com.alibaba.dubbo.demo.DemoService
private final ConcurrentMap notifiers = new ConcurrentHashMap();

// 重連時(shí)間間隔,單位:ms
private final int reconnectPeriod;

// 過(guò)期周期,單位:ms
private final int expirePeriod;

// 是否通過(guò)監(jiān)控中心,用于判斷臟數(shù)據(jù),臟數(shù)據(jù)由監(jiān)控中心刪除
private volatile boolean admin = false;

// 是否復(fù)制模式
private boolean replicate;

可以從屬性中看到基于redis的注冊(cè)中心可以被監(jiān)控中心監(jiān)控,并且對(duì)過(guò)期的節(jié)點(diǎn)有清理的機(jī)制。

2.構(gòu)造方法
public RedisRegistry(URL url) {
    super(url);
    // 判斷地址是否為空
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    // 實(shí)例化對(duì)象池
    GenericObjectPoolConfig config = new GenericObjectPoolConfig();
    // 如果 testOnBorrow 被設(shè)置,pool 會(huì)在 borrowObject 返回對(duì)象之前使用 PoolableObjectFactory的 validateObject 來(lái)驗(yàn)證這個(gè)對(duì)象是否有效
    // 要是對(duì)象沒(méi)通過(guò)驗(yàn)證,這個(gè)對(duì)象會(huì)被丟棄,然后重新選擇一個(gè)新的對(duì)象。
    config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
    // 如果 testOnReturn 被設(shè)置, pool 會(huì)在 returnObject 的時(shí)候通過(guò) PoolableObjectFactory 的validateObject 方法驗(yàn)證對(duì)象
    // 如果對(duì)象沒(méi)通過(guò)驗(yàn)證,對(duì)象會(huì)被丟棄,不會(huì)被放到池中。
    config.setTestOnReturn(url.getParameter("test.on.return", false));
    // 指定空閑對(duì)象是否應(yīng)該使用 PoolableObjectFactory 的 validateObject 校驗(yàn),如果校驗(yàn)失敗,這個(gè)對(duì)象會(huì)從對(duì)象池中被清除。
    // 這個(gè)設(shè)置僅在 timeBetweenEvictionRunsMillis 被設(shè)置成正值( >0) 的時(shí)候才會(huì)生效。
    config.setTestWhileIdle(url.getParameter("test.while.idle", false));
    if (url.getParameter("max.idle", 0) > 0)
        // 控制一個(gè)pool最多有多少個(gè)狀態(tài)為空閑的jedis實(shí)例。
        config.setMaxIdle(url.getParameter("max.idle", 0));
    if (url.getParameter("min.idle", 0) > 0)
        // 控制一個(gè)pool最少有多少個(gè)狀態(tài)為空閑的jedis實(shí)例。
        config.setMinIdle(url.getParameter("min.idle", 0));
    if (url.getParameter("max.active", 0) > 0)
        // 控制一個(gè)pool最多有多少個(gè)jedis實(shí)例。
        config.setMaxTotal(url.getParameter("max.active", 0));
    if (url.getParameter("max.total", 0) > 0)
        config.setMaxTotal(url.getParameter("max.total", 0));
    if (url.getParameter("max.wait", url.getParameter("timeout", 0)) > 0)
        //表示當(dāng)引入一個(gè)jedis實(shí)例時(shí),最大的等待時(shí)間,如果超過(guò)等待時(shí)間,則直接拋出JedisConnectionException;
        config.setMaxWaitMillis(url.getParameter("max.wait", url.getParameter("timeout", 0)));
    if (url.getParameter("num.tests.per.eviction.run", 0) > 0)
        // 設(shè)置驅(qū)逐線程每次檢測(cè)對(duì)象的數(shù)量。這個(gè)設(shè)置僅在 timeBetweenEvictionRunsMillis 被設(shè)置成正值( >0)的時(shí)候才會(huì)生效。
        config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0));
    if (url.getParameter("time.between.eviction.runs.millis", 0) > 0)
        // 指定驅(qū)逐線程的休眠時(shí)間。如果這個(gè)值不是正數(shù)( >0),不會(huì)有驅(qū)逐線程運(yùn)行。
        config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0));
    if (url.getParameter("min.evictable.idle.time.millis", 0) > 0)
        // 指定最小的空閑驅(qū)逐的時(shí)間間隔(空閑超過(guò)指定的時(shí)間的對(duì)象,會(huì)被清除掉)。
        // 這個(gè)設(shè)置僅在 timeBetweenEvictionRunsMillis 被設(shè)置成正值( >0)的時(shí)候才會(huì)生效。
        config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0));

    // 獲取url中的集群配置
    String cluster = url.getParameter("cluster", "failover");
    if (!"failover".equals(cluster) && !"replicate".equals(cluster)) {
        throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate.");
    }
    // 設(shè)置是否為復(fù)制模式
    replicate = "replicate".equals(cluster);

    List addresses = new ArrayList();
    addresses.add(url.getAddress());
    // 備用地址
    String[] backups = url.getParameter(Constants.BACKUP_KEY, new String[0]);
    if (backups != null && backups.length > 0) {
        addresses.addAll(Arrays.asList(backups));
    }

    for (String address : addresses) {
        int i = address.indexOf(":");
        String host;
        int port;
        // 分割地址和端口號(hào)
        if (i > 0) {
            host = address.substring(0, i);
            port = Integer.parseInt(address.substring(i + 1));
        } else {
            // 沒(méi)有端口的設(shè)置默認(rèn)端口
            host = address;
            port = DEFAULT_REDIS_PORT;
        }
        // 創(chuàng)建連接池并加入集合
        this.jedisPools.put(address, new JedisPool(config, host, port,
                url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT), StringUtils.isEmpty(url.getPassword()) ? null : url.getPassword(),
                url.getParameter("db.index", 0)));
    }

    // 設(shè)置url攜帶的連接超時(shí)時(shí)間,如果沒(méi)有配置,則設(shè)置默認(rèn)為3s
    this.reconnectPeriod = url.getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD);
    // 獲取url中的分組配置,如果沒(méi)有配置,則默認(rèn)為dubbo
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }
    if (!group.endsWith(Constants.PATH_SEPARATOR)) {
        group = group + Constants.PATH_SEPARATOR;
    }
    // 設(shè)置redis 的根節(jié)點(diǎn)
    this.root = group;

    // 獲取過(guò)期周期配置,如果沒(méi)有,則默認(rèn)為60s
    this.expirePeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);
    // 創(chuàng)建過(guò)期機(jī)制執(zhí)行器
    this.expireFuture = expireExecutor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                // 延長(zhǎng)到期時(shí)間
                deferExpired(); // Extend the expiration time
            } catch (Throwable t) { // Defensive fault tolerance
                logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);
            }
        }
    }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);
}

構(gòu)造方法首先是調(diào)用了父類(lèi)的構(gòu)造函數(shù),然后是對(duì)對(duì)象池的一些配置進(jìn)行了初始化,具體的我已經(jīng)在注釋中寫(xiě)明。在構(gòu)造方法中還做了連接池的創(chuàng)建、過(guò)期機(jī)制執(zhí)行器的創(chuàng)建,其中過(guò)期會(huì)進(jìn)行延長(zhǎng)到期時(shí)間的操作具體是在deferExpired方法中實(shí)現(xiàn)。還有一個(gè)關(guān)注點(diǎn)事該執(zhí)行器的時(shí)間是取周期的一半。

3.deferExpired
private void deferExpired() {
    for (Map.Entry entry : jedisPools.entrySet()) {
        JedisPool jedisPool = entry.getValue();
        try {
            // 獲得連接池中的Jedis實(shí)例
            Jedis jedis = jedisPool.getResource();
            try {
                // 遍歷已經(jīng)注冊(cè)的服務(wù)url集合
                for (URL url : new HashSet(getRegistered())) {
                    // 如果是非動(dòng)態(tài)管理模式
                    if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
                        // 獲得分類(lèi)路徑
                        String key = toCategoryPath(url);
                        // 以hash 散列表的形式存儲(chǔ)
                        if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {
                            // 發(fā)布 Redis 注冊(cè)事件
                            jedis.publish(key, Constants.REGISTER);
                        }
                    }
                }
                // 如果通過(guò)監(jiān)控中心
                if (admin) {
                    // 刪除過(guò)時(shí)的臟數(shù)據(jù)
                    clean(jedis);
                }
                // 如果服務(wù)器端已同步數(shù)據(jù),只需寫(xiě)入單臺(tái)機(jī)器
                if (!replicate) {
                    break;//  If the server side has synchronized data, just write a single machine
                }
            } finally {
                jedis.close();
            }
        } catch (Throwable t) {
            logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
        }
    }
}

該方法實(shí)現(xiàn)了延長(zhǎng)到期時(shí)間的邏輯,遍歷了已經(jīng)注冊(cè)的服務(wù)url,這里會(huì)有一個(gè)是否為非動(dòng)態(tài)管理模式的判斷,也就是判斷該節(jié)點(diǎn)是否為動(dòng)態(tài)節(jié)點(diǎn),只有動(dòng)態(tài)節(jié)點(diǎn)是需要延長(zhǎng)過(guò)期時(shí)間,因?yàn)閯?dòng)態(tài)節(jié)點(diǎn)需要人工刪除節(jié)點(diǎn)。延長(zhǎng)過(guò)期時(shí)間就是重新注冊(cè)一次。而其他的節(jié)點(diǎn)則會(huì)被監(jiān)控中心清除,也就是調(diào)用了clean方法。clean方法下面會(huì)講到。

4.clean
// The monitoring center is responsible for deleting outdated dirty data
private void clean(Jedis jedis) {
    // 獲得所有的服務(wù)
    Set keys = jedis.keys(root + Constants.ANY_VALUE);
    if (keys != null && !keys.isEmpty()) {
        // 遍歷所有的服務(wù)
        for (String key : keys) {
            // 返回hash表key對(duì)應(yīng)的所有域和值
            // redis的key為服務(wù)名稱(chēng)和服務(wù)的類(lèi)型。map中的key為URL地址,map中的value為過(guò)期時(shí)間,用于判斷臟數(shù)據(jù),臟數(shù)據(jù)由監(jiān)控中心刪除
            Map values = jedis.hgetAll(key);
            if (values != null && values.size() > 0) {
                boolean delete = false;
                long now = System.currentTimeMillis();
                for (Map.Entry entry : values.entrySet()) {
                    URL url = URL.valueOf(entry.getKey());
                    // 是否為動(dòng)態(tài)節(jié)點(diǎn)
                    if (url.getParameter(Constants.DYNAMIC_KEY, true)) {
                        long expire = Long.parseLong(entry.getValue());
                        // 判斷是否過(guò)期
                        if (expire < now) {
                            // 刪除記錄
                            jedis.hdel(key, entry.getKey());
                            delete = true;
                            if (logger.isWarnEnabled()) {
                                logger.warn("Delete expired key: " + key + " -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now));
                            }
                        }
                    }
                }
                // 取消注冊(cè)
                if (delete) {
                    jedis.publish(key, Constants.UNREGISTER);
                }
            }
        }
    }
}

該方法就是用來(lái)清理過(guò)期數(shù)據(jù)的,之前我提到過(guò)dubbo在redis存儲(chǔ)數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)形式,就是redis的key為服務(wù)名稱(chēng)和服務(wù)的類(lèi)型。map中的key為URL地址,map中的value為過(guò)期時(shí)間,用于判斷臟數(shù)據(jù),臟數(shù)據(jù)由監(jiān)控中心刪除,那么判斷過(guò)期就是通過(guò)map中的value來(lái)判別。邏輯就是在redis中先把記錄刪除,然后在取消訂閱。

5.isAvailable
@Override
public boolean isAvailable() {
    // 遍歷連接池集合
    for (JedisPool jedisPool : jedisPools.values()) {
        try {
            // 從連接池中獲得jedis實(shí)例
            Jedis jedis = jedisPool.getResource();
            try {
                // 判斷是否有redis服務(wù)器被連接著
                // 只要有一臺(tái)連接,則算注冊(cè)中心可用
                if (jedis.isConnected()) {
                    return true; // At least one single machine is available.
                }
            } finally {
                jedis.close();
            }
        } catch (Throwable t) {
        }
    }
    return false;
}

該方法是判斷注冊(cè)中心是否可用,通過(guò)redis是否連接來(lái)判斷,只要有一臺(tái)redis可連接,就算注冊(cè)中心可用。

6.destroy
@Override
public void destroy() {
    super.destroy();
    try {
        // 關(guān)閉過(guò)期執(zhí)行器
        expireFuture.cancel(true);
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
    try {
        // 關(guān)閉通知器
        for (Notifier notifier : notifiers.values()) {
            notifier.shutdown();
        }
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
    for (Map.Entry entry : jedisPools.entrySet()) {
        JedisPool jedisPool = entry.getValue();
        try {
            // 銷(xiāo)毀連接池
            jedisPool.destroy();
        } catch (Throwable t) {
            logger.warn("Failed to destroy the redis registry client. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
        }
    }
    // 關(guān)閉任務(wù)調(diào)度器
    ExecutorUtil.gracefulShutdown(expireExecutor, expirePeriod);
}

這是銷(xiāo)毀的方法,邏輯很清晰,gracefulShutdown方法在《dubbo源碼解析(四)注冊(cè)中心——dubbo》中已經(jīng)講到。

7.doRegister
@Override
public void doRegister(URL url) {
    // 獲得分類(lèi)路徑
    String key = toCategoryPath(url);
    // 獲得URL字符串作為 Value
    String value = url.toFullString();
    // 計(jì)算過(guò)期時(shí)間
    String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);
    boolean success = false;
    RpcException exception = null;
    // 遍歷連接池集合
    for (Map.Entry entry : jedisPools.entrySet()) {
        JedisPool jedisPool = entry.getValue();
        try {
            Jedis jedis = jedisPool.getResource();
            try {
                // 寫(xiě)入 Redis Map 鍵
                jedis.hset(key, value, expire);
                // 發(fā)布 Redis 注冊(cè)事件
                // 這樣訂閱該 Key 的服務(wù)消費(fèi)者和監(jiān)控中心,就會(huì)實(shí)時(shí)從 Redis 讀取該服務(wù)的最新數(shù)據(jù)。
                jedis.publish(key, Constants.REGISTER);
                success = true;
                // 如果服務(wù)器端已同步數(shù)據(jù),只需寫(xiě)入單臺(tái)機(jī)器
                if (!replicate) {
                    break; //  If the server side has synchronized data, just write a single machine
                }
            } finally {
                jedis.close();
            }
        } catch (Throwable t) {
            exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
        }
    }
    if (exception != null) {
        if (success) {
            logger.warn(exception.getMessage(), exception);
        } else {
            throw exception;
        }
    }
}

該方法是實(shí)現(xiàn)了父類(lèi)FailbackRegistry的抽象方法,主要是實(shí)現(xiàn)了注冊(cè)的功能,具體的邏輯是先將需要注冊(cè)的服務(wù)信息保存到redis中,然后發(fā)布redis注冊(cè)事件。

8.doUnregister
@Override
public void doUnregister(URL url) {
    // 獲得分類(lèi)路徑
    String key = toCategoryPath(url);
    // 獲得URL字符串作為 Value
    String value = url.toFullString();
    RpcException exception = null;
    boolean success = false;
    for (Map.Entry entry : jedisPools.entrySet()) {
        JedisPool jedisPool = entry.getValue();
        try {
            Jedis jedis = jedisPool.getResource();
            try {
                // 刪除redis中的記錄
                jedis.hdel(key, value);
                // 發(fā)布redis取消注冊(cè)事件
                jedis.publish(key, Constants.UNREGISTER);
                success = true;
                // 如果服務(wù)器端已同步數(shù)據(jù),只需寫(xiě)入單臺(tái)機(jī)器
                if (!replicate) {
                    break; //  If the server side has synchronized data, just write a single machine
                }
            } finally {
                jedis.close();
            }
        } catch (Throwable t) {
            exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
        }
    }
    if (exception != null) {
        if (success) {
            logger.warn(exception.getMessage(), exception);
        } else {
            throw exception;
        }
    }
}

該方法也是實(shí)現(xiàn)了父類(lèi)的抽象方法,當(dāng)服務(wù)消費(fèi)者或者提供者關(guān)閉時(shí),會(huì)調(diào)用該方法來(lái)取消注冊(cè)。邏輯就是跟注冊(cè)方法方法,先從redis中刪除服務(wù)相關(guān)記錄,然后發(fā)布取消注冊(cè)的事件,從而實(shí)時(shí)通知訂閱者們。

9.doSubscribe
@Override
public void doSubscribe(final URL url, final NotifyListener listener) {
    // 返回服務(wù)地址
    String service = toServicePath(url);
    // 獲得通知器
    Notifier notifier = notifiers.get(service);
    // 如果沒(méi)有該服務(wù)的通知器,則創(chuàng)建一個(gè)
    if (notifier == null) {
        Notifier newNotifier = new Notifier(service);
        notifiers.putIfAbsent(service, newNotifier);
        notifier = notifiers.get(service);
        // 保證并發(fā)情況下,有且只有一個(gè)通知器啟動(dòng)
        if (notifier == newNotifier) {
            notifier.start();
        }
    }
    boolean success = false;
    RpcException exception = null;
    // 遍歷連接池集合進(jìn)行訂閱,直到有一個(gè)訂閱成功,僅僅向一個(gè)redis進(jìn)行訂閱
    for (Map.Entry entry : jedisPools.entrySet()) {
        JedisPool jedisPool = entry.getValue();
        try {
            Jedis jedis = jedisPool.getResource();
            try {
                // 如果服務(wù)地址為*結(jié)尾,也就是處理所有的服務(wù)層發(fā)起的訂閱
                if (service.endsWith(Constants.ANY_VALUE)) {
                    admin = true;
                    // 獲得分類(lèi)層的集合 例如:/dubbo/com.alibaba.dubbo.demo.DemoService/providers
                    Set keys = jedis.keys(service);
                    if (keys != null && !keys.isEmpty()) {
                        // 按照服務(wù)聚合url
                        Map> serviceKeys = new HashMap>();
                        for (String key : keys) {
                            // 獲得服務(wù)路徑,截掉多余部分
                            String serviceKey = toServicePath(key);
                            Set sk = serviceKeys.get(serviceKey);
                            if (sk == null) {
                                sk = new HashSet();
                                serviceKeys.put(serviceKey, sk);
                            }
                            sk.add(key);
                        }
                        // 按照每個(gè)服務(wù)層進(jìn)行發(fā)起通知,因?yàn)榉?wù)地址為*結(jié)尾
                        for (Set sk : serviceKeys.values()) {
                            doNotify(jedis, sk, url, Arrays.asList(listener));
                        }
                    }
                } else {
                    // 處理指定的服務(wù)層發(fā)起的通知
                    doNotify(jedis, jedis.keys(service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE), url, Arrays.asList(listener));
                }
                // 只在一個(gè)redis上進(jìn)行訂閱
                success = true;
                break; // Just read one server"s data
            } finally {
                jedis.close();
            }
        } catch (Throwable t) { // Try the next server
            exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);
        }
    }
    if (exception != null) {
        // 雖然發(fā)生異常,但結(jié)果仍然成功
        if (success) {
            logger.warn(exception.getMessage(), exception);
        } else {
            throw exception;
        }
    }
}

該方法是實(shí)現(xiàn)了訂閱的功能。注意以下幾個(gè)點(diǎn):

服務(wù)只會(huì)向一個(gè)redis進(jìn)行訂閱,只要有一個(gè)訂閱成功就結(jié)束訂閱。

根據(jù)url攜帶的服務(wù)地址來(lái)調(diào)用doNotify的兩個(gè)重載方法。其中一個(gè)只是遍歷通知了所有服務(wù)的監(jiān)聽(tīng)器,doNotify方法我會(huì)在后面講到。

10.doUnsubscribe
@Override
public void doUnsubscribe(URL url, NotifyListener listener) {
}

該方法本來(lái)是取消訂閱的實(shí)現(xiàn),不過(guò)dubbo中并未實(shí)現(xiàn)該邏輯。

11.doNotify
private void doNotify(Jedis jedis, String key) {
    // 遍歷所有的通知器,調(diào)用重載方法今天通知
    for (Map.Entry> entry : new HashMap>(getSubscribed()).entrySet()) {
        doNotify(jedis, Arrays.asList(key), entry.getKey(), new HashSet(entry.getValue()));
    }
}

private void doNotify(Jedis jedis, Collection keys, URL url, Collection listeners) {
    if (keys == null || keys.isEmpty()
            || listeners == null || listeners.isEmpty()) {
        return;
    }
    long now = System.currentTimeMillis();
    List result = new ArrayList();
    // 獲得分類(lèi)集合
    List categories = Arrays.asList(url.getParameter(Constants.CATEGORY_KEY, new String[0]));
    // 通過(guò)url獲得服務(wù)接口
    String consumerService = url.getServiceInterface();
    // 遍歷分類(lèi)路徑,例如/dubbo/com.alibaba.dubbo.demo.DemoService/providers
    for (String key : keys) {
        // 判斷服務(wù)是否匹配
        if (!Constants.ANY_VALUE.equals(consumerService)) {
            String prvoiderService = toServiceName(key);
            if (!prvoiderService.equals(consumerService)) {
                continue;
            }
        }
        // 從分類(lèi)路徑上獲得分類(lèi)名
        String category = toCategoryName(key);
        // 判斷訂閱的分類(lèi)是否包含該分類(lèi)
        if (!categories.contains(Constants.ANY_VALUE) && !categories.contains(category)) {
            continue;
        }
        List urls = new ArrayList();
        // 返回所有的URL集合
        Map values = jedis.hgetAll(key);
        if (values != null && values.size() > 0) {
            for (Map.Entry entry : values.entrySet()) {
                URL u = URL.valueOf(entry.getKey());
                // 判斷是否為動(dòng)態(tài)節(jié)點(diǎn),因?yàn)閯?dòng)態(tài)節(jié)點(diǎn)不受過(guò)期限制。并且判斷是否過(guò)期
                if (!u.getParameter(Constants.DYNAMIC_KEY, true)
                        || Long.parseLong(entry.getValue()) >= now) {
                    // 判斷url是否合法
                    if (UrlUtils.isMatch(url, u)) {
                        urls.add(u);
                    }
                }
            }
        }
        // 若不存在匹配的url,則創(chuàng)建 `empty://` 的 URL返回,用于清空該服務(wù)的該分類(lèi)。
        if (urls.isEmpty()) {
            urls.add(url.setProtocol(Constants.EMPTY_PROTOCOL)
                    .setAddress(Constants.ANYHOST_VALUE)
                    .setPath(toServiceName(key))
                    .addParameter(Constants.CATEGORY_KEY, category));
        }
        result.addAll(urls);
        if (logger.isInfoEnabled()) {
            logger.info("redis notify: " + key + " = " + urls);
        }
    }
    if (result == null || result.isEmpty()) {
        return;
    }
    // 全部數(shù)據(jù)完成后,調(diào)用通知方法,來(lái)通知監(jiān)聽(tīng)器
    for (NotifyListener listener : listeners) {
        notify(url, listener, result);
    }
}

該方法實(shí)現(xiàn)了通知的邏輯,有兩個(gè)重載方法,第二個(gè)比第一個(gè)多了幾個(gè)參數(shù),其實(shí)唯一的區(qū)別就是第一個(gè)重載方法是通知了所有的監(jiān)聽(tīng)器,內(nèi)部邏輯中調(diào)用了getSubscribed方法獲取所有的監(jiān)聽(tīng)器,該方法的解釋可以查看《dubbo源碼解析(三)注冊(cè)中心——開(kāi)篇》中關(guān)于subscribed屬性的解釋。而第二個(gè)重載方法就是對(duì)一個(gè)指定的監(jiān)聽(tīng)器進(jìn)行通知。

具體的邏輯在第二個(gè)重載的方法中,其中有以下幾個(gè)需要注意的點(diǎn):

通知的事件要和監(jiān)聽(tīng)器匹配。

不同的角色會(huì)關(guān)注不同的分類(lèi),服務(wù)消費(fèi)者會(huì)關(guān)注providers、configurations、routes這幾個(gè)分類(lèi),而服務(wù)提供者會(huì)關(guān)注consumers分類(lèi),監(jiān)控中心會(huì)關(guān)注所有分類(lèi)。

遍歷分類(lèi)路徑,分類(lèi)路徑是Root + Service + Type。

12.toServiceName
private String toServiceName(String categoryPath) {
    String servicePath = toServicePath(categoryPath);
    return servicePath.startsWith(root) ? servicePath.substring(root.length()) : servicePath;
}

該方法很簡(jiǎn)單,就是從服務(wù)路徑上獲得服務(wù)名,這里就不多做解釋了。

13.toCategoryName
private String toCategoryName(String categoryPath) {
    int i = categoryPath.lastIndexOf(Constants.PATH_SEPARATOR);
    return i > 0 ? categoryPath.substring(i + 1) : categoryPath;
}

該方法的作用是從分類(lèi)路徑上獲得分類(lèi)名。

14.toServicePath
private String toServicePath(String categoryPath) {
    int i;
    if (categoryPath.startsWith(root)) {
        i = categoryPath.indexOf(Constants.PATH_SEPARATOR, root.length());
    } else {
        i = categoryPath.indexOf(Constants.PATH_SEPARATOR);
    }
    return i > 0 ? categoryPath.substring(0, i) : categoryPath;
}

private String toServicePath(URL url) {
    return root + url.getServiceInterface();
}

這兩個(gè)方法都是獲得服務(wù)地址,第一個(gè)方法主要是截掉多余的部分,第二個(gè)方法主要是從url配置中獲取關(guān)于服務(wù)地址的值跟根節(jié)點(diǎn)拼接。

15.toCategoryPath
private String toCategoryPath(URL url) {
    return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
}

該方法是獲得分類(lèi)路徑,格式是Root + Service + Type。

16.內(nèi)部類(lèi)NotifySub
private class NotifySub extends JedisPubSub {

    private final JedisPool jedisPool;

    public NotifySub(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    @Override
    public void onMessage(String key, String msg) {
        if (logger.isInfoEnabled()) {
            logger.info("redis event: " + key + " = " + msg);
        }
        // 如果是注冊(cè)事件或者取消注冊(cè)事件
        if (msg.equals(Constants.REGISTER)
                || msg.equals(Constants.UNREGISTER)) {
            try {
                Jedis jedis = jedisPool.getResource();
                try {
                    // 通知監(jiān)聽(tīng)器
                    doNotify(jedis, key);
                } finally {
                    jedis.close();
                }
            } catch (Throwable t) { // TODO Notification failure does not restore mechanism guarantee
                logger.error(t.getMessage(), t);
            }
        }
    }

    @Override
    public void onPMessage(String pattern, String key, String msg) {
        onMessage(key, msg);
    }

    @Override
    public void onSubscribe(String key, int num) {
    }

    @Override
    public void onPSubscribe(String pattern, int num) {
    }

    @Override
    public void onUnsubscribe(String key, int num) {
    }

    @Override
    public void onPUnsubscribe(String pattern, int num) {
    }

}

NotifySub是RedisRegistry的一個(gè)內(nèi)部類(lèi),繼承了JedisPubSub類(lèi),JedisPubSub類(lèi)中定義了publish/subsribe的回調(diào)方法。通過(guò)繼承JedisPubSub類(lèi)并重新實(shí)現(xiàn)這些回調(diào)方法,當(dāng)publish/subsribe事件發(fā)生時(shí),我們可以定制自己的處理邏輯。這里實(shí)現(xiàn)了onMessage和onPMessage兩個(gè)方法,當(dāng)收到注冊(cè)和取消注冊(cè)的事件的時(shí)候通知相關(guān)的監(jiān)聽(tīng)器數(shù)據(jù)變化,從而實(shí)現(xiàn)實(shí)時(shí)更新數(shù)據(jù)。

17.內(nèi)部類(lèi)Notifier

該類(lèi)繼承 Thread 類(lèi),負(fù)責(zé)向 Redis 發(fā)起訂閱邏輯。

1.屬性
// 服務(wù)名:Root + Service
private final String service;
// 需要忽略連接的次數(shù)
private final AtomicInteger connectSkip = new AtomicInteger();
// 已經(jīng)忽略連接的次數(shù)
private final AtomicInteger connectSkiped = new AtomicInteger();
// 隨機(jī)數(shù)
private final Random random = new Random();
// jedis實(shí)例
private volatile Jedis jedis;
// 是否是首次通知
private volatile boolean first = true;
// 是否運(yùn)行中
private volatile boolean running = true;
// 連接次數(shù)隨機(jī)數(shù)
private volatile int connectRandom;

上述屬性中,部分屬性都是為了redis的重連策略,用于在和redis斷開(kāi)鏈接時(shí),忽略一定的次數(shù)和redis的連接,避免空跑。

2.resetSkip
private void resetSkip() {
    connectSkip.set(0);
    connectSkiped.set(0);
    connectRandom = 0;
}

該方法就是重置忽略連接的信息。

3.isSkip
private boolean isSkip() {
    // 獲得忽略次數(shù)
    int skip = connectSkip.get(); // Growth of skipping times
    // 如果忽略次數(shù)超過(guò)10次,那么取隨機(jī)數(shù),加上一個(gè)10以內(nèi)的隨機(jī)數(shù)
    // 連接失敗的次數(shù)越多,每一輪加大需要忽略的總次數(shù),并且?guī)в幸欢ǖ碾S機(jī)性。
    if (skip >= 10) { // If the number of skipping times increases by more than 10, take the random number
        if (connectRandom == 0) {
            connectRandom = random.nextInt(10);
        }
        skip = 10 + connectRandom;
    }
    // 自增忽略次數(shù)。若忽略次數(shù)不夠,則繼續(xù)忽略。
    if (connectSkiped.getAndIncrement() < skip) { // Check the number of skipping times
        return true;
    }
    // 增加需要忽略的次數(shù)
    connectSkip.incrementAndGet();
    // 重置已忽略次數(shù)和隨機(jī)數(shù)
    connectSkiped.set(0);
    connectRandom = 0;
    return false;
}

該方法是用來(lái)判斷忽略本次對(duì)redis的連接。首先獲得需要忽略的次數(shù),如果忽略次數(shù)不小于10次,則加上一個(gè)10以內(nèi)的隨機(jī)數(shù),然后判斷自增的忽略次數(shù),如果次數(shù)不夠,則繼續(xù)忽略,如果次數(shù)夠了,增加需要忽略的次數(shù),重置已經(jīng)忽略的次數(shù)和隨機(jī)數(shù)。主要的思想是連接失敗的次數(shù)越多,每一輪加大需要忽略的總次數(shù),并且?guī)в幸欢ǖ碾S機(jī)性。

4.run
@Override
public void run() {
    // 當(dāng)通知器正在運(yùn)行中時(shí)
    while (running) {
        try {
            // 如果不忽略連接
            if (!isSkip()) {
                try {
                    for (Map.Entry entry : jedisPools.entrySet()) {
                        JedisPool jedisPool = entry.getValue();
                        try {
                            jedis = jedisPool.getResource();
                            try {
                                // 是否為監(jiān)控中心
                                if (service.endsWith(Constants.ANY_VALUE)) {
                                    // 如果不是第一次通知
                                    if (!first) {
                                        first = false;
                                        Set keys = jedis.keys(service);
                                        if (keys != null && !keys.isEmpty()) {
                                            for (String s : keys) {
                                                // 通知
                                                doNotify(jedis, s);
                                            }
                                        }
                                        // 重置
                                        resetSkip();
                                    }
                                    // 批準(zhǔn)訂閱
                                    jedis.psubscribe(new NotifySub(jedisPool), service); // blocking
                                } else {
                                    // 如果不是監(jiān)控中心,并且不是第一次通知
                                    if (!first) {
                                        first = false;
                                        // 多帶帶通知一個(gè)服務(wù)
                                        doNotify(jedis, service);
                                        // 重置
                                        resetSkip();
                                    }
                                    // 批準(zhǔn)訂閱
                                    jedis.psubscribe(new NotifySub(jedisPool), service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE); // blocking
                                }
                                break;
                            } finally {
                                jedis.close();
                            }
                        } catch (Throwable t) { // Retry another server
                            logger.warn("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);
                            // If you only have a single redis, you need to take a rest to avoid overtaking a lot of CPU resources
                            // 發(fā)生異常,說(shuō)明 Redis 連接斷開(kāi)了,需要等待reconnectPeriod時(shí)間
                            //通過(guò)這樣的方式,避免執(zhí)行,占用大量的 CPU 資源。
                            sleep(reconnectPeriod);
                        }
                    }
                } catch (Throwable t) {
                    logger.error(t.getMessage(), t);
                    sleep(reconnectPeriod);
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }
}

該方法是線程的run方法,應(yīng)該很熟悉,其中做了相關(guān)訂閱的邏輯,其中根據(jù)redis的重連策略做了一些忽略連接的策略,也就是調(diào)用了上述講解的isSkip方法,訂閱就是調(diào)用了jedis.psubscribe方法,它是訂閱給定模式相匹配的所有頻道。

4.shutdown
public void shutdown() {
    try {
        // 更改狀態(tài)
        running = false;
        // jedis斷開(kāi)連接
        jedis.disconnect();
    } catch (Throwable t) {
        logger.warn(t.getMessage(), t);
    }
}

該方法是斷開(kāi)連接的方法。

(二)RedisRegistryFactory

該類(lèi)繼承了AbstractRegistryFactory類(lèi),實(shí)現(xiàn)了AbstractRegistryFactory抽象出來(lái)的createRegistry方法,看一下原代碼:

public class RedisRegistryFactory extends AbstractRegistryFactory {

    @Override
    protected Registry createRegistry(URL url) {
        return new RedisRegistry(url);
    }

}

可以看到就是實(shí)例化了RedisRegistry而已,所有這里就不解釋了。

后記
該部分相關(guān)的源碼解析地址:https://github.com/CrazyHZM/i...

該文章講解了dubbo利用redis來(lái)實(shí)現(xiàn)注冊(cè)中心,其中關(guān)鍵的是需要弄明白dubbo在redis中存儲(chǔ)的數(shù)據(jù)結(jié)構(gòu),也就是key-value中key代表什么,value代表什么。還有就是需要了解JRedis和JedisPool,其他的邏輯并不復(fù)雜。如果我在哪一部分寫(xiě)的不夠到位或者寫(xiě)錯(cuò)了,歡迎給我提意見(jiàn),我的私人微信號(hào)碼:HUA799695226。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/72264.html

相關(guān)文章

  • dubbo源碼解析(四十三)2.7新特性

    摘要:大揭秘目標(biāo)了解的新特性,以及版本升級(jí)的引導(dǎo)。四元數(shù)據(jù)改造我們知道以前的版本只有注冊(cè)中心,注冊(cè)中心的有數(shù)十個(gè)的鍵值對(duì),包含了一個(gè)服務(wù)所有的元數(shù)據(jù)。 DUBBO——2.7大揭秘 目標(biāo):了解2.7的新特性,以及版本升級(jí)的引導(dǎo)。 前言 我們知道Dubbo在2011年開(kāi)源,停止更新了一段時(shí)間。在2017 年 9 月 7 日,Dubbo 悄悄的在 GitHub 發(fā)布了 2.5.4 版本。隨后,版本...

    qqlcbb 評(píng)論0 收藏0
  • dubbo源碼解析(一)Hello,Dubbo

    摘要:英文全名為,也叫遠(yuǎn)程過(guò)程調(diào)用,其實(shí)就是一個(gè)計(jì)算機(jī)通信協(xié)議,它是一種通過(guò)網(wǎng)絡(luò)從遠(yuǎn)程計(jì)算機(jī)程序上請(qǐng)求服務(wù)而不需要了解底層網(wǎng)絡(luò)技術(shù)的協(xié)議。 Hello,Dubbo 你好,dubbo,初次見(jiàn)面,我想和你交個(gè)朋友。 Dubbo你到底是什么? 先給出一套官方的說(shuō)法:Apache Dubbo是一款高性能、輕量級(jí)基于Java的RPC開(kāi)源框架。 那么什么是RPC? 文檔地址:http://dubbo.a...

    evin2016 評(píng)論0 收藏0
  • dubbo源碼解析(三)注冊(cè)中心——開(kāi)篇

    摘要:是用來(lái)監(jiān)聽(tīng)處理注冊(cè)數(shù)據(jù)變更的事件。這里的是節(jié)點(diǎn)的接口,里面協(xié)定了關(guān)于節(jié)點(diǎn)的一些操作方法,我們可以來(lái)看看源代碼獲得節(jié)點(diǎn)地址判斷節(jié)點(diǎn)是否可用銷(xiāo)毀節(jié)點(diǎn)三這個(gè)接口是注冊(cè)中心的工廠接口,用來(lái)返回注冊(cè)中心的對(duì)象。 注冊(cè)中心——開(kāi)篇 目標(biāo):解釋注冊(cè)中心在dubbo框架中作用,dubbo-registry-api源碼解讀 注冊(cè)中心是什么? 服務(wù)治理框架中可以大致分為服務(wù)通信和服務(wù)管理兩個(gè)部分,服務(wù)管理...

    CastlePeaK 評(píng)論0 收藏0
  • 墻裂推薦:搜云庫(kù)技術(shù)團(tuán)隊(duì),面試必備的技術(shù)干貨

    摘要:今天整理了一下近大半年以來(lái)的一些文章,和我的預(yù)期一樣,很多文章我都忘記自己曾經(jīng)寫(xiě)過(guò)了,這個(gè)記錄的過(guò)程讓我也有了新的理解。希望大家,收藏,點(diǎn)贊,加轉(zhuǎn)發(fā)。 今天整理了一下近大半年以來(lái)的一些文章,和我的預(yù)期一樣,很多文章我都忘記自己曾經(jīng)寫(xiě)過(guò)了,這個(gè)記錄的過(guò)程讓我也有了新的理解。希望大家,收藏,點(diǎn)贊,加轉(zhuǎn)發(fā)。 面試必備 面試必備:深入Spring MVC DispatchServlet 源碼...

    SegmentFault 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

閱讀需要支付1元查看
<