中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

spark怎么編寫udaf函數求中位數

發布時間:2021-09-04 15:40:01 來源:億速云 閱讀:134 作者:chen 欄目:云計算

本篇內容主要講解“spark怎么編寫udaf函數求中位數”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“spark怎么編寫udaf函數求中位數”吧!

package com.frank.sparktest.java;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MedianUdaf extends UserDefinedAggregateFunction {

    private StructType inputSchema;
    private StructType bufferSchema;

    public MedianUdaf(){
        List<StructField> inputFields = new ArrayList<>();
        inputFields.add(DataTypes.createStructField("nums",DataTypes.IntegerType,true));
        inputSchema=DataTypes.createStructType(inputFields);
        List<StructField> bufferFields = new ArrayList<>();
        bufferFields.add(DataTypes.createStructField("datas",DataTypes.StringType,true));
        bufferSchema=DataTypes.createStructType(bufferFields);
    }

    @Override
    public StructType inputSchema() {
        return inputSchema;
    }

    @Override
    public StructType bufferSchema() {
        return bufferSchema;
    }

    @Override
    public DataType dataType() {
        return DataTypes.DoubleType;
    }

    @Override
    public boolean deterministic() {
        return true;
    }

    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        buffer.update(0,0);
        buffer.update(1,0);
    }

    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        if (!input.isNullAt(0)){
            buffer.update(0,buffer.getString(0)+","+input.getInt(0));
        }
    }

    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        buffer1.update(0,buffer1.getString(0)+","+buffer2.getInt(0));
    }

    @Override
    public Object evaluate(Row buffer) {
        List<Integer> list = new ArrayList<Integer>();
        List<String> stringList = Arrays.asList(buffer.getString(0).split(","));
        for (String s : stringList){
            list.add(Integer.valueOf(s));
        }
        Collections.sort(list);
        int size = list.size();
        int num=0;
        if(size % 2 == 1) {
            num = list.get((size / 2)+1);
        }
        if(size %2  == 0) {
            num = (list.get(size / 2)+list.get((size / 2)+1))/2;
        }
        return num;
    }

}

上面是代碼段,可以直接拿來使用

下面是測試程序

package com.frank.sparktest.java;

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;

import java.io.IOException;
import java.util.stream.IntStream;

public class DemoUDAF {

    public static void main(String[] args) throws IOException {
        SQLContext sqlContext = SparkSession.builder().master("local").getOrCreate().sqlContext();
        sqlContext.udf().register("generate", (Integer start, Integer end)-> IntStream.range(start, end+1).boxed().toArray(), DataTypes.createArrayType(DataTypes.IntegerType));
        sqlContext.udf().register("media",new MedianUdaf());
        sqlContext.sql("select generate(1,10)").show();
    }
}

到此,相信大家對“spark怎么編寫udaf函數求中位數”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

延川县| 响水县| 黑河市| 崇文区| 孝昌县| 龙口市| 镇远县| 荆门市| 郓城县| 三都| 芦溪县| 临泽县| 利津县| 达孜县| 浮山县| 梨树县| 尤溪县| 廉江市| 云安县| 绿春县| 凯里市| 色达县| 浠水县| 兰考县| 吉林市| 迭部县| 台山市| 溧水县| 浑源县| 隆子县| 遂溪县| 乌拉特前旗| 海林市| 满城县| 天峨县| 锡林郭勒盟| 土默特右旗| 策勒县| 天门市| 古浪县| 沁阳市|