
Hive UDAF Development in Detail

Published: 2020-06-27 04:58:27 | Source: web | Views: 1660 | Author: choulanlan | Category: Big Data

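Notes

This article is a loose translation of "Hadoop Hive UDAF Tutorial - Extending Hive with Aggregation Functions". The example in the original is easy to follow, so I have kept it and woven in my own understanding of Hive UDAFs.

A UDAF is a user-defined aggregate function in Hive; Hive's built-in aggregate functions include sum() and count(). A UDAF can be implemented in one of two ways, simple or generic. The simple form relies on Java reflection, which costs performance and rules out some features, and it has been deprecated. This post therefore focuses on the generic form, GenericUDAF. Developing one mainly involves the following two abstract classes: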

    org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
    org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator

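Source code

All of the code and data in this post can be found at the following link: hive examples

Preparing the sample data

First, create a table named people to hold the sample data. It has a single column, name, and each value contains one or more names. The table's data is stored in the file people.txt.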

    ~$ cat ./people.txt

    John Smith
    John and Ann White
    Ted Green
    Dorothy

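Upload the file to the HDFS directory /user/matthew/people (the relative path people resolves against the current user's HDFS home directory, assumed here to be /user/matthew):

    hadoop fs -mkdir people
    hadoop fs -put ./people.txt people

Next, create a Hive external table by running the following in the Hive shell: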


    CREATE EXTERNAL TABLE people (name string)
    ROW FORMAT DELIMITED FIELDS
        TERMINATED BY '\t'
        ESCAPED BY ''
        LINES TERMINATED BY '\n'
    STORED AS TEXTFILE
    LOCATION '/user/matthew/people';


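The abstract classes involved

Before writing a GenericUDAF you need to understand two abstract classes:

    org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
    org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator

To make sense of their APIs, keep in mind that Hive here is just MapReduce underneath: Hive writes and hides the MapReduce code for us and exposes concise SQL functions on top. It therefore helps to think in terms of the Mapper, Combiner and Reducer. Remember too that a Hadoop cluster consists of many machines, and that Mapper and Reducer tasks run independently on different ones.

Broadly speaking, a UDAF reads data (mapper), folds a batch of mapper output into a partial aggregate (combiner), and finally produces the overall aggregate (reducer). Because the aggregation spans multiple combiners, the partial aggregates have to be stored along the way.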
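
The mapper, combiner and reducer roles described above can be sketched in plain Java, without any Hive classes. This is only an illustration of partial aggregation: the class and method names are hypothetical, and the two "splits" simply divide the four sample rows from people.txt the way two mappers might receive them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: plain Java, no Hive classes involved.
class PartialAggregationSketch {

    // "Mapper" step: fold one split of rows into a partial result.
    static int mapSplit(List<String> rows) {
        int partial = 0;
        for (String row : rows) {
            partial += row.length();
        }
        return partial;
    }

    // "Combiner"/"reducer" step: merge partial results into one total.
    static int mergePartials(List<Integer> partials) {
        int total = 0;
        for (int p : partials) {
            total += p;
        }
        return total;
    }

    // Runs every split independently, then merges the stored partial results.
    static int aggregate(List<List<String>> splits) {
        List<Integer> partials = new ArrayList<>();
        for (List<String> split : splits) {
            partials.add(mapSplit(split));
        }
        return mergePartials(partials);
    }

    public static void main(String[] args) {
        List<List<String>> splits = Arrays.asList(
                Arrays.asList("John Smith", "John and Ann White"),
                Arrays.asList("Ted Green", "Dorothy"));
        System.out.println(aggregate(splits)); // 10 + 18 + 9 + 7 = 44
    }
}
```

Each split only ever sees its own rows, which is why the partial results have to be kept and merged afterwards.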

AbstractGenericUDAFResolver

The Resolver is simple: override the method below. Based on the types of the arguments passed in from SQL, it decides which Evaluator will handle the call.

    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException;

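GenericUDAFEvaluator

The UDAF's actual logic lives in the Evaluator, which requires implementing several methods of this abstract class.

Before looking at the Evaluator, you need to understand the ObjectInspector interface and Mode, an enum nested inside GenericUDAFEvaluator.

ObjectInspector

Its main job is to decouple how data is used from how it is stored, so that the data flow can switch between input and output formats and different Operators can work with different formats. Two articles introduce ObjectInspector: "first post on Hive UDFs" and "Hive中ObjectInspector的作用" (The role of ObjectInspector in Hive).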
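
The decoupling idea can be sketched in plain Java. The interface below is a hypothetical stand-in, not Hive's ObjectInspector API: it only shows how code that consumes a value can stay the same while the storage format behind it changes.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the idea behind an ObjectInspector; not Hive's API.
interface TextInspector {
    String toJavaString(Object raw);
}

class InspectorSketch {
    // The value is stored as a plain Java String.
    static final TextInspector JAVA_STRING = raw -> (String) raw;

    // The same logical value is stored as UTF-8 encoded bytes.
    static final TextInspector UTF8_BYTES =
            raw -> new String((byte[]) raw, StandardCharsets.UTF_8);

    // Consumer code never looks at the storage format, only at the inspector.
    static int length(Object raw, TextInspector inspector) {
        return inspector.toJavaString(raw).length();
    }

    public static void main(String[] args) {
        byte[] stored = "Dorothy".getBytes(StandardCharsets.UTF_8);
        System.out.println(length("Dorothy", JAVA_STRING)); // 7
        System.out.println(length(stored, UTF8_BYTES));     // 7
    }
}
```

The length() method plays the role of an operator: swapping the inspector changes the format it can read without changing its logic.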

Mode

Mode represents the stage of the MapReduce job that the UDAF is running in.

    public static enum Mode {
        /**
         * PARTIAL1: the map stage of MapReduce, from original data to partial aggregation.
         * iterate() and terminatePartial() will be called.
         */
        PARTIAL1,
        /**
         * PARTIAL2: the map-side Combiner stage, which merges map output on the map side,
         * from partial aggregation to partial aggregation.
         * merge() and terminatePartial() will be called.
         */
        PARTIAL2,
        /**
         * FINAL: the reduce stage of MapReduce, from partial aggregation to full aggregation.
         * merge() and terminate() will be called.
         */
        FINAL,
        /**
         * COMPLETE: if this stage occurs, the job has a map but no reduce, so the map side
         * produces the result directly, from original data to full aggregation.
         * iterate() and terminate() will be called.
         */
        COMPLETE
    };

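In the usual case a complete UDAF run is one MapReduce job. With a mapper and a reducer it goes through PARTIAL1 (mapper) and FINAL (reducer); if there is also a combiner it goes through PARTIAL1 (mapper), PARTIAL2 (combiner) and FINAL (reducer).

Some MapReduce jobs have only a mapper and no reducer. In that case only the COMPLETE stage occurs: it takes the original data as input and produces the result directly.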
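
The four stages boil down to two questions: does the stage read original rows or partial results, and does it hand on a partial result or the final one? A minimal plain-Java sketch of that rule follows; the enum here only mirrors the stage names, and the helper methods are hypothetical rather than part of Hive.

```java
// Plain-Java illustration; the enum only mirrors the stage names described above.
class ModeStagesSketch {
    enum Mode { PARTIAL1, PARTIAL2, FINAL, COMPLETE }

    // True when the stage consumes original rows, so iterate() is used instead of merge().
    static boolean readsOriginalRows(Mode m) {
        return m == Mode.PARTIAL1 || m == Mode.COMPLETE;
    }

    // True when the stage hands on a partial result, so terminatePartial() is used
    // instead of terminate().
    static boolean emitsPartialResult(Mode m) {
        return m == Mode.PARTIAL1 || m == Mode.PARTIAL2;
    }

    // Describes which pair of Evaluator methods a stage relies on.
    static String methodsCalled(Mode m) {
        String input = readsOriginalRows(m) ? "iterate()" : "merge()";
        String output = emitsPartialResult(m) ? "terminatePartial()" : "terminate()";
        return input + " then " + output;
    }

    public static void main(String[] args) {
        for (Mode m : Mode.values()) {
            System.out.println(m + ": " + methodsCalled(m));
        }
    }
}
```

The first check is the same condition an Evaluator's init() typically branches on to decide whether its input is a raw column value or a partial result.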

Methods of GenericUDAFEvaluator

    // Determine the ObjectInspectors for the input and output of each stage
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;

    // Create the object that holds the aggregation result
    abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;

    // Reset the aggregation result
    public void reset(AggregationBuffer agg) throws HiveException;

    // Map stage: iterate over the column values passed in from SQL
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;

    // Called when the map or combiner stage ends; returns the partial aggregation result
    public Object terminatePartial(AggregationBuffer agg) throws HiveException;

    // The combiner merges results returned by the map;
    // the reducer merges results returned by the mapper or combiner
    public void merge(AggregationBuffer agg, Object partial) throws HiveException;

    // Reducer stage: output the final result
    public Object terminate(AggregationBuffer agg) throws HiveException;
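
Of these methods, reset() is the easiest to get wrong. The plain-Java sketch below uses hypothetical names and no Hive classes, and it assumes the caller may reuse one buffer for several groups in a row. Under that assumption reset() has to clear the buffer it is given; creating a fresh local buffer would leave the old sum in place.

```java
// Plain-Java illustration of buffer reuse; names are hypothetical, not Hive's API.
class BufferReuseSketch {

    // Stand-in for an AggregationBuffer that holds a running sum.
    static class SumBuffer {
        int sum;
    }

    // Clears the buffer that was passed in so it can serve the next group.
    static void reset(SumBuffer buf) {
        buf.sum = 0;
    }

    // Adds the length of one value to the running sum, skipping nulls.
    static void iterate(SumBuffer buf, String value) {
        if (value != null) {
            buf.sum += value.length();
        }
    }

    static int terminate(SumBuffer buf) {
        return buf.sum;
    }

    // Aggregates each group in turn, reusing a single buffer between groups.
    static int[] aggregateGroups(String[][] groups) {
        SumBuffer buf = new SumBuffer();
        int[] results = new int[groups.length];
        for (int g = 0; g < groups.length; g++) {
            reset(buf);
            for (String value : groups[g]) {
                iterate(buf, value);
            }
            results[g] = terminate(buf);
        }
        return results;
    }

    public static void main(String[] args) {
        int[] results = aggregateGroups(new String[][] {
                {"John Smith", "Dorothy"},
                {"Ted Green"}});
        System.out.println(results[0] + ", " + results[1]); // 17, 9
    }
}
```

If reset() did not clear the shared buffer, the second group would report 26 instead of 9.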

How Mode and the Evaluator fit together

[Figure: Evaluator methods called in each Mode stage]

[Figure: how the Evaluator handles the MapReduce flow in each stage]

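Example

The following walks through an example UDAF that counts the characters in the name column of the people table.

The function below computes the total number of characters (spaces included) in the given column.

Code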

    package com.matthewrathbone.example;

    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
    import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

    @Description(name = "letters", value = "_FUNC_(expr) - Returns the total number of characters of all strings in the column")
    public class TotalNumOfLettersGenericUDAF extends AbstractGenericUDAFResolver {

        @Override
        public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
                throws SemanticException {
            if (parameters.length != 1) {
                throw new UDFArgumentTypeException(parameters.length - 1,
                        "Exactly one argument is expected.");
            }

            ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);

            if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE) {
                throw new UDFArgumentTypeException(0,
                        "Argument must be PRIMITIVE, but "
                        + oi.getCategory().name()
                        + " was passed.");
            }

            PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) oi;

            if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
                throw new UDFArgumentTypeException(0,
                        "Argument must be String, but "
                        + inputOI.getPrimitiveCategory().name()
                        + " was passed.");
            }

            return new TotalNumOfLettersEvaluator();
        }

        public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {

            PrimitiveObjectInspector inputOI;
            ObjectInspector outputOI;
            PrimitiveObjectInspector integerOI;

            @Override
            public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                    throws HiveException {
                assert (parameters.length == 1);
                super.init(m, parameters);

                if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                    // Map stage: reads the SQL column, so the input is a String primitive
                    inputOI = (PrimitiveObjectInspector) parameters[0];
                } else {
                    // Remaining stages: the input is an Integer partial result
                    integerOI = (PrimitiveObjectInspector) parameters[0];
                }

                // Every stage outputs an Integer
                outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
                        ObjectInspectorOptions.JAVA);
                return outputOI;
            }

            /**
             * Holds the running character total.
             */
            static class LetterSumAgg implements AggregationBuffer {
                int sum = 0;

                void add(int num) {
                    sum += num;
                }
            }

            @Override
            public AggregationBuffer getNewAggregationBuffer() throws HiveException {
                return new LetterSumAgg();
            }

            @Override
            public void reset(AggregationBuffer agg) throws HiveException {
                // Clear the buffer that was passed in so it can be reused
                ((LetterSumAgg) agg).sum = 0;
            }

            @Override
            public void iterate(AggregationBuffer agg, Object[] parameters)
                    throws HiveException {
                assert (parameters.length == 1);
                if (parameters[0] != null) {
                    LetterSumAgg myagg = (LetterSumAgg) agg;
                    Object p1 = inputOI.getPrimitiveJavaObject(parameters[0]);
                    myagg.add(String.valueOf(p1).length());
                }
            }

            @Override
            public Object terminatePartial(AggregationBuffer agg) throws HiveException {
                // The partial result is exactly what this buffer has accumulated
                LetterSumAgg myagg = (LetterSumAgg) agg;
                return myagg.sum;
            }

            @Override
            public void merge(AggregationBuffer agg, Object partial)
                    throws HiveException {
                if (partial != null) {
                    LetterSumAgg myagg = (LetterSumAgg) agg;
                    Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
                    myagg.add(partialSum);
                }
            }

            @Override
            public Object terminate(AggregationBuffer agg) throws HiveException {
                LetterSumAgg myagg = (LetterSumAgg) agg;
                return myagg.sum;
            }

        }
    }


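Code walkthrough

For background on combiners, the material by Philippe Adjiman explains them well.

An AggregationBuffer lets us hold intermediate results. By defining our own buffer we can handle data in any format; in the example, the running character total is stored in the AggregationBuffer.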


    /**
     * Holds the running character total.
     */
    static class LetterSumAgg implements AggregationBuffer {
        int sum = 0;

        void add(int num) {
            sum += num;
        }
    }


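This means the UDAF receives different input at different MapReduce stages. iterate() reads one row of our table (or, more precisely, one input record) and outputs an aggregate in a different format.

Partial aggregation merges several of these aggregates into a new aggregate of the same format, and the final reducer then takes those aggregates and outputs the final result, whose format may differ from that of the data it received.

In init() we specify that the input is a string and the final output is an integer, and that the partial aggregate, which is held in the aggregation buffer, is also an integer. terminate() and terminatePartial() both output an integer.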


    // In init(), pick the input ObjectInspector according to the mode
    if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
        inputOI = (PrimitiveObjectInspector) parameters[0];
    } else {
        integerOI = (PrimitiveObjectInspector) parameters[0];
    }

    // Output format, which is the same for every mode
    outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
            ObjectInspectorOptions.JAVA);


iterate() reads the string in the column for each row, computes its length, and adds it to the buffer.

    public void iterate(AggregationBuffer agg, Object[] parameters)
            throws HiveException {
        ...
            Object p1 = inputOI.getPrimitiveJavaObject(parameters[0]);
            myagg.add(String.valueOf(p1).length());
        }
    }


merge() adds a partial total to the AggregationBuffer.

    public void merge(AggregationBuffer agg, Object partial)
            throws HiveException {
        if (partial != null) {
            LetterSumAgg myagg = (LetterSumAgg) agg;
            Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
            myagg.add(partialSum);
        }
    }


terminate() returns the contents of the AggregationBuffer; this is where the final result is produced.

    public Object terminate(AggregationBuffer agg) throws HiveException {
        LetterSumAgg myagg = (LetterSumAgg) agg;
        return myagg.sum;
    }

Using the custom function

    ADD JAR ./hive-extension-examples-master/target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar;
    CREATE TEMPORARY FUNCTION letters as 'com.matthewrathbone.example.TotalNumOfLettersGenericUDAF';

    SELECT letters(name) FROM people;
    OK
    44
    Time taken: 20.688 seconds

