中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Flink的SideOutputSplit分流怎么實現

發布時間:2021-12-31 15:29:28 來源:億速云 閱讀:151 作者:iii 欄目:大數據

這篇文章主要講解了“Flink的SideOutputSplit分流怎么實現”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Flink的SideOutputSplit分流怎么實現”吧!

版本說明:

環境: Windiws

Scala: 2.11.8

Flink :1.10.1

大部分的DataStream API的算子的輸出是單一輸出,也就是某種數據類型的流。

除了split算子,可以將一條流分成多條流,這些流的數據類型也都相同。

process function的side outputs功能可以產生多條流(Flink 1.9版本之后推薦此種方案),并且這些流的數據類型可以不一樣。一個side output可以定義為OutputTag[X]對象,X是輸出流的數據類型。process function可以通過Context對象發射一個事件到一個或者多個side outputs。

import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

/**
  *
  * @param deviceNo    設備號
  * @param timestamp   時間戳
  * @param temperature 溫度
  */
case class SensorReading(deviceNo: String, timestamp: Long, temperature: Double)

object SensorReadingSplitStreaming {
    def main(args: Array[String]): Unit = {
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        //設置時間語義  時間發生時間
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)


        val socketSource: DataStream[String] = env.readTextFile("D:\\tmp\\file1.txt")

        val mapStream: DataStream[SensorReading] = socketSource
            .map(data => {
                val split: Array[String] = data.split(",")
                SensorReading(split(0).trim, split(1).trim.toLong, split(2).trim.toDouble)
            })

        //對數據流進行分流處理
        val tmpStageStream: DataStream[SensorReading] = mapStream.process(new TempStageProcess())

        tmpStageStream.print("main");
        val lowStream: DataStream[(String, Double)] = tmpStageStream.getSideOutput(new OutputTag[(String, Double)]("low-tmp"))
        val highStream: DataStream[(String, Double)] = tmpStageStream.getSideOutput(new OutputTag[(String, Double)]("high-tmp"))
        lowStream.print("low")
        highStream.print("high")
        env.execute()
    }

}


class TempStageProcess() extends ProcessFunction[SensorReading, SensorReading] {
    // 定義側輸出流
    lazy val lowTmp: OutputTag[(String, Double)] = new OutputTag[(String, Double)]("low-tmp");
    lazy val HighTmp: OutputTag[(String, Double)] = new OutputTag[(String, Double)]("high-tmp");

    //處理數據
    override def processElement(value: SensorReading, context: ProcessFunction[SensorReading, SensorReading]#Context, collector: Collector[SensorReading]): Unit = {
        if (value.temperature < 10) {
            context.output(lowTmp, (value.deviceNo, value.temperature))
        } else if (value.temperature > 70) {
            context.output(HighTmp, (value.deviceNo, value.temperature))
        } else {
            collector.collect(value)
        }
    }
}  //測試文件內容如下↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓

/*
設備8,1610035289736,84.3
設備5,1610035371758,38.8
設備5,1610035458637,60.2
設備1,1610035543127,10.2
設備7,1610035623427,51.6
設備5,1610035705302,20.1
設備5,1610035787387,12.9
設備7,1610035877019,88.2
設備6,1610035960537,33.5
設備7,1610036043040,63.0
設備5,1610036125179,64.5
設備6,1610036214972,30.2
設備5,1610036296542,56.5
設備7,1610036377999,29.7
設備6,1610036467523,59.4
設備4,1610036557446,71.1
設備5,1610036641100,28.2
設備2,1610036725803,88.8
設備8,1610036808041,73.5
設備1,1610036897060,18.0
設備7,1610036980127,14.9
設備2,1610037069523,47.4
設備4,1610037154507,59.5
設備5,1610037235099,35.0
設備6,1610037317868,76.4
設備2,1610037403367,10.0
設備2,1610037484177,18.5
設備4,1610037571384,98.7
設備5,1610037653666,95.6
設備6,1610037735520,32.6
設備6,1610037823906,83.3
設備3,1610037913756,29.1
設備7,1610037994980,74.6
設備6,1610038081606,22.2
設備3,1610038163043,10.4
設備5,1610038244717,56.9
設備3,1610038326227,64.8
設備4,1610038411053,65.0
設備8,1610038500538,93.2
設備8,1610038583924,76.2
設備1,1610038670150,42.1
設備5,1610038756839,35.1
設備3,1610038840180,75.9
設備3,1610038929751,83.4
設備7,1610039019422,24.1
設備3,1610039101778,85.0
設備8,1610039183077,45.6
設備3,1610039264498,79.5
設備1,1610039351600,44.4
設備8,1610039434187,73.3
設備3,1610039518048,77.9
設備7,1610039598556,9.79
設備4,1610039679144,19.0
設備2,1610039761967,56.1
設備3,1610039847823,88.2
設備6,1610039933024,77.4
設備7,1610040014212,14.4
設備4,1610040101627,98.2
設備8,1610040182379,85.0
設備6,1610040265210,61.8
設備2,1610040345769,48.0
設備3,1610040432855,19.9
設備4,1610040515943,30.9
設備4,1610040601373,51.7
設備1,1610040681803,29.7
設備8,1610040770779,31.6
設備3,1610040851986,67.1
設備4,1610040941421,93.2
設備7,1610041022836,37.2
設備8,1610041105401,84.6
設備6,1610041189301,19.2
設備4,1610041270735,99.0
設備4,1610041354109,77.0
設備5,1610041435113,49.7
設備1,1610041521773,74.2
設備8,1610041603035,42.2
設備3,1610041687230,87.1
設備1,1610041767985,82.7
設備3,1610041848130,0.59
設備4,1610041933021,7.38
設備2,1610042016080,28.9
設備2,1610042103229,99.2
設備2,1610042190222,42.2
設備3,1610042277841,12.0
設備7,1610042364076,93.5
設備7,1610042444652,10.5
設備4,1610042530461,68.5
設備1,1610042615421,78.2
設備3,1610042702219,18.5
設備6,1610042787478,64.8
設備5,1610042874301,6.34
設備2,1610042956073,65.6
設備8,1610043038793,10.6
設備8,1610043122971,30.3
設備7,1610043203810,17.5
設備8,1610043291566,83.8
設備5,1610043373188,30.5
設備2,1610043456107,84.7
設備1,1610043545998,53.4
設備3,1610043627174,97.4

 */

輸出結果:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/.m2/repository/ch/qos/logback/logback-classic/1.2.0/logback-classic-1.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17:19:42,659 WARN  org.apache.flink.runtime.taskmanager.TaskManagerLocation      - No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
17:19:42,725 WARN  org.apache.flink.runtime.taskmanager.TaskManagerLocation      - No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
17:19:43,088 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - Log file environment variable 'log.file' is not set.
17:19:43,089 WARN  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (fallback keys: [{key=jobmanager.web.log.path, isDeprecated=true}])'.
high> (設備8,84.3)
main> SensorReading(設備5,1610035371758,38.8)
main> SensorReading(設備5,1610035458637,60.2)
main> SensorReading(設備1,1610035543127,10.2)
main> SensorReading(設備7,1610035623427,51.6)
main> SensorReading(設備5,1610035705302,20.1)
main> SensorReading(設備5,1610035787387,12.9)
high> (設備7,88.2)
main> SensorReading(設備6,1610035960537,33.5)
main> SensorReading(設備7,1610036043040,63.0)
main> SensorReading(設備5,1610036125179,64.5)
main> SensorReading(設備6,1610036214972,30.2)
main> SensorReading(設備5,1610036296542,56.5)
main> SensorReading(設備7,1610036377999,29.7)
main> SensorReading(設備6,1610036467523,59.4)
high> (設備4,71.1)
main> SensorReading(設備5,1610036641100,28.2)
high> (設備2,88.8)
high> (設備8,73.5)
main> SensorReading(設備1,1610036897060,18.0)
main> SensorReading(設備7,1610036980127,14.9)
main> SensorReading(設備2,1610037069523,47.4)
main> SensorReading(設備4,1610037154507,59.5)
main> SensorReading(設備5,1610037235099,35.0)
high> (設備6,76.4)
main> SensorReading(設備2,1610037403367,10.0)
main> SensorReading(設備2,1610037484177,18.5)
high> (設備4,98.7)
high> (設備5,95.6)
main> SensorReading(設備6,1610037735520,32.6)
high> (設備6,83.3)
main> SensorReading(設備3,1610037913756,29.1)
high> (設備7,74.6)
main> SensorReading(設備6,1610038081606,22.2)
main> SensorReading(設備3,1610038163043,10.4)
main> SensorReading(設備5,1610038244717,56.9)
main> SensorReading(設備3,1610038326227,64.8)
main> SensorReading(設備4,1610038411053,65.0)
high> (設備8,93.2)
high> (設備8,76.2)
main> SensorReading(設備1,1610038670150,42.1)
main> SensorReading(設備5,1610038756839,35.1)
high> (設備3,75.9)
high> (設備3,83.4)
main> SensorReading(設備7,1610039019422,24.1)
high> (設備3,85.0)
main> SensorReading(設備8,1610039183077,45.6)
high> (設備3,79.5)
main> SensorReading(設備1,1610039351600,44.4)
high> (設備8,73.3)
high> (設備3,77.9)
low> (設備7,9.79)
main> SensorReading(設備4,1610039679144,19.0)
main> SensorReading(設備2,1610039761967,56.1)
high> (設備3,88.2)
high> (設備6,77.4)
main> SensorReading(設備7,1610040014212,14.4)
high> (設備4,98.2)
high> (設備8,85.0)
main> SensorReading(設備6,1610040265210,61.8)
main> SensorReading(設備2,1610040345769,48.0)
main> SensorReading(設備3,1610040432855,19.9)
main> SensorReading(設備4,1610040515943,30.9)
main> SensorReading(設備4,1610040601373,51.7)
main> SensorReading(設備1,1610040681803,29.7)
main> SensorReading(設備8,1610040770779,31.6)
main> SensorReading(設備3,1610040851986,67.1)
high> (設備4,93.2)
main> SensorReading(設備7,1610041022836,37.2)
high> (設備8,84.6)
main> SensorReading(設備6,1610041189301,19.2)
high> (設備4,99.0)
high> (設備4,77.0)
main> SensorReading(設備5,1610041435113,49.7)
high> (設備1,74.2)
main> SensorReading(設備8,1610041603035,42.2)
high> (設備3,87.1)
high> (設備1,82.7)
low> (設備3,0.59)
low> (設備4,7.38)
main> SensorReading(設備2,1610042016080,28.9)
high> (設備2,99.2)
main> SensorReading(設備2,1610042190222,42.2)
main> SensorReading(設備3,1610042277841,12.0)
high> (設備7,93.5)
main> SensorReading(設備7,1610042444652,10.5)
main> SensorReading(設備4,1610042530461,68.5)
high> (設備1,78.2)
main> SensorReading(設備3,1610042702219,18.5)
main> SensorReading(設備6,1610042787478,64.8)
low> (設備5,6.34)
main> SensorReading(設備2,1610042956073,65.6)
main> SensorReading(設備8,1610043038793,10.6)
main> SensorReading(設備8,1610043122971,30.3)
main> SensorReading(設備7,1610043203810,17.5)
high> (設備8,83.8)
main> SensorReading(設備5,1610043373188,30.5)
high> (設備2,84.7)
main> SensorReading(設備1,1610043545998,53.4)
high> (設備3,97.4)

Process finished with exit code 0

感謝各位的閱讀,以上就是“Flink的SideOutputSplit分流怎么實現”的內容了,經過本文的學習后,相信大家對Flink的SideOutputSplit分流怎么實現這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

咸阳市| 江北区| 盐亭县| 三亚市| 广饶县| 泸州市| 高安市| 通许县| 益阳市| 惠来县| 呼伦贝尔市| 钟山县| 平武县| 微山县| 天祝| 崇阳县| 阜城县| 砚山县| 会同县| 龙游县| 若尔盖县| 延边| 曲靖市| 昌宁县| 兴宁市| 利津县| 凤阳县| 屯门区| 晋宁县| 常州市| 徐水县| 濮阳市| 通州市| 东丽区| 宜兰市| 岳阳县| 西盟| 平阴县| 南木林县| 舞钢市| 咸宁市|