您好,登錄后才能下訂單哦!
本篇內容主要講解“TopicLookup請求處理方法是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“TopicLookup請求處理方法是什么”吧!
簡單邏輯說明
通過topic名字確定namespace
查找這個namespace的bundle分配信息
根據bundle分配信息來確認這個topic屬于哪個bundle
根據bundle信息來確認哪個broker負責這個bundle,返回broker的地址。
CommandLookup
主要用來查找Topic在被哪個broker負責。
一般客戶端可以通過http協議或者二進制協議來查詢。
message CommandLookupTopic { // topic 名字 required string topic = 1; // 網絡層請求id required uint64 request_id = 2; optional bool authoritative = 3 [default = false]; // TODO - Remove original_principal, original_auth_data, original_auth_method // Original principal that was verified by // a Pulsar proxy. optional string original_principal = 4; // Original auth role and auth Method that was passed // to the proxy. optional string original_auth_data = 5; optional string original_auth_method = 6; // 從哪個指定的連接點進行連接 optional string advertised_listener_name = 7; }
這里直接看服務端的代碼ServerCnx
protected void handleLookup(CommandLookupTopic lookup) { final long requestId = lookup.getRequestId(); final boolean authoritative = lookup.isAuthoritative(); final String advertisedListenerName = lookup.hasAdvertisedListenerName() ? lookup.getAdvertisedListenerName() : null; // 校驗topic名字 TopicName topicName = validateTopicName(lookup.getTopic(), requestId, lookup); if (topicName == null) { return; } // 這里的Semaphore 是服務端Lookup請求的限流器 final Semaphore lookupSemaphore = service.getLookupRequestSemaphore(); if (lookupSemaphore.tryAcquire()) { .... isTopicOperationAllowed(topicName, TopicOperation.LOOKUP) .thenApply(isAuthorized -> { // 通過鑒權 if (isAuthorized) { lookupTopicAsync(getBrokerService().pulsar(), topicName, authoritative, getPrincipal(), getAuthenticationData(), requestId, advertisedListenerName) .handle((lookupResponse, ex) -> { if (ex == null) { ctx.writeAndFlush(lookupResponse); } else { .... } lookupSemaphore.release(); return null; }); } else { .... }).exceptionally(ex -> { .... }); } else { // 如果有異常是發送的`CommandLookupTopicResponse` // 這里已經是新的定義二進制消息的方式了 // / Wire format // [TOTAL_SIZE] [CMD_SIZE][CMD] ctx.writeAndFlush(newLookupErrorResponse(ServerError.TooManyRequests, "Failed due to too many pending lookup requests", requestId)); } }
org.apache.pulsar.broker.lookup.TopicLookupBase#lookupTopicAsync
這個是一個靜態方法
主要
validation 校驗集群,topic名字等(這里面有跨集群檢查的邏輯,先略過)
lookup邏輯
這里校驗的邏輯先略過了,實際核心的邏輯在下面這2行上。
LookupOptions options = LookupOptions.builder() .authoritative(authoritative) .advertisedListenerName(advertisedListenerName) .loadTopicsInBundle(true) // 這里這個條件是true .build(); pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, options)
這里面的主要邏輯在NamespaceService
里面,PulsarService
可以認為是一個全局對象,pulsar需要的任何核心邏輯對象
(比如說NamspaceService
,BrokerService
,ConfigurationCacheService
等)你都可以從這個對象里面拿到。
這里面的主要邏輯是
根據傳遞過來的topic名字定位namespace
之后確認這個topic屬于哪個NamespaceBundle。
之后根據這個NamespaceBundle 來找到這個bundle 的owner broker的地址。
public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) { .... CompletableFuture<Optional<LookupResult>> future = getBundleAsync(topic) .thenCompose(bundle -> findBrokerServiceUrl(bundle, options)); .... } public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) { return bundleFactory.getBundlesAsync(topic.getNamespaceObject()) .thenApply(bundles -> bundles.findBundle(topic)); }
這里面的bundleFactory實際上是一個異步加載的cache。
我們看一下定義
// org.apache.pulsar.common.naming.NamespaceBundleFactory private final AsyncLoadingCache<NamespaceName, NamespaceBundles> bundlesCache; // 構造函數里面 public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) { // ..... this.bundlesCache = Caffeine.newBuilder() .recordStats() // 記錄metric .buildAsync( // 加載cache 的邏輯 (NamespaceName namespace, Executor executor) -> { String path = AdminResource.joinPath(LOCAL_POLICIES_ROOT, namespace.toString()); .... CompletableFuture<NamespaceBundles> future = new CompletableFuture<>(); // Read the static bundle data from the policies pulsar .getLocalZkCacheService() // 獲取LocalZooKeeperCacheService .policiesCache() .getWithStatAsync(path) .thenAccept(result -> { // 這里實際是去找有沒有單獨為這個namespace配置bundle數量 BundlesData bundlesData = result.map(Entry::getKey).map(p -> p.bundles).orElse(null); // 通過namespace拿到namespaceBundle NamespaceBundles namespaceBundles = getBundles( namespace, bundlesData, result.map(Entry::getValue).map(s -> s.getVersion()).orElse(-1)); .... future.complete(namespaceBundles); }).exceptionally(ex -> { future.completeExceptionally(ex); return null; }); return future; }); // ..... }
這里簡單說一下NamespaceBundles 這個類,這個類會保存這個Namespace的所有NamespaceBundle,提供一個聚合的視圖。
這個類表示一個hash環,這個環按照配置的分片個數,會被分成幾個片段,
每個broker會按照一定算法來確定這個環上的哪一部分屬于他自己。
topic也會按照一定的算法分配到這個hash環上。
這樣broker就能確定自己負責哪些topic。
就可以返回lookup請求了,這個流程也會觸發topic的加載流程。
這個函數就是確定這個topic屬于哪個NamespaceBundle
// 映射topic到hash環上的一段, 這一段就被NamespaceBundle 標識 public NamespaceBundle findBundle(TopicName topicName) { checkArgument(this.nsname.equals(topicName.getNamespaceObject())); long hashCode = factory.getLongHashCode(topicName.toString()); NamespaceBundle bundle = getBundle(hashCode); if (topicName.getDomain().equals(TopicDomain.non_persistent)) { bundle.setHasNonPersistentTopic(true); } return bundle; }
到這一步我們就能確定這個namespace的信息了,namespce被分為多少個bundle。
而且可以確定這個topic屬于哪個namespacebundle。
下一步是根據namespaceBundle查找負責的broker。
到這里是根據namespacebundle 確定broker
// 這個記錄的是一個broker的元數據信息 public class NamespaceEphemeralData { private String nativeUrl; private String nativeUrlTls; private String httpUrl; private String httpUrlTls; private boolean disabled; private Map<String, AdvertisedListener> advertisedListeners; } private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl( NamespaceBundle bundle, LookupOptions options) { ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> targetMap; return targetMap.computeIfAbsent(bundle, (k) -> { CompletableFuture<Optional<LookupResult>> future = new CompletableFuture<>(); // First check if we or someone else already owns the bundle ownershipCache.getOwnerAsync(bundle) .thenAccept(nsData -> { // nsData : Optional<NamespaceEphemeralData> if (!nsData.isPresent()) { // 如果沒找到這個信息 if (options.isReadOnly()) { // Do not attempt to acquire ownership future.complete(Optional.empty()); } else { // 目前還沒有人負責這個bundle 嘗試查找這個bundle的owner pulsar.getExecutor().execute(() -> { searchForCandidateBroker(bundle, future, options); }); } } else if (nsData.get().isDisabled()) { // namespce 正在unload future.completeExceptionally( new IllegalStateException(String.format("Namespace bundle %s is being unloaded", bundle))); } else { // 到這里是找到了的邏輯,直接拼接正常的response就行了 ... // find the target future.complete(Optional.of(new LookupResult(nsData.get()))); } }).exceptionally(exception -> { ... }); // 這里實際上是使用這個targetMap來做一個鎖的結構避免多次加載。 // https://github.com/apache/pulsar/pull/1527 future.whenComplete((r, t) -> pulsar.getExecutor().execute( () -> targetMap.remove(bundle) )); return future; }); }
這樣如果cache中存在這個topic的owner信息,就可以直接返回。
到此,相信大家對“TopicLookup請求處理方法是什么”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。