中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Disruptor、Kafka、Netty如何整合

發布時間:2021-12-08 15:48:09 來源:億速云 閱讀:770 作者:小新 欄目:云計算

這篇文章主要介紹了Disruptor、Kafka、Netty如何整合,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

NETTY應用網關

整個網關的核心是一個netty server,各個應用程序(包括web server,手機app等)連到這個netty server上請求數據;關于數據來源,需要監聽多個kafka topic(而且這里的topic是可變的,也就是說需要kafka consumer的動態開始和停止),之后需要把所有這些topic的數據整合在一起,通過channel發送給客戶端應用程序。

數據流圖

Disruptor、Kafka、Netty如何整合

源碼

下面把大部分的代碼貼出來,有需要的同學可以參考。會對關鍵的技術點進行說明,偏業務部分大家自行忽略吧。

main函數

啟動disruptor;監聽一個固定的topic,把獲取到的msg,交給ConsumerProcessorGroup來完成kafka consumer的創建和停止。

public static void main(String[] args) {
        DisruptorHelper.getInstance().start();
        Properties props = ConsumerProps.getConsumerProps();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("uavlst"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            ConsumerRecord<String, String> lastRecord = null;
            for (ConsumerRecord<String, String> record : records)
                lastRecord = record;

            if (lastRecord != null){
                ConsumerProcessorGroup.getInstance().recieveNewUavLst(lastRecord.value());
            }
        }
    }

DisruptorHelper

DisruptorHelper是一個單例,主要是包含了一個disruptor 對象,在new這個對象的時候,用到了ProducerType.MULTI和new BlockingWaitStrategy(),其中前者意味著我們需要多個producer共同來工作,后者其實是默認的producer的等待策略,后續根據實際情況進行調整。

public class DisruptorHelper {
    private static DisruptorHelper instance = null;

    public static DisruptorHelper getInstance() {
        if (instance == null) {
            instance = new DisruptorHelper();
        }
        return instance;
    }

    private final int BUFFER_SIZE = 1024;
    private Disruptor<MsgEvent> disruptor = null;

    private DisruptorHelper() {
        MsgEventHandler eventHandler = new MsgEventHandler();
        disruptor = new Disruptor(new MsgEventFactory(), BUFFER_SIZE, new ConsumerThreadFactory(), ProducerType.MULTI, new BlockingWaitStrategy());
        disruptor.handleEventsWith(eventHandler);
    }

    public void start() {
        disruptor.start();
    }

    public void shutdown() {
        disruptor.shutdown();
    }

    public void produce(ConsumerRecord<String, String> record) {
        RingBuffer<MsgEvent> ringBuffer = disruptor.getRingBuffer();
        long sequence = ringBuffer.next();
        try {
            ringBuffer.get(sequence).setRecord(record);
        } finally {
            ringBuffer.publish(sequence);
        }
    }
}

ConsumerProcessorGroup

ConsumerProcessorGroup是一個單例,當中包含一個fixedThreadPool,動態的啟動線程來進行kafka topic的消費。

public class ConsumerProcessorGroup {
    private static ConsumerProcessorGroup instance = null;

    public static ConsumerProcessorGroup getInstance(){
        if (instance == null){
            instance = new ConsumerProcessorGroup();
        }
        return instance;
    }

    private ConsumerProcessorGroup() {

    }

    private ExecutorService fixedThreadPool = Executors.newFixedThreadPool(20);

    public List<String> uavIDLst = new Vector<String>();

    public void recieveNewUavLst(String uavIDs){
        List<String> newUavIDs = Arrays.asList(uavIDs.split(","));
        for (String uavID : newUavIDs){
            if (!uavIDLst.contains(uavID)){
                fixedThreadPool.execute(new ConsumerThread(uavID));
                uavIDLst.add(uavID);
            }
        }
        List<String> tmpLstForDel = new ArrayList<String>();
        for (String uavID : uavIDLst){
            if (!newUavIDs.contains(uavID)){
                tmpLstForDel.add(uavID);
            }
        }
        uavIDLst.removeAll(tmpLstForDel);
    }
}

ConsumerThread

對kafka topic進行消費,通過DisruptorHelper將獲取的record寫入disruptor的ring buffer當中。

public class ConsumerThread implements Runnable {
    private String uavID;

    public ConsumerThread(String uavID) {
        this.uavID = uavID;
    }

    public void run() {
        Properties props = ConsumerProps.getConsumerProps();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(uavID));
        System.out.println(uavID + " consumer started! Current thread id is " + Thread.currentThread().getId());
        while (ConsumerProcessorGroup.getInstance().uavIDLst.contains(uavID)) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records){
                DisruptorHelper.getInstance().produce(record);
            }
        }
        System.out.println(uavID + " consumer finished! Current thread id is " + Thread.currentThread().getId());
    }
}

MsgEventHandler

Disruptor的消費者,依次從Ring Buffer當中讀取數據并執行相應的處理。

public class MsgEventHandler implements EventHandler<MsgEvent> {
    private Map<Integer, String> converterMap;

    public void onEvent(MsgEvent event, long sequence, boolean endOfBatch) throws Exception {
        ConsumerRecord<String, String> record = event.getRecord();
        System.out.printf("topic = %s, part = %d, offset = %d, key = %s, value = %s \n\r", record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
}

感謝你能夠認真閱讀完這篇文章,希望小編分享的“Disruptor、Kafka、Netty如何整合”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

卢氏县| 襄城县| 卫辉市| 教育| 兴海县| 东兰县| 平原县| 新昌县| 曲阳县| 安康市| 松原市| 保靖县| 怀来县| 湖口县| 乐至县| 岑巩县| 马边| 额济纳旗| 乐安县| 浪卡子县| 镶黄旗| 化德县| 卢龙县| 岳西县| 荥阳市| 邓州市| 文成县| 右玉县| 商都县| 尉犁县| 筠连县| 鲁山县| 罗平县| 凤山县| 新化县| 腾冲县| 蓬莱市| 贡嘎县| 石台县| 鹰潭市| 房山区|