中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Transactional topology怎么使用

發布時間:2021-12-22 17:20:20 來源:億速云 閱讀:150 作者:iii 欄目:云計算

本篇內容介紹了“Transactional topology怎么使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

你可以通過使用TransactionalTopologyBuilder來創建transactional topology. 下面就是一個transactional topology的定義, 它的作用是計算輸入流里面的tuple的個數。這段代碼來自storm-starter里面的TransactionalGlobalCount。

MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);

TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);

builder.setBolt("partial-count", new BatchCount(), 5)

        .shuffleGrouping("spout");

builder.setBolt("sum", new UpdateGlobalCount())

        .globalGrouping("partial-count");

TransactionalTopologyBuilder構造器中接受如下的參數:

?一個transaction topology的id

?spout在整個topology里面的id。

?一個transactional spout。

?一個可選的這個transactional spout的并行度。

topology的id是用來在zookeeper里面保存這個topology的當前進度狀態的,所以如果你重啟這個topology, 它可以接著前面的進度繼續執行。

一個transaction topology里面有一個唯一的TransactionalSpout, 這個spout是通過TransactionalTopologyBuilder的構造函數來指定的。在這個例子里面,MemoryTransactionalSpout被用來從一個內存變量里面讀取數據(DATA)。第二個參數指定spout發送的tuple的字段, 第三個參數指定每個batch的最大tuple數量。關于如何自定義TransactionalSpout我們會在后面介紹。

現在說說 bolts。這個topology并行地計算tuple的總數量。第一個bolt:BatchBolt,隨機地把輸入tuple分給各個task,然后各個task各自統計局部數量。第二個bolt:UpdateGlobalCount, 用全局grouping來匯總這個batch中tuple的數量,然后再更新到數據庫里面的全局數量。

下面是BatchCount的定義:

public static class BatchCount extends BaseBatchBolt {

    Object _id;

    BatchOutputCollector _collector;

    int _count = 0;

    @Override

    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {

        _collector = collector;

        _id = id;

    }

    @Override

    public void execute(Tuple tuple) {

        _count++;

    }

    @Override

    public void finishBatch() {

        _collector.emit(new Values(_id, _count));

    }

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("id", "count"));

    }

}

storm會為每個正在處理的batch創建一個BatchCount對象,這個BatchCount是運行在BatchBoltExecutor里面的。而BatchBoltExecutor負責創建以及清理這個對象的實例。

BatchCount對象的prepare方法接收如下參數:

?Storm config

?Topology context

?Output collector

?這個batch的id (txid),在Transactional Topology中, 這個id則是一個TransactionAttempt對象。

這個batch bolt的抽象在DRPC里面也可以用, 只是txid的類型不一樣而已。實際上,BatchBolt可以接收一個txid類型的參數,所以如果你只是想在transactioinal topology里面使用這個BatchBolt,你可以去繼承BaseTransactionalBolt類,如下定義:

public abstract class BaseTransactionalBolt extends BaseBatchBolt<TransactionAttempt> {

}

在transaction topology里面發射的所有的tuple都必須以TransactionAttempt作為第一個field, 然后storm可以根據這個field來判斷哪些tuple屬于一個batch。所以你在發射tuple的時候需要滿足這個條件。

TransactionAttempt包含兩個值: 一個transaction id,一個attempt id。transaction id的作用就是我們上面介紹的對于每個batch是唯一的,而且不管這個batch 被replay多少次都是一樣的。attempt id是對于每個batch唯一的一個id, 但是對于同一個batch,它replay之后的attempt id跟replay之前就不一樣了, 我們可以把attempt id理解成replay-times, storm利用這個id來區別一個batch發射的tuple的不同版本。

transaction id對于每個batch加一, 所以第一個batch的transaction id是”1″, 第二個batch是”2″,依次類推。

每收到一個batch中的tuple,execute方法便被調用一次。每次當該方法被調用時,你應該把這個batch里面的狀態保持在一個本地變量里面。對于這個例子來說, 它在execute方法里面遞增tuple的個數。

最后, 當這個bolt接收到某個batch的所有的tuple之后, finishBatch方法會被調用。這個例子里面的BatchCount類會在這個時候發射它的局部數量到它的輸出流里面去。

下面是UpdateGlobalCount類的定義:

public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {

    TransactionAttempt _attempt;

    BatchOutputCollector _collector;

    int _sum = 0;

    @Override

    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {

        _collector = collector;

        _attempt = attempt;

    }

    @Override

    public void execute(Tuple tuple) {

        _sum+=tuple.getInteger(1);

    }

    @Override

    public void finishBatch() {

        Value val = DATABASE.get(GLOBAL_COUNT_KEY);

        Value newval;

        if(val == null || !val.txid.equals(_attempt.getTransactionId())) {

            newval = new Value();

            newval.txid = _attempt.getTransactionId();

            if(val==null) {

                newval.count = _sum;

            } else {

                newval.count = _sum + val.count;

            }

            DATABASE.put(GLOBAL_COUNT_KEY, newval);

        } else {

            newval = val;

        }

        _collector.emit(new Values(_attempt, newval.count));

    }

    @Override

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("id", "sum"));

    }

}

UpdateGlobalCount是Transactional Topologies相關的類,所以它繼承自BaseTransactionalBolt。在execute方法里面, UpdateGlobalCount累積這個batch的計數, 比較有趣的是finishBatch方法。

首先, 注意這個bolt實現了ICommitter接口,這告訴storm要在這個事務的commit階段調用finishBatch方法,所以對于finishBatch的調用會保證強順序性(順序就是transaction id的升序),另一方面execute方法在processing或者commit階段都可以執行。另外一種把bolt標識為commiter的方法是調用TransactionalTopologyBuilder的setCommiterBolt來添加Bolt(而不是setBolt)。

UpdateGlobalCount里面finishBatch方法的邏輯是首先從數據庫中獲取當前的值,并且把數據庫里面的transaction id與當前這個batch的transaction id進行比較。如果他們一樣, 那么忽略這個batch。否則把這個batch的結果加到總結果里面去,并且更新數據庫。

“Transactional topology怎么使用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

西昌市| 青海省| 中江县| 平湖市| 通州市| 游戏| 澄城县| 白银市| 策勒县| 武宣县| 公主岭市| 元朗区| 泗阳县| 巍山| 建平县| 微山县| 寻甸| 肃北| 汽车| 化德县| 虎林市| 卫辉市| 岑溪市| 清镇市| 尼木县| 峨眉山市| 虞城县| 胶州市| 准格尔旗| 太湖县| 嘉荫县| 专栏| 吉木乃县| 建瓯市| 佛冈县| 三穗县| 永定县| 弥渡县| 义乌市| 揭阳市| 元谋县|