中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何進行storm1.1.3與kafka1.0.0整合

發布時間:2021-12-15 16:10:06 來源:億速云 閱讀:131 作者:柒染 欄目:大數據

本篇文章給大家分享的是有關如何進行storm1.1.3與kafka1.0.0整合,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

package hgs.core.sk;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
@SuppressWarnings("deprecation")
public class StormKafkaMainTest {
	
	public static void main(String[] args) {
		TopologyBuilder builder = new TopologyBuilder();
		//zookeeper鏈接地址
		BrokerHosts hosts = new ZkHosts("bigdata01:2181,bigdata02:2181,bigdata03:2181");
		//KafkaSpout需要一個config,參數代表的意義1:zookeeper鏈接,2:消費kafka的topic,3,4:記錄消費offset的zookeeper地址 ,這里會保存在 zookeeper
		//集群的/test7/consume下面
		SpoutConfig sconfig = new SpoutConfig(hosts, "test7", "/test7", "consume");
		//消費的時候忽略offset從頭開始消費,這里可以注釋掉,因為消費的offset在zookeeper中可以找到
		sconfig.ignoreZkOffsets=true;
		//sconfig.scheme = new SchemeAsMultiScheme( new StringScheme() );
		builder.setSpout("kafkaspout", new KafkaSpout(sconfig), 1);
		builder.setBolt("mybolt1", new MyboltO(), 1).shuffleGrouping("kafkaspout");
		
     	Config config = new Config();
     	config.setNumWorkers(1);
     	try {
			StormSubmitter.submitTopology("storm----kafka--test", config, builder.createTopology());
		} catch (Exception e) {
			e.printStackTrace();
		}
     	
 /*    	LocalCluster cu  = new LocalCluster();
     	cu.submitTopology("test", config, builder.createTopology());*/
	}
}
class  MyboltO extends  BaseRichBolt{
	private static final long serialVersionUID = 1L;
	OutputCollector collector = null;
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}
	public void execute(Tuple input) {
		//這里把消息大一出來,在對應的woker下面的日志可以找到打印的內容
		//因為得到的內容是byte數組,所以需要轉換
		String out = new String((byte[])input.getValue(0));
		System.out.println(out);
		collector.ack(input);
		
	}
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		
	}
	
	
}
pom.xml文件的依賴
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>hgs</groupId>
  <artifactId>core.sk</artifactId>
  <version>1.0.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>core.sk</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    
    <dependency>
    	<groupId>org.apache.storm</groupId>
    	<artifactId>storm-kafka</artifactId>
    	<version>1.1.3</version>
	</dependency>
	<dependency>
  		<groupId>org.apache.storm</groupId>
 		 <artifactId>storm-core</artifactId>
  		<version>1.1.3</version>
  		<scope>provided</scope>
	</dependency>
	<dependency>
    	<groupId>org.apache.kafka</groupId>
    	<artifactId>kafka_2.11</artifactId>
    	<version>1.0.0</version>
    <exclusions>
    		<exclusion>
          		<groupId>org.slf4j</groupId>
          		<artifactId>slf4j-log4j12</artifactId>
        	</exclusion>
        	<exclusion>
            	<groupId>org.apache.zookeeper</groupId>
            	<artifactId>zookeeper</artifactId>
       		</exclusion>
    	</exclusions>
	</dependency>
	
<!-- 	<dependency>
    	<groupId>org.apache.storm</groupId>
    	<artifactId>storm-kafka-monitor</artifactId>
    	<version>1.2.2</version>
	</dependency> -->
<!-- 	<dependency>
    	<groupId>org.apache.kafka</groupId>
    	<artifactId>kafka-clients</artifactId>
    	<version>0.8.2.1</version>
	</dependency> -->
	
	<dependency>
	    <groupId>org.clojure</groupId>
	    <artifactId>clojure</artifactId>
	    <version>1.7.0</version>
	</dependency>
	<!-- 嘗試了很多次 都會有這個錯誤:
	java.lang.NullPointerException at org.apache.storm.kafka.monitor.KafkaOffsetLagUtil.getOffsetLags(KafkaOffsetLagUtil.java:272)
	最后修改為kafka相應的kafka-clients版本后問題得到解決,應該是該出的問題
	-->
	<dependency>
	    <groupId>org.apache.kafka</groupId>
	    <artifactId>kafka-clients</artifactId>
	    <version>1.0.0</version>
	</dependency>
	
 </dependencies>
  
  
  
  <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.2</version>
                <configuration>
                    <archive>
                        <manifest>
                            <!-- 我運行這個jar所運行的主類 -->
                            <mainClass>hgs.core.sk.StormKafkaMainTest</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>
                            <!-- 必須是這樣寫 -->
                            jar-with-dependencies
                        </descriptorRef>
                    </descriptorRefs>
                </configuration>
                
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            
             <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

以上就是如何進行storm1.1.3與kafka1.0.0整合,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

济源市| 康平县| 汾西县| 枣阳市| 开化县| 忻城县| 扬州市| 都昌县| 延吉市| 韶山市| 长泰县| 巴青县| 慈利县| 马龙县| 五河县| 武川县| 桦川县| 保德县| 甘肃省| 红河县| 义乌市| 双柏县| 松江区| 中西区| 兴隆县| 泾阳县| 柳河县| 义马市| 武定县| 泸水县| 灵台县| 额济纳旗| 安义县| 苍南县| 全南县| 宁津县| 临湘市| 济阳县| 庆元县| 安乡县| 平定县|