中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

ES學習筆記之-AvgAggregation的實現過程分析

發布時間:2020-07-01 01:02:03 來源:網絡 閱讀:2122 作者:sbp810050504 欄目:大數據

我們需要查看數據的統計量時,均值是最重要的特征之一。

對于海量數據,這類簡單的聚合ES可以做到秒級別返回。聚合是ES的特色功能。

那么ES是如何實現這一功能的呢?

我們知道,ES的數據存儲在各個節點中, 所以ES的實現AvgAggregation時基本思路就是先統計各個節點,然后匯總。

先了解ES是如何統計單個節點: 參考AvgAggregator

    @Override
    public LeafBucketCollector getLeafCollector(LeafReaderContext ctx,
            final LeafBucketCollector sub) throws IOException {
        if (valuesSource == null) {
            return LeafBucketCollector.NO_OP_COLLECTOR;
        }
        final BigArrays bigArrays = context.bigArrays();
        final SortedNumericDoubleValues values = valuesSource.doubleValues(ctx);
        return new LeafBucketCollectorBase(sub, values) {
            @Override
            public void collect(int doc, long bucket) throws IOException {
                counts = bigArrays.grow(counts, bucket + 1);
                sums = bigArrays.grow(sums, bucket + 1);

                values.setDocument(doc);
                final int valueCount = values.count();
                counts.increment(bucket, valueCount);
                double sum = 0;
                for (int i = 0; i < valueCount; i++) {
                    sum += values.valueAt(i);
                }
                sums.increment(bucket, sum);
            }
        };
    }

即實現Collector類的collect()方法。然后通過doc_values機制獲取文檔相關字段的值,分別匯入counts和sums兩個變量中。

收集完成counts和sums過后,就需要匯總各個節點的值, 這在搜索的第二階段。

從第一階段到第二階段,整個鏈路如下:
s1: 前端請求發送到集群某一節點的TransportSearchAction.doExecute()方法中。

     switch(searchRequest.searchType()) {
               .....
           case QUERY_THEN_FETCH:
                searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchService, clusterService,
                        indexNameExpressionResolver, searchPhaseController, threadPool, searchRequest, listener);
                break;
              ......   
     }
        searchAsyncAction.start();

見到start()方法,我以為這個是另啟一個線程,后面發現原來不是的。 這個start()方法把整個查詢過程分為兩個階段:

階段一:
performFirstPhase(), 即把請求分發到各個節點,然后記錄節點處理的結果。如果返回的分片是最后一個分片,則轉入階段二。

階段二:
performFirstPhase() -> onFirstPhaseResult() -> innerMoveToSecondPhase() -> moveToSecondPhase() 。這里利用了模板設計模式。在階段二中,會再次向各個節點發起請求,通過docId獲取文檔內容。

s2: 對于聚合而言, 階段二最重要的鏈路是moveToSecondPhase() -> executeFetch() -> finishHim() -> searchPhaseController.merge() , merge()中包含了如下的業務邏輯: 合并hits, 合并suggest, 合并addAggregation 等。 這里我們關注聚合。

聚合的入口方法是InternalAggregations.reduce(), 如果熟悉hadoop, reduce方法的執行邏輯看這個名字也能理解一部分。reduce的中文翻譯“歸納”,挺生動形象的。整個鏈路的入口為InternalAvg.doReduce()

    @Override
    public InternalAvg doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
        long count = 0;
        double sum = 0;
        for (InternalAggregation aggregation : aggregations) {
            count += ((InternalAvg) aggregation).count;
            sum += ((InternalAvg) aggregation).sum;
        }
        return new InternalAvg(getName(), sum, count, valueFormatter, pipelineAggregators(), getMetaData());
    }

其邏輯相當簡單,count相加, sum相加。獲取最終的結果就是

    public double getValue() {
        return sum / count;
    }

上面講述了ES分發會匯總的關鍵節點,那么分發到各個節點的業務邏輯是怎樣的呢?

首先定位入口:

    class SearchQueryTransportHandler extends TransportRequestHandler<ShardSearchTransportRequest> {
        @Override
        public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel) throws Exception {
            QuerySearchResultProvider result = searchService.executeQueryPhase(request);
            channel.sendResponse(result);
        }
    }

然后定位到QueryPhrase.execute(), 在QueryPhrase這個階段,主要做的事情如下:

aggregationPhase.preProcess(searchContext): 解析ES的語法,生成Collector.
execute: 在調用Lucene的接口查詢數據前,組合各個Collecotr, collector = MultiCollector.wrap(subCollectors); 然后查詢Lucene索引。對于AvgAggregator, 其關鍵邏輯是:

            @Override
            public void collect(int doc, long bucket) throws IOException {
                counts = bigArrays.grow(counts, bucket + 1);
                sums = bigArrays.grow(sums, bucket + 1);

                values.setDocument(doc);
                final int valueCount = values.count();
                counts.increment(bucket, valueCount);
                double sum = 0;
                for (int i = 0; i < valueCount; i++) {
                    sum += values.valueAt(i);
                }
                sums.increment(bucket, sum);
            }

這個已經是第二次出現了, 它的功能就是收集每個命中查詢的doc相關信息。 這里獲取每個docId對應的value,是基于doc_value的正向索引。

以上就是整個Avg Aggregation的實現流程。 通過源碼,可以確認, AvgAggregation是精確可信的。 還有幾個聚合函數,其思路跟AvgAggregation是一致的,就不細說了,他們分別是: Max, Min, Sum, ValueCount, Stats 。。。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

岐山县| 武汉市| 榆树市| 泸溪县| 通山县| 四子王旗| 德昌县| 新乡市| 高雄市| 威远县| 视频| 安龙县| 南涧| 土默特右旗| 廉江市| 子洲县| 清苑县| 博客| 芮城县| 鄂尔多斯市| 北宁市| 闸北区| 登封市| 建德市| 保定市| 拜城县| 中山市| 岳西县| 教育| 资源县| 抚州市| 阿拉善左旗| 新平| 广元市| 蛟河市| 成都市| 洪洞县| 沁阳市| 蒙自县| 丰台区| 大同县|