中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Storm-kafka中如何實現一個對于kafkaBroker動態讀取的Class

發布時間:2021-12-13 17:15:41 來源:億速云 閱讀:134 作者:小新 欄目:云計算

小編給大家分享一下Storm-kafka中如何實現一個對于kafkaBroker動態讀取的Class,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

實現一個對于kafkaBroker 動態讀取的Class - DynamicBrokersReader

DynamicBrokersReader

package com.mixbox.storm.kafka;

import backtype.storm.Config;
import backtype.storm.utils.Utils;

import com.mixbox.storm.kafka.trident.GlobalPartitionInformation;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.retry.RetryNTimes;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;

/**
 * 動態的Broker讀 我們維護了有一個與zk之間的連接,提供了獲取指定topic的每一個partition正在活動著的leader所對應的broker
 * 這樣你有能力知道,當前的這些topic,哪一些broker是活動的 * @author Yin Shuai
 */

public class DynamicBrokersReader {

	public static final Logger LOG = LoggerFactory
			.getLogger(DynamicBrokersReader.class);

	// 對于Client CuratorFrameWork的封裝
	private CuratorFramework _curator;

	// 在Zk上注冊的位置
	private String _zkPath;

	// 指定的_topic
	private String _topic;

	public DynamicBrokersReader(Map conf, String zkStr, String zkPath,
			String topic) {
		_zkPath = zkPath;
		_topic = topic;
		try {
			_curator = CuratorFrameworkFactory
					.newClient(
							zkStr,
							Utils.getInt(conf
									.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
							15000,
							new RetryNTimes(
									Utils.getInt(conf
											.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
									Utils.getInt(conf
											.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		_curator.start();
	}

	public DynamicBrokersReader(String zkPath) {
		this._zkPath = zkPath;
	}

	/**
	 * 確定指定topic下,每一個partition的leader,所對應的 主機和端口, 并將它們存入到全部分區信息中
	 * 
	 */
	public GlobalPartitionInformation getBrokerInfo() {
		GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
		try {

			// 拿到當前的分區數目
			int numPartitionsForTopic = getNumPartitions();

			/**
			 * /brokers/ids
			 */
			String brokerInfoPath = brokerPath();

			// 默認的我們的分區數目就只有 0, 1 兩個
			for (int partition = 0; partition < numPartitionsForTopic; partition++) {

				// 這里請主要參考分區和領導者的關系
				int leader = getLeaderFor(partition);

				// 拿到領導者以后的zookeeper路徑
				String leaderPath = brokerInfoPath + "/" + leader;

				try {

					byte[] brokerData = _curator.getData().forPath(leaderPath);

					/**
					 * 在這里, 我們拿到的brokerData為:
					 * {"jmx_port":-1,"timestamp":"1403076810435"
					 * ,"host":"192.168.50.207","version":1,"port":9092} 注意
					 * 這里是字節數組開始轉json
					 */
					Broker hp = getBrokerHost(brokerData);

					/**
					 * 記錄好 每一個分區 partition 所對應的 Broker
					 */
					globalPartitionInformation.addPartition(partition, hp);

				} catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
					LOG.error("Node {} does not exist ", leaderPath);
				}
			}
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
		LOG.info("Read partition info from zookeeper: "
				+ globalPartitionInformation);
		return globalPartitionInformation;
	}

	/**
	 * @return 拿到指定topic下的分區數目
	 */
	private int getNumPartitions() {
		try {
			String topicBrokersPath = partitionPath();
			List<String> children = _curator.getChildren().forPath(
					topicBrokersPath);
			return children.size();
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	/**
	 * @return 拿到的topic在zookeeper注冊的分區地址
	 *         brokers/topics/storm-sentence/partitions
	 */
	public String partitionPath() {
		return _zkPath + "/topics/" + _topic + "/partitions";
	}

	/**
	 *  持有的是Broker節點的id號碼,這個id號是在配置的過程中為每一個Broker分配的
	 * @return   /brokers/ids
	 */
	public String brokerPath() {
		return _zkPath + "/ids";
	}

	/**
	 * get /brokers/topics/distributedTopic/partitions/1/state {
	 * "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1,
	 * "version":1 }
	 * 
	 * 說明一下,在kafka之中,每一個分區都會有一個Leader,有0個或者多個的followers, 一個leader會處理這個分區的所有請求
	 * @param partition
	 * @return
	 */
	private int getLeaderFor(long partition) {
		try {
			String topicBrokersPath = partitionPath();
			byte[] hostPortData = _curator.getData().forPath(
					topicBrokersPath + "/" + partition + "/state");
			@SuppressWarnings("unchecked")
			Map<Object, Object> value = (Map<Object, Object>) JSONValue
					.parse(new String(hostPortData, "UTF-8"));
			Integer leader = ((Number) value.get("leader")).intValue();
			return leader;
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	public void close() {
		_curator.close();
	}

	/**
	 * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0 {
	 * "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }
	 * 
	 * 
	 * @param contents
	 * @return
	 */
	private Broker getBrokerHost(byte[] contents) {
		try {
			@SuppressWarnings("unchecked")
			Map<Object, Object> value = (Map<Object, Object>) JSONValue
					.parse(new String(contents, "UTF-8"));
			String host = (String) value.get("host");
			Integer port = ((Long) value.get("port")).intValue();
			return new Broker(host, port);
		} catch (UnsupportedEncodingException e) {
			throw new RuntimeException(e);
		}
	}

}

對于以上代碼須知:

1 : 我們持有了一個ZkPath , 在Storm-kafka的class之中我們默認的是/brokers 

2 : _topic ,  目前我們是針對的是Topic, 也就是說我們的partition,leader都是針對于單個Topic的

3:   

1 int numPartitionsForTopic = getNumPartitions();

       針對與一個Topic,首先我們要取當前的分區數,一般的情況,我們在kafka之中默認的分區數為2

2 String brokerInfoPath = brokerPath();

       拿到 /brokers/ids 的分區號

3:	for (int partition = 0; partition < numPartitionsForTopic; partition++) {

        依次的遍歷每一個分區

4:int leader = getLeaderFor(partition);String leaderPath = brokerInfoPath + "/" + leader;byte[] brokerData = _curator.getData().forPath(leaderPath);

       再通過分區拿到領導者,以及領導者的路徑,最后拿到領導者的數據:

       我們舉一個小例子

        * 在這里, 我們拿到的brokerData為:

* {"jmx_port":-1,"timestamp":"1403076810435"

* ,"host":"192.168.50.207","version":1,"port":9092} 

4:Broker hp = getBrokerHost(brokerData);

          拿到某一個Topic自己的分區在kafka所對應的Broker,并且其封裝到 globalPartitionInformation

5 globalPartitionInformation.addPartition(partition, hp);

       GlobalPartitionInformaton底層維護了一個HashMap

簡單的來說:DynamicBrokersReader 針對某一個Topic維護了  每一個分區 partition 所對應的 Broker

以上是“Storm-kafka中如何實現一個對于kafkaBroker動態讀取的Class”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

西华县| 莎车县| 邓州市| 阳泉市| 梨树县| 新营市| 岳西县| 根河市| 舞钢市| 祁阳县| 金平| 钟山县| 盐池县| 西乌珠穆沁旗| 伊吾县| 遵化市| 延边| 铅山县| 兴仁县| 和硕县| 定兴县| 房产| 吴桥县| 佛坪县| 九寨沟县| 南华县| 财经| 万山特区| 博爱县| 祁阳县| 东光县| 朝阳区| 凉山| 长武县| 泸西县| 通城县| 蓬溪县| 荆门市| 建昌县| 嵩明县| 横山县|