中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Storm容錯機制Acker詳解和實戰案例

發布時間:2020-08-20 23:30:21 來源:網絡 閱讀:3224 作者:Cloudy講師 欄目:大數據

Storm中有個特殊的Executor叫acker,他們負責跟蹤spout發出的每一個Tuple的Tuple樹。當acker發現一個Tuple樹已經處理完成了,它會告訴框架回調Spout的ack(),否則回調Spout的fail()。


Acker的跟蹤算法是Storm的主要突破之一,對任意大的一個Tuple樹,它只需要恒定的20字節就可以進行跟蹤。


我們期望的是,如果某個Tuple被Bolt執行失敗了,則Spout端可以重新發送該Tuple。但很遺憾的是,框架不會自動重新發送,需要我們自己手工編碼實現。后續給大家實戰案例!


什么是Tuple樹?

Storm容錯機制Acker詳解和實戰案例


Storm容錯機制Acker詳解和實戰案例

Storm容錯機制Acker詳解和實戰案例


Spout類代碼如下:

package les19.Ack_Fail;


import java.io.BufferedReader;

import java.io.FileInputStream;

import java.io.InputStreamReader;

import java.util.Map;

import java.util.UUID;

import java.util.concurrent.ConcurrentHashMap;


import org.apache.storm.spout.SpoutOutputCollector;

import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.IRichSpout;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.Values;


public class AckSpout implements IRichSpout{


/**

*/

private static final long serialVersionUID = 1L;


FileInputStream fis;

InputStreamReader isr;

BufferedReader br;

    private ConcurrentHashMap<Object, Values> _pending;//線程安全的Map,存儲emit過的tuple

    private ConcurrentHashMap<Object, Integer> fail_pending;//存儲失敗的tuple和其失敗次數

SpoutOutputCollector collector = null;

String str = null;


@Override

public void nextTuple() {

try {

while ((str = this.br.readLine()) != null) {

// 過濾動作

UUID msgId = UUID.randomUUID();

String arr[] = str.split("\t");

String date = arr[2].substring(0, 10);

String orderAmt = arr[1];

Values val = new Values(date,orderAmt);

       this._pending.put(msgId, val);

       

       collector.emit(val, msgId);

       

       System.out.println("_pending.size()="+_pending.size());

       

}

} catch (Exception e) {

// TODO: handle exception

}

}

@Override

public void close() {

// TODO Auto-generated method stub

try {

br.close();

isr.close();

fis.close();

} catch (Exception e) {

// TODO: handle exception

e.printStackTrace();

}

}

@Override

//初始化函數

public void open(Map conf, TopologyContext context,

SpoutOutputCollector collector) {

try {

this.collector = collector;

this.fis = new FileInputStream("order.log");

this.isr = new InputStreamReader(fis, "UTF-8");

this.br = new BufferedReader(isr);

       _pending = new ConcurrentHashMap<Object, Values>();

       fail_pending = new ConcurrentHashMap<Object, Integer>();

} catch (Exception e) {

e.printStackTrace();

}

// TODO Auto-generated method stub

}


@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

// TODO Auto-generated method stub

declarer.declare(new Fields("date","orderAmt"));

}


@Override

public Map<String, Object> getComponentConfiguration() {

// TODO Auto-generated method stub

return null;

}

@Override

public void ack(Object msgId) {

// TODO Auto-generated method stub

System.out.println("_pending size 共有:"+_pending.size());

System.out.println("spout ack:"+msgId.toString()+"---"+msgId.getClass());

this._pending.remove(msgId);

System.out.println("_pending size 剩余:"+_pending.size());

}


@Override

public void activate() {

// TODO Auto-generated method stub

}




@Override

public void deactivate() {

// TODO Auto-generated method stub

}


@Override

public void fail(Object msgId) {

// TODO Auto-generated method stub

System.out.println("spout fail:"+msgId.toString());

Integer fail_count = fail_pending.get(msgId);//獲取該Tuple失敗的次數

if (fail_count == null) {

fail_count = 0;

}

fail_count ++ ;

if (fail_count>=3) {

//重試次數已滿,不再進行重新emit

fail_pending.remove(msgId);

}else {

//記錄該tuple失敗次數

fail_pending.put(msgId, fail_count);

//重發

this.collector.emit(this._pending.get(msgId), msgId);

}

}


}



Bolt如下:

package les19.Ack_Fail;


import java.util.Map;


import org.apache.storm.task.OutputCollector;

import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.IRichBolt;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.tuple.Fields;

import org.apache.storm.tuple.Tuple;

import org.apache.storm.tuple.Values;


public class AckBolt implements IRichBolt {


/**

*/

private static final long serialVersionUID = 1L;


OutputCollector collector = null;

TopologyContext context = null;

@Override

public void cleanup() {

// TODO Auto-generated method stub


}

int num = 0;

String url = null;

String session_id = null;

String date = null;

String province_id = null;

@Override

public void execute(Tuple input) {

try {

date = input.getStringByField("date") ;

Double orderAmt = Double.parseDouble(input.getStringByField("orderAmt"));

collector.emit(input,new Values(date,orderAmt));//注意參數,第一個參數是Tuple本身

collector.ack(input);

//     Thread.sleep(300);

} catch (Exception e) {

collector.fail(input);  

e.printStackTrace();

}

}

//初始化,對應spout的open函數

@Override

public void prepare(Map stormConf, TopologyContext context,

OutputCollector collector) {

// TODO Auto-generated method 

this.context = context ;

this.collector = collector ;

}


@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

// TODO Auto-generated method stub


declarer.declare(new Fields("date","orderAmt")) ;

}


@Override

public Map<String, Object> getComponentConfiguration() {

// TODO Auto-generated method stub

return null;

}


}


TOPO類如下:

package les19.Ack_Fail;


import org.apache.storm.Config;

import org.apache.storm.LocalCluster;

import org.apache.storm.StormSubmitter;

import org.apache.storm.topology.TopologyBuilder;


public class Ack_FailTopo {


/**

* @param args

*/

public static void main(String[] args) {

// TODO Auto-generated method stub


TopologyBuilder builder = new TopologyBuilder();


builder.setSpout("spout", new AckSpout(), 1);    

builder.setBolt("bolt", new AckBolt(), 1).shuffleGrouping("spout");

Config conf = new Config() ;

//conf.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);

conf.setDebug(false);

if (args.length > 0) {

            try {

StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

} catch (Exception e) {

e.printStackTrace();

}

}else {

LocalCluster localCluster = new LocalCluster();

localCluster.submitTopology("mytopology", conf, builder.createTopology());

}


}


}


想了解更多,見我的51CTO上的Storm視頻教程http://edu.51cto.com/course/course_id-9041.html

,本節來自第18-19講。



向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

临沂市| 鄢陵县| 资兴市| 漳浦县| 罗江县| 乐安县| 阳泉市| 彭阳县| 西峡县| 高陵县| 平和县| 绿春县| 石家庄市| 宁陕县| 辽源市| 徐闻县| 军事| 方城县| 宁远县| 巴楚县| 米泉市| 沈阳市| 磐石市| 合肥市| 利津县| 孟州市| 义乌市| 闽侯县| 达拉特旗| 交口县| 乳源| 乌鲁木齐市| 松滋市| 元江| 漯河市| 洪泽县| 日土县| 沅江市| 丰宁| 湖南省| 眉山市|