中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Storm如何和Kafka進行整合

發布時間:2021-11-24 15:48:36 來源:億速云 閱讀:181 作者:柒染 欄目:云計算

這篇文章將為大家詳細講解有關Storm如何和Kafka進行整合,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

 對于Storm 如何和Kafka進行整合

package com.mixbox.storm.kafka;

import backtype.storm.Config;
import backtype.storm.metric.api.IMetric;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import kafka.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mixbox.storm.kafka.PartitionManager.KafkaMessageId;

import java.util.*;

/**
 * @author Yin Shuai
 */

public class KafkaSpout extends BaseRichSpout {

	public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);

	/**
	 * 內部類,Message和Offset的偏移量對象
	 * 
	 * @author Yin Shuai
	 */

	public static class MessageAndRealOffset {
		public Message msg;
		public long offset;

		public MessageAndRealOffset(Message msg, long offset) {
			this.msg = msg;
			this.offset = offset;
		}
	}

	/**
	 * 發射的枚舉類
	 * @author Yin Shuai
	 */
	static enum EmitState {
		EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED
	}

	String _uuid = UUID.randomUUID().toString();
	
	SpoutConfig _spoutConfig;
	
	SpoutOutputCollector _collector;

	// 分區的協調器,getMyManagedPartitions 拿到我所管理的分區
	PartitionCoordinator _coordinator;

	// 動態的分區鏈接:保存到kafka各個節點的連接,以及負責的topic的partition號碼
	DynamicPartitionConnections _connections;

	// 提供了從zookeeper讀寫kafka 消費者信息的功能
	ZkState _state;

	// 上次更新的毫秒數
	long _lastUpdateMs = 0;

	// 當前的分區
	int _currPartitionIndex = 0;

	public KafkaSpout(SpoutConfig spoutConf) {
		_spoutConfig = spoutConf;
	}

	@SuppressWarnings("unchecked")
	@Override
	public void open(Map conf, final TopologyContext context,
			final SpoutOutputCollector collector) {
		_collector = collector;

		List<String> zkServers = _spoutConfig.zkServers;

		// 初始化的時候如果zkServers 為空,那么初始化 默認的配置Zookeeper
		if (zkServers == null) {

			zkServers = new ArrayList<String>() {

				{
					add("192.168.50.144");
					add("192.168.50.169");
					add("192.168.50.207");
				}
			};

			// zkServers =
			// (List<String>)conf.get(Config.STORM_ZOOKEEPER_SERVERS);
			System.out.println(" 使用的是Storm默認配置的Zookeeper List : " + zkServers);

		}

		Integer zkPort = _spoutConfig.zkPort;

		// 在這里我們也同時 來檢查zookeeper的端口是否為空
		if (zkPort == null) {

			zkPort = 2181;
			// zkPort = ((Number)
			// conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
		}

		Map stateConf = new HashMap(conf);

		stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
		stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
		stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);

		// 通過保存的配置文件,我們持有了一個zookeeper的state,支持節點內容的創建和刪除
		_state = new ZkState(stateConf);

		// 對于連接的維護
		_connections = new DynamicPartitionConnections(_spoutConfig,
				KafkaUtils.makeBrokerReader(conf, _spoutConfig));

		// using TransactionalState like this is a hack
		// 拿到總共的任務次數

		int totalTasks = context
				.getComponentTasks(context.getThisComponentId()).size();

		// 判斷當前的主機是否是靜態的statichost
		if (_spoutConfig.hosts instanceof StaticHosts) {
			_coordinator = new StaticCoordinator(_connections, conf,
					_spoutConfig, _state, context.getThisTaskIndex(),
					totalTasks, _uuid);

			// 當你拿到的spoutConfig是zkhost的時候
		} else {
			_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig,
					_state, context.getThisTaskIndex(), totalTasks, _uuid);
		}

		context.registerMetric("kafkaOffset", new IMetric() {
			KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(
					_spoutConfig.topic, _connections);

			@Override
			public Object getValueAndReset() {
				List<PartitionManager> pms = _coordinator
						.getMyManagedPartitions();
				Set<Partition> latestPartitions = new HashSet();
				for (PartitionManager pm : pms) {
					latestPartitions.add(pm.getPartition());
				}
				_kafkaOffsetMetric.refreshPartitions(latestPartitions);
				for (PartitionManager pm : pms) {
					_kafkaOffsetMetric.setLatestEmittedOffset(
							pm.getPartition(), pm.lastCompletedOffset());
				}
				return _kafkaOffsetMetric.getValueAndReset();
			}
		}, _spoutConfig.metricsTimeBucketSizeInSecs);

		context.registerMetric("kafkaPartition", new IMetric() {
			@Override
			public Object getValueAndReset() {
				List<PartitionManager> pms = _coordinator
						.getMyManagedPartitions();
				Map concatMetricsDataMaps = new HashMap();
				for (PartitionManager pm : pms) {
					concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
				}
				return concatMetricsDataMaps;
			}
		}, _spoutConfig.metricsTimeBucketSizeInSecs);
	}

	@Override
	public void close() {
		_state.close();
	}

	@Override
	public void nextTuple() {
		// Storm-spout 是從kafka 消費數據,把 kafka 的 consumer
		// 當成是一個spout,并且向其他的bolt的發送數據

		// 拿到當前我管理的這些PartitionsManager
		List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
		for (int i = 0; i < managers.size(); i++) {

			// 對于每一個分區的 PartitionManager

			// in case the number of managers decreased
			// 當前的分區

			_currPartitionIndex = _currPartitionIndex % managers.size();

			// 拿到當前的分區,并且發送,這里把SpoutOutputCollector傳遞進去了,由他發射元祖
			EmitState state = managers.get(_currPartitionIndex)
					.next(_collector);

			// 如果發送狀態為:發送-還有剩余
			if (state != EmitState.EMITTED_MORE_LEFT) {
				_currPartitionIndex = (_currPartitionIndex + 1)
						% managers.size();
			}

			// 如果發送的狀態為: 發送-沒有剩余
			if (state != EmitState.NO_EMITTED) {
				break;
			}
		}

		long now = System.currentTimeMillis();
		if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
			commit();
		}
	}

	@Override
	public void ack(Object msgId) {
		KafkaMessageId id = (KafkaMessageId) msgId;
		PartitionManager m = _coordinator.getManager(id.partition);
		if (m != null) {
			m.ack(id.offset);
		}
	}

	@Override
	public void fail(Object msgId) {
		KafkaMessageId id = (KafkaMessageId) msgId;
		PartitionManager m = _coordinator.getManager(id.partition);
		if (m != null) {
			m.fail(id.offset);
		}
	}

	@Override
	public void deactivate() {
		// 停止工作
		commit();
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {

		System.out.println(_spoutConfig.scheme.getOutputFields());
		declarer.declare(_spoutConfig.scheme.getOutputFields());
	}

	private void commit() {
		_lastUpdateMs = System.currentTimeMillis();
		for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
			manager.commit();
		}
	}

}

       在粗淺的代碼閱讀之后,在這里進行詳細的分析:

      1  KafkaSpout之中持有了一個 MessageAndRealOffset 的內部類

public static class MessageAndRealOffset
{
    public Message msg;
    
    public long offset;
    
    public MessageAndRealOffset(Message msg,long offset)
    {
        this.msg = msg;
        this.offset = offset;
    }
}

    2 在Spout之中我們還持有了一個PartitionCoordinator的分區協調器,默認的情況我們實例化的對象

是ZKCoordinator

    

關于Storm如何和Kafka進行整合就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

阿巴嘎旗| 呈贡县| 清徐县| 张家界市| 金溪县| 吐鲁番市| 汶川县| 福州市| 新乐市| 光山县| 孝义市| 江门市| 澄城县| 宜兰县| 轮台县| 定襄县| 明水县| 东至县| 张家港市| 河源市| 璧山县| 吴忠市| 横山县| 阿拉善盟| 贵溪市| 津市市| 石狮市| 霍林郭勒市| 江川县| 湖州市| 新源县| 济宁市| 万载县| 江孜县| 临海市| 扶绥县| 杭锦旗| 阳春市| 武威市| 来安县| 柳江县|