中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spring?Cloud集成Nacos?Config動態刷新的方法

發布時間:2022-08-13 09:26:54 來源:億速云 閱讀:427 作者:iii 欄目:開發技術

本篇內容主要講解“Spring Cloud集成Nacos Config動態刷新的方法”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Spring Cloud集成Nacos Config動態刷新的方法”吧!

正文

從遠端服務器獲取變更數據的主要模式有兩種:推(push)和拉(pull)。Push 模式簡單來說就是服務端主動將數據變更信息推送給客戶端,這種模式優點是時效性好,服務端數據發生變更可以立馬通知到客戶端,但這種模式需要服務端維持與客戶端的心跳連接,會增加服務端實現的復雜度,服務端也需要占用更多的資源來維持與客戶端的連接。

而 Pull 模式則是客戶端主動去服務器請求數據,例如,每間隔10ms就向服務端發起請求獲取數據。顯而易見pull模式存在時效性問題。

請求的間隔也不太好設置,間隔太短,對服務器請求壓力過大。間隔時間過長,那么必然會造成時效性很差。而且如果配置長時間不更新,并且存在大量的客戶端就會產生大量無效的pull請求。

Nacos Config動態刷新機制

Nacos 沒有采用上述的兩種模式,而是采用了長輪詢方式結合了推和拉的優點:

Spring?Cloud集成Nacos?Config動態刷新的方法

  • 長輪詢也是輪詢,因此 Nacos 客戶端會默認每10ms向服務端發起請求,當客戶端請求服務端時會在請求頭上攜帶長輪詢的超時時間,默認是30s。而服務端接收到該請求時會hang住請求,為了防止客戶端超時會在請求頭攜帶的超時時間上減去500ms,因此默認會hang住請求29.5s。

  • 在這期間如果服務端發生了配置變更會產生相應的事件,監聽到該事件后,會響應對應的客戶端。這樣一來客戶端不會頻繁發起輪詢請求,而服務端也不需要維持與客戶端的心跳,兼備了時效性和復雜度。

如果你覺得源碼枯燥的話,可以選擇不看后半部分的源碼,先通過這張流程圖去了解Nacos動態刷新機制的流程:

Spring?Cloud集成Nacos?Config動態刷新的方法

Nacos Config 長輪詢源碼剖析

首先,打開 com.alibaba.cloud.nacos.NacosConfigBootstrapConfiguration 這個類,從類名也可以看出該類是Nacos Config的啟動配置類,是Nacos Config自動裝配的入口。在該類中的 nacosConfigManager 方法實例化了一個 NacosConfigManager 對象,并注冊到容器中:

@Bean
@ConditionalOnMissingBean
public NacosConfigManager nacosConfigManager(
		NacosConfigProperties nacosConfigProperties) {
	return new NacosConfigManager(nacosConfigProperties);
}

NacosConfigManager 的構造器中調用了 createConfigService 方法,這是一個靜態方法用來創建 ConfigService 對象的單例。

/**
 * Compatible with old design,It will be perfected in the future.
 */
static ConfigService createConfigService(
		NacosConfigProperties nacosConfigProperties) {
    // 雙重檢查鎖模式的單例
	if (Objects.isNull(service)) {
		synchronized (NacosConfigManager.class) {
			try {
				if (Objects.isNull(service)) {
					service = NacosFactory.createConfigService(
							nacosConfigProperties.assembleConfigServiceProperties());
				}
			}
			catch (NacosException e) {
				log.error(e.getMessage());
				throw new NacosConnectionFailureException(
						nacosConfigProperties.getServerAddr(), e.getMessage(), e);
			}
		}
	}
	return service;
}

ConfigService 的具體實現是 NacosConfigService,在該類的構造器中主要初始化了 HttpAgentClientWorker 對象。

ClientWorker構造器初始化線程池

ClientWorker 的構造器中則初始化了幾個線程池:

public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager,
        final Properties properties) {
    this.agent = agent;
    this.configFilterChainManager = configFilterChainManager;
    // Initialize the timeout parameter
    init(properties);
    // 創建具有定時執行功能的單線程池,用于定時執行 checkConfigInfo 方法
    this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
            t.setDaemon(true);
            return t;
        }
    });
    // 創建具有定時執行功能的且線程數與cpu核數相對應的線程池,用于根據需要動態刷新的配置文件執行 LongPollingRunnable,因此長輪詢任務是可以有多個并行的
    this.executorService = Executors
            .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName());
                    t.setDaemon(true);
                    return t;
                }
            });
    // 每10ms執行一次 checkConfigInfo 方法
    this.executor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                checkConfigInfo();
            } catch (Throwable e) {
                LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
            }
        }
    }, 1L, 10L, TimeUnit.MILLISECONDS);
}
private void init(Properties properties) {
    // 長輪詢的超時時間,默認為30秒,此參數會被放到請求頭中帶到服務端,服務端會根據該參數去做長輪詢的hold
    timeout = Math.max(ConvertUtils.toInt(properties.getProperty(PropertyKeyConst.CONFIG_LONG_POLL_TIMEOUT),
            Constants.CONFIG_LONG_POLL_TIMEOUT), Constants.MIN_CONFIG_LONG_POLL_TIMEOUT);
    taskPenaltyTime = ConvertUtils
            .toInt(properties.getProperty(PropertyKeyConst.CONFIG_RETRY_TIME), Constants.CONFIG_RETRY_TIME);
    this.enableRemoteSyncConfig = Boolean
            .parseBoolean(properties.getProperty(PropertyKeyConst.ENABLE_REMOTE_SYNC_CONFIG));
}
/**
 * Check config info.
 */
public void checkConfigInfo() {
    // Dispatch taskes.
    // 獲取需要監聽的文件數量
    int listenerSize = cacheMap.size();
    // Round up the longingTaskCount.
    // 默認一個 LongPollingRunnable 可以處理監聽3k個配置文件的變化,超過3k個才會創建新的 LongPollingRunnable
    int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
    if (longingTaskCount > currentLongingTaskCount) {
        for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
            // The task list is no order.So it maybe has issues when changing.
            executorService.execute(new LongPollingRunnable(i));
        }
        currentLongingTaskCount = longingTaskCount;
    }
}

LongPollingRunnable 類主要用于檢查本地配置,以及長輪詢地去服務端獲取變更配置的 dataid 和 group,其代碼位于 com.alibaba.nacos.client.config.impl.ClientWorker 類,代碼如下:

class LongPollingRunnable implements Runnable {
    private final int taskId;
    public LongPollingRunnable(int taskId) {
        this.taskId = taskId;
    }
    @Override
    public void run() {
        List<CacheData> cacheDatas = new ArrayList<CacheData>();
        List<String> inInitializingCacheList = new ArrayList<String>();
        try {
            // check failover config
            // 遍歷本地緩存的配置
            for (CacheData cacheData : cacheMap.values()) {
                if (cacheData.getTaskId() == taskId) {
                    cacheDatas.add(cacheData);
                    try {
                        // 檢查本地配置
                        checkLocalConfig(cacheData);
                        if (cacheData.isUseLocalConfigInfo()) {
                            cacheData.checkListenerMd5();
                        }
                    } catch (Exception e) {
                        LOGGER.error("get local config info error", e);
                    }
                }
            }
            // check server config
            // 通過長輪詢檢查服務端配置
            List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
            if (!CollectionUtils.isEmpty(changedGroupKeys)) {
                LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
            }
            for (String groupKey : changedGroupKeys) {
                String[] key = GroupKey.parseKey(groupKey);
                String dataId = key[0];
                String group = key[1];
                String tenant = null;
                if (key.length == 3) {
                    tenant = key[2];
                }
                try {
                    String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                    CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                    cache.setContent(ct[0]);
                    if (null != ct[1]) {
                        cache.setType(ct[1]);
                    }
                    LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                            agent.getName(), dataId, group, tenant, cache.getMd5(),
                            ContentUtils.truncateContent(ct[0]), ct[1]);
                } catch (NacosException ioe) {
                    String message = String
                            .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                                    agent.getName(), dataId, group, tenant);
                    LOGGER.error(message, ioe);
                }
            }
            for (CacheData cacheData : cacheDatas) {
                if (!cacheData.isInitializing() || inInitializingCacheList
                        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                    cacheData.checkListenerMd5();
                    cacheData.setInitializing(false);
                }
            }
            inInitializingCacheList.clear();
            executorService.execute(this);
        } catch (Throwable e) {
            // If the rotation training task is abnormal, the next execution time of the task will be punished
            LOGGER.error("longPolling error : ", e);
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
        }
    }
}

上面有個 checkUpdateDataIds 方法,用于獲取發生變更了的配置文件的dataId列表,它同樣位于 ClientWorker 內。

如下:

/**
 * Fetch the dataId list from server.
 *
 * @param cacheDatas              CacheDatas for config infomations.
 * @param inInitializingCacheList initial cache lists.
 * @return String include dataId and group (ps: it maybe null).
 * @throws Exception Exception.
 */
List<String> checkUpdateDataIds(List<CacheData> cacheDatas, List<String> inInitializingCacheList) throws Exception {
    // 拼接出配置文件的唯一標識
    StringBuilder sb = new StringBuilder();
    for (CacheData cacheData : cacheDatas) {
        if (!cacheData.isUseLocalConfigInfo()) {
            sb.append(cacheData.dataId).append(WORD_SEPARATOR);
            sb.append(cacheData.group).append(WORD_SEPARATOR);
            if (StringUtils.isBlank(cacheData.tenant)) {
                sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
            } else {
                sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
            }
            if (cacheData.isInitializing()) {
                // It updates when cacheData occours in cacheMap by first time.
                inInitializingCacheList
                        .add(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant));
            }
        }
    }
    boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
    return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
}
/**
 * Fetch the updated dataId list from server.
 *
 * @param probeUpdateString       updated attribute string value.
 * @param isInitializingCacheList initial cache lists.
 * @return The updated dataId list(ps: it maybe null).
 * @throws IOException Exception.
 */
List<String> checkUpdateConfigStr(String probeUpdateString, boolean isInitializingCacheList) throws Exception {
    Map<String, String> params = new HashMap<String, String>(2);
    params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
    Map<String, String> headers = new HashMap<String, String>(2);
    // 長輪詢的超時時間
    headers.put("Long-Pulling-Timeout", "" + timeout);
    // told server do not hang me up if new initializing cacheData added in
    if (isInitializingCacheList) {
        headers.put("Long-Pulling-Timeout-No-Hangup", "true");
    }
    if (StringUtils.isBlank(probeUpdateString)) {
        return Collections.emptyList();
    }
    try {
        // In order to prevent the server from handling the delay of the client's long task,
        // increase the client's read timeout to avoid this problem.
        long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
        // 向服務端發起一個http請求,該請求在服務端配置沒有變更的情況下默認會hang住30s
        HttpRestResult<String> result = agent
                .httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params, agent.getEncode(),
                        readTimeoutMs);
        if (result.ok()) {
            setHealthServer(true);
            // 響應狀態是成功則解析響應體得到 dataId、group、tenant 等信息并返回
            return parseUpdateDataIdResponse(result.getData());
        } else {
            setHealthServer(false);
            LOGGER.error("[{}] [check-update] get changed dataId error, code: {}", agent.getName(),
                    result.getCode());
        }
    } catch (Exception e) {
        setHealthServer(false);
        LOGGER.error("[" + agent.getName() + "] [check-update] get changed dataId exception", e);
        throw e;
    }
    return Collections.emptyList();
}

客戶端對 listener 接口的請求會進入到服務端的com.alibaba.nacos.config.server.controller.ConfigController#listener 方法進行處理,該方法主要是調用了 com.alibaba.nacos.config.server.controller.ConfigServletInner#doPollingConfig 方法。

代碼如下:

/**
 * 輪詢接口
 */
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response,
                              Map<String, String> clientMd5Map, int probeRequestSize)
    throws IOException, ServletException {
    // 如果支持長輪詢則進入長輪詢的流程
    if (LongPollingService.isSupportLongPolling(request)) {
        longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
        return HttpServletResponse.SC_OK + "";
    }
    // else 兼容短輪詢邏輯
    List<String> changedGroups = MD5Util.compareMd5(request, response, clientMd5Map);
    // 兼容短輪詢result
    String oldResult = MD5Util.compareMd5OldResult(changedGroups);
    String newResult = MD5Util.compareMd5ResultString(changedGroups);
    String version = request.getHeader(Constants.CLIENT_VERSION_HEADER);
    if (version == null) {
        version = "2.0.0";
    }
    int versionNum = Protocol.getVersionNumber(version);
    /**
     * 2.0.4版本以前, 返回值放入header中
     */
    if (versionNum < START_LONGPOLLING_VERSION_NUM) {
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE, oldResult);
        response.addHeader(Constants.PROBE_MODIFY_RESPONSE_NEW, newResult);
    } else {
        request.setAttribute("content", newResult);
    }
    // 禁用緩存
    response.setHeader("Pragma", "no-cache");
    response.setDateHeader("Expires", 0);
    response.setHeader("Cache-Control", "no-cache,no-store");
    response.setStatus(HttpServletResponse.SC_OK);
    return HttpServletResponse.SC_OK + "";
}

我們主要關注上面的 com.alibaba.nacos.config.server.service.LongPollingService#addLongPollingClient 長輪詢流程的方法。

長輪詢流程方法

代碼如下:

public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
                                 int probeRequestSize) {
    String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
    String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
    String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
    String tag = req.getHeader("Vipserver-Tag");
    int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
    /**
     * 提前500ms返回響應,為避免客戶端超時 @qiaoyi.dingqy 2013.10.22改動  add delay time for LoadBalance
     */
    long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
    if (isFixedPolling()) {
        timeout = Math.max(10000, getFixedPollingInterval());
        // do nothing but set fix polling timeout
    } else {
        long start = System.currentTimeMillis();
        List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
        if (changedGroups.size() > 0) {
            generateResponse(req, rsp, changedGroups);
            LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
                System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",
                clientMd5Map.size(), probeRequestSize, changedGroups.size());
            return;
        } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
            LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
                RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
                changedGroups.size());
            return;
        }
    }
    String ip = RequestUtil.getRemoteIp(req);
    // 一定要由HTTP線程調用,否則離開后容器會立即發送響應
    final AsyncContext asyncContext = req.startAsync();
    // AsyncContext.setTimeout()的超時時間不準,所以只能自己控制
    asyncContext.setTimeout(0L);
	// 在 ClientLongPolling 的 run 方法會將 ClientLongPolling 實例(攜帶了本次請求的相關信息)放入 allSubs 中,然后會在29.5s后再執行另一個 Runnable,該 Runnable 用于等待29.5s后依舊沒有相應的配置變更時對客戶端進行響應,并將相應的 ClientLongPolling 實例從 allSubs 中移出
    scheduler.execute(
        new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}

LongPollingService 實現了 AbstractEventListener,也就是說能接收事件通知,在其 com.alibaba.nacos.config.server.service.LongPollingService#onEvent 方法中可以看到,它關注的是 LocalDataChangeEvent 事件:

@Override
public void onEvent(Event event) {
    if (isFixedPolling()) {
        // ignore
    } else {
        if (event instanceof LocalDataChangeEvent) {
            LocalDataChangeEvent evt = (LocalDataChangeEvent)event;
            scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
        }
    }
}

在nacos上修改配置后就會產生 LocalDataChangeEvent 事件,此時 LongPollingService 也就能監聽到,當收到該事件時就會遍歷 allSubs,找到匹配的請求并將 groupKey 返回給客戶端。

具體代碼在 DataChangeTask 中:

class DataChangeTask implements Runnable {
    @Override
    public void run() {
        try {
            ConfigService.getContentBetaMd5(groupKey);
            for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                ClientLongPolling clientSub = iter.next();
                if (clientSub.clientMd5Map.containsKey(groupKey)) {
                    // 如果beta發布且不在beta列表直接跳過
                    if (isBeta && !betaIps.contains(clientSub.ip)) {
                        continue;
                    }
                    // 如果tag發布且不在tag列表直接跳過
                    if (StringUtils.isNotBlank(tag) && !tag.equals(clientSub.tag)) {
                        continue;
                    }
                    getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                    iter.remove(); // 刪除訂閱關系
                    LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
                        (System.currentTimeMillis() - changeTime),
                        "in-advance",
                        RequestUtil.getRemoteIp((HttpServletRequest)clientSub.asyncContext.getRequest()),
                        "polling",
                        clientSub.clientMd5Map.size(), clientSub.probeRequestSize, groupKey);
                    clientSub.sendResponse(Arrays.asList(groupKey));
                }
            }
        } catch (Throwable t) {
            LogUtil.defaultLog.error("data change error:" + t.getMessage(), t.getCause());
        }
    }
    DataChangeTask(String groupKey) {
        this(groupKey, false, null);
    }
    DataChangeTask(String groupKey, boolean isBeta, List<String> betaIps) {
        this(groupKey, isBeta, betaIps, null);
    }
    DataChangeTask(String groupKey, boolean isBeta, List<String> betaIps, String tag) {
        this.groupKey = groupKey;
        this.isBeta = isBeta;
        this.betaIps = betaIps;
        this.tag = tag;
    }
    final String groupKey;
    final long changeTime = System.currentTimeMillis();
    final boolean isBeta;
    final List<String> betaIps;
    final String tag;
}

當客戶端收到變更的dataid+group后,就會去服務端獲取最新的配置數據,并更新本地數據 cacheData,然后發送數據變更事件,整個流程結束。

  • 獲取服務端最新配置數據的方法:com.alibaba.nacos.client.config.impl.ClientWorker#getServerConfig

  • 發送數據變更事件的方法:com.alibaba.nacos.client.config.impl.CacheData#checkListenerMd5

最后附上一張流程與源碼的對應圖:

Spring?Cloud集成Nacos?Config動態刷新的方法

到此,相信大家對“Spring Cloud集成Nacos Config動態刷新的方法”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

丰宁| 敦化市| 牡丹江市| 增城市| 台前县| 保靖县| 德令哈市| 安西县| 嘉禾县| 景东| 崇州市| 噶尔县| 临沂市| 时尚| 苍溪县| 霞浦县| 壶关县| 夏河县| 邵东县| 吉林省| 仁化县| 平利县| 枣阳市| 吉木萨尔县| 文水县| 五莲县| 丹棱县| 宣汉县| 板桥市| 芜湖县| 渭源县| 台湾省| 庆城县| 环江| 定日县| 林芝县| 商河县| 益阳市| 邵东县| 长治市| 新和县|