中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Disruptor中怎么實現一個高性能隊列

發布時間:2021-06-21 16:57:40 來源:億速云 閱讀:111 作者:Leah 欄目:大數據

Disruptor中怎么實現一個高性能隊列,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。

Disruptor 例子

import java.util.concurrent.ThreadFactory
import com.lmax.disruptor.dsl.{Disruptor, ProducerType}
import com.lmax.disruptor.{BlockingWaitStrategy,EventFactory,EventHandler,EventTranslatorOneArg,WaitStrategy}

object DisruptorTest {

  val disruptor = {
    val factory = new EventFactory[Event] {
      override def newInstance(): Event = Event(-1)
    }

    val threadFactory = new ThreadFactory(){
      override def newThread(r: Runnable): Thread = new Thread(r)
    }
    
    val disruptor = new Disruptor[Event](factory, 4, threadFactory, ProducerType.SINGLE, 
                        new BlockingWaitStrategy())

    disruptor.handleEventsWith(TestHandler).`then`(ThenHandler)
    
    disruptor
  }
  
  val translator = new EventTranslatorOneArg[Event, Int]() {
    override def translateTo(event: Event, sequence: Long, arg: Int): Unit = {
      event.id = arg
      println(s"translator: ${event}, sequence: ${sequence}, arg: ${arg}")
    }
  }

  def main(args: Array[String]): Unit = {
    disruptor.start()
    (0 until 100).foreach { i =>
      disruptor.publishEvent(translator, i)
    }
    disruptor.shutdown()
  }
}

case class Event(var id: Int) {
  override def toString: String = s"event: ${id}"
}

object TestHandler extends EventHandler[Event] {
  override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
    println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")
  }
}

object ThenHandler extends EventHandler[Event] {
  override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
    println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")
  }
}

源碼閱讀

disrutpor 初始化

先看 Disruptor 構造方法

public Disruptor(final EventFactory<T> eventFactory, 
  final int ringBufferSize, 
  final ThreadFactory threadFactory, 
  final ProducerType producerType,
  final WaitStrategy waitStrategy) {
    this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), 
        new BasicExecutor(threadFactory));
}

在看 RingBuffer.create, 最終通過 fill 方法 將 eventFactory.newInstance() 作為默認值,塞到 ringBuffer 里面

public static <E> RingBuffer<E> create(ProducerType producerType, 
  EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
    switch (producerType) {
        case SINGLE:
            return createSingleProducer(factory, bufferSize, waitStrategy);
        case MULTI:
            return createMultiProducer(factory, bufferSize, waitStrategy);
        default:
            throw new IllegalStateException(producerType.toString());
    }
}

public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize, 
    WaitStrategy waitStrategy) {
    SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);

    return new RingBuffer<E>(factory, sequencer);
}

RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
    this.sequencer = sequencer;
    this.bufferSize = sequencer.getBufferSize();

    if (bufferSize < 1) {
        throw new IllegalArgumentException("bufferSize must not be less than 1");
    }
    if (Integer.bitCount(bufferSize) != 1) {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    this.indexMask = bufferSize - 1;
    this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
    fill(eventFactory);
}

private void fill(EventFactory<E> eventFactory) {
    for (int i = 0; i < bufferSize; i++) {
        entries[BUFFER_PAD + i] = eventFactory.newInstance();
    }
}

消費事件消息

首先看 disruptor.start(): 消費事件消息入口

private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();

public RingBuffer<T> start() {
    checkOnlyStartedOnce();
    for (final ConsumerInfo consumerInfo : consumerRepository) {
        consumerInfo.start(executor);
    }

    return ringBuffer;
}

consumerRepository 類型由 disruptor.handleEventsWith(TestHandler) 初始化, 并構造事件消息處理鏈

public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
    return createEventProcessors(new Sequence[0], handlers);
}

EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers) {
    checkNotStarted();

    final Sequence[] processorSequences = new Sequence[eventHandlers.length];
    final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);

    for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
        final EventHandler<? super T> eventHandler = eventHandlers[i];

        final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);

        if (exceptionHandler != null) {
            batchEventProcessor.setExceptionHandler(exceptionHandler);
        }

        consumerRepository.add(batchEventProcessor, eventHandler, barrier);
        processorSequences[i] = batchEventProcessor.getSequence();
    }

    updateGatingSequencesForNextInChain(barrierSequences, processorSequences);

    return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}

回頭看 disruptor.start() 中的 consumerInfo.start(executor) executor = new BasicExecutor(threadFactory),BasicExecutor 在每次 execute 任務時,都會 new thread **但是 consumerRepository 的數量是有限的,所以 new thread 也沒啥問題

public Disruptor(
        final EventFactory<T> eventFactory,
        final int ringBufferSize,
        final ThreadFactory threadFactory,
        final ProducerType producerType,
        final WaitStrategy waitStrategy) {
    this(
        RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
        new BasicExecutor(threadFactory));
}

private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) {
    this.ringBuffer = ringBuffer;
    this.executor = executor;
}

@Override
public void start(final java.util.concurrent.Executor executor){
    //EventProcessor extends Runnable
    //executor = BasicExecutor 
    executor.execute(eventprocessor);
}

public final class BatchEventProcessor<T> implements EventProcessor {
  @Override
  public void run() {
      if (running.compareAndSet(IDLE, RUNNING)) {
          sequenceBarrier.clearAlert();

          notifyStart();
          try {
              if (running.get() == RUNNING) {
                  processEvents();
              }
          } finally {
              notifyShutdown();
              running.set(IDLE);
          }
      } else {
          if (running.get() == RUNNING) {
              throw new IllegalStateException("Thread is already running");
          } else {
              earlyExit();
          }
      }
  }
}

private void processEvents() {
    T event = null;
    long nextSequence = sequence.get() + 1L;

    while (true) {
        try {
            final long availableSequence = sequenceBarrier.waitFor(nextSequence);
            if (batchStartAware != null) {
                batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
            }

            while (nextSequence <= availableSequence) {
                event = dataProvider.get(nextSequence);
                eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
                nextSequence++;
            }

            sequence.set(availableSequence);
        } catch (final TimeoutException e) {
            notifyTimeout(sequence.get());
        } catch (final AlertException ex) {
            if (running.get() != RUNNING) {
                break;
            }
        } catch (final Throwable ex) {
            exceptionHandler.handleEventException(ex, nextSequence, event);
            sequence.set(nextSequence);
            nextSequence++;
        }
    }
}

executor.execute 也就是 BasicExecutor.execute(eventHandler) 會異步的執行 eventHandler, 也就是調用 BatchEventProcessor.run 方法

問題來了,既然是異步執行,多個 eventHandler 是怎么按照順序去處理事件消息的?

我們看 processEvents 方法執行邏輯

  1. 先獲取 BatchEventProcessor.sequence 并 +1

  2. 通過 sequenceBarrier.waitFor 也就是 WaitStrategy.waitFor 獲取到可用的 availableSequence

  3. 先看下 BlockingWaitStrategy.waitFor 的實現

     public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, 
        SequenceBarrier barrier)
        throws AlertException, InterruptedException {
        long availableSequence;
        if (cursorSequence.get() < sequence) {
            lock.lock();
            try {
                while (cursorSequence.get() < sequence) {
                    barrier.checkAlert();
                    processorNotifyCondition.await();
                }
            }
            finally {
                lock.unlock();
            }
        }
    
        while ((availableSequence = dependentSequence.get()) < sequence) {
            barrier.checkAlert();
        }
    
        return availableSequence;
    }


    如果 cursorSequence(ringbuffer 的索引) < sequence(batchEventProcessor 的索引) 則batchEventProcessor掛起等待 否則 就用 dependentSequence 作為 availableSequence 返回 然后 batchEventProcessor 會將 availableSequence 索引之前的數據一次性處理完,并更新自身的 sequence 索引值

  4. dependentSequence 由 ProcessingSequenceBarrier 構造方法初始化

    final class ProcessingSequenceBarrier implements SequenceBarrier {
        private final WaitStrategy waitStrategy;
        private final Sequence dependentSequence;
        private volatile boolean alerted = false;
        private final Sequence cursorSequence;
        private final Sequencer sequencer;
    
        ProcessingSequenceBarrier(final Sequencer sequencer, final WaitStrategy waitStrategy,
            final Sequence cursorSequence, final Sequence[] dependentSequences) {
            this.sequencer = sequencer;
            this.waitStrategy = waitStrategy;
            this.cursorSequence = cursorSequence;
            if (0 == dependentSequences.length) {
                dependentSequence = cursorSequence;
            } else {
                dependentSequence = new FixedSequenceGroup(dependentSequences);
            }
        }
    }


    在 Disruptor.createEventProcessors 中的, 進行了初始化 ProcessingSequenceBarrier final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences) createEventProcessors 僅會被 Disruptor.handleEventsWithEventHandlerGroup.handleEventsWith

    public class Disruptor<T> {
        public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
            return createEventProcessors(new Sequence[0], handlers);
        }
    
        EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
            final EventHandler<? super T>[] eventHandlers) {
            checkNotStarted();
    
            final Sequence[] processorSequences = new Sequence[eventHandlers.length];
            final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
    
            for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
                final EventHandler<? super T> eventHandler = eventHandlers[i];
    
                final BatchEventProcessor<T> batchEventProcessor = 
                    new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
    
                if (exceptionHandler != null) {
                    batchEventProcessor.setExceptionHandler(exceptionHandler);
                }
    
                consumerRepository.add(batchEventProcessor, eventHandler, barrier);
                processorSequences[i] = batchEventProcessor.getSequence();
            }
    
            updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
    
            return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
        }
    }
    
    public class EventHandlerGroup<T> {
        private final Disruptor<T> disruptor;
        private final ConsumerRepository<T> consumerRepository;
        private final Sequence[] sequences;
    
        EventHandlerGroup(final Disruptor<T> disruptor, final ConsumerRepository<T> consumerRepository,
            final Sequence[] sequences) {
            this.disruptor = disruptor;
            this.consumerRepository = consumerRepository;
            this.sequences = Arrays.copyOf(sequences, sequences.length);
        }
    
        public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
            return disruptor.createEventProcessors(sequences, handlers);
        }
    
        public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers) {
            return handleEventsWith(handlers);
        }
    }


    EventHandlerGroup 會拷貝一份 batchEventProcessor 中的 sequence demo 例子中 disruptor.handleEventsWith(TestHandler).then(ThenHandler) 通過 then 方法將 TestHandler 中的 sequence 傳遞給 ThenHandler 這樣 ThenHandler 就依賴了 TestHandler, ThenHandler 就會在 TestHandler 后執行

生產事件消息

接著看 disruptor.publishEvent(translator, i) 就是往 ringBuffer 里面放數據,

public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) {
    final long sequence = sequencer.next();
    translateAndPublish(translator, sequence, arg0);
}

private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) {
    try {
        translator.translateTo(get(sequence), sequence, arg0);
    } finally {
        sequencer.publish(sequence);
    }
}

public E get(long sequence) {
    return elementAt(sequence);
}

get(sequence) 根據 sequence [ringbuffer 索引] 獲取 ringbuffer 數組里的對象 translator 將其處理替換完后,ringbuffer 數組的的值將是新的值,publish 將會更新索引的標記位 waitStrategy.signalAllWhenBlocking() 會通知阻塞等待的消費者去繼續消費消息

protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
@Override
public void publish(long sequence) {
    cursor.set(sequence);
    waitStrategy.signalAllWhenBlocking();
}

總結

流程理清楚了,我們看看 知識點

  • ringbuffer

    • 內存使用率很高,不會造成內存碎片,幾乎沒有浪費。業務處理的同一時間,訪問的內存數據段集中。 可以更好的適應不同系統,取得較高的性能。內存的物理布局簡單單一,不太容易發生內存越界、懸空指針等 bug,出了問題也容易在內存級別分析調試。 做出來的系統容易保持健壯。

  • cpu cache

    • CPU 訪問內存時會等待,導致計算資源大量閑置,降低 CPU 整體吞吐量。 由于內存數據訪問的熱點集中性,在 CPU 和內存之間用較為快速而成本較高(相對于內存)的介質做一層緩存,就顯得性價比極高了

看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

琼海市| 南召县| 普格县| 泽普县| 台东市| 武城县| 墨玉县| 武汉市| 吉木萨尔县| 松阳县| 和政县| 齐齐哈尔市| 钟山县| 同心县| 湘西| 新干县| 乐平市| 金川县| 锡林郭勒盟| 自贡市| 紫阳县| 云南省| 邵东县| 巩义市| 富裕县| 浑源县| 固原市| 禹州市| 鄂伦春自治旗| 汪清县| 苗栗县| 乌兰县| 英山县| 景东| 临沧市| 濮阳县| 长子县| 澄迈县| 凤台县| 九寨沟县| 金昌市|