This article explains how to split a stream in Flink with SideOutput and split. The explanation is kept simple and clear, so follow along step by step.
Environment: Windows
Scala: 2.11.8
Flink: 1.10.1
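Most DataStream API operators have a single output, that is, one stream of a single data type.
The exception is the split operator, which can divide one stream into several streams, all of which have the same data type.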
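To make the split semantics concrete, here is a plain-Scala model with no Flink dependency: every element is tagged with one or more output names, and selecting a name returns the elements carrying it. Every output keeps the same element type `T`. The functions are named after Flink's `split`/`select` operators, but this is a hypothetical sketch of the behavior, not Flink's implementation; `Tagged` and the output names are illustrative.

```scala
// Plain-Scala model of split/select semantics (no Flink dependency).
object SplitSelectModel {
  // One element plus the output names the selector assigned to it (hypothetical type).
  final case class Tagged[T](value: T, names: Set[String])

  // Attach output names to every element, in the spirit of split(selector).
  def split[T](input: Seq[T])(selector: T => Set[String]): Seq[Tagged[T]] =
    input.map(v => Tagged(v, selector(v)))

  // Keep elements tagged with any of the requested names, in the spirit of select(names*).
  // Note that every selected stream still has the same element type T.
  def select[T](tagged: Seq[Tagged[T]], names: String*): Seq[T] =
    tagged.filter(t => names.exists(n => t.names.contains(n))).map(_.value)
}
```

Because the selector can only attach names, not change values, a "low" stream of `(device, temperature)` pairs next to a main stream of full readings cannot be expressed this way.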
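The side outputs feature of a process function can also produce multiple streams (the approach recommended from Flink 1.9 onward), and unlike split, these streams can have different data types. A side output is defined as an OutputTag[X] object, where X is the data type of that output stream. A process function emits an event to one or more side outputs through its Context object.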
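The following plain-Scala model (again with no Flink dependency) sketches why side outputs can carry different types: each hypothetical `Tag[X]` fixes its own element type, and a hypothetical `Ctx` plays the role of the Context plus the main Collector. It only mimics `OutputTag`, `context.output` and `getSideOutput` in spirit; the real Flink classes are the ones used in the article's example.

```scala
import scala.collection.mutable

// Plain-Scala model of typed side outputs (no Flink dependency).
object SideOutputModel {
  // Hypothetical stand-in for OutputTag[X]: the type parameter X fixes the element type.
  final case class Tag[X](id: String)

  // Hypothetical stand-in for ProcessFunction#Context plus the main Collector.
  final class Ctx[Out] {
    // Main output: elements of the function's declared output type.
    val mainOutput: mutable.ArrayBuffer[Out] = mutable.ArrayBuffer.empty[Out]
    // Side outputs keyed by tag id; the typed API below keeps each buffer homogeneous.
    private val side = mutable.LinkedHashMap.empty[String, mutable.ArrayBuffer[Any]]

    // Like collector.collect: emit to the main output.
    def collect(out: Out): Unit = { mainOutput += out; () }

    // Like context.output(tag, value): the value must match the tag's type X.
    def output[X](tag: Tag[X], value: X): Unit = {
      side.getOrElseUpdate(tag.id, mutable.ArrayBuffer.empty[Any]) += value
      ()
    }

    // Like getSideOutput(tag): read a side output back with its own type.
    def sideOutput[X](tag: Tag[X]): List[X] =
      side.get(tag.id) match {
        case Some(buf) => buf.toList.map(_.asInstanceOf[X])
        case None      => Nil
      }
  }
}
```

In the usage below, the main output holds `Double` values while the two side outputs hold tuples and strings respectively, which a single split could not express.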
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

/**
 * A single sensor reading.
 *
 * @param deviceNo    device number
 * @param timestamp   timestamp
 * @param temperature temperature
 */
case class SensorReading(deviceNo: String, timestamp: Long, temperature: Double)

object SensorReadingSplitStreaming {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the time characteristic to event time (this example does not rely on timestamps or watermarks)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // Read the test file line by line
    val fileSource: DataStream[String] = env.readTextFile("D:\\tmp\\file1.txt")
    // Parse each "device,timestamp,temperature" line into a SensorReading
    val mapStream: DataStream[SensorReading] = fileSource
      .map(data => {
        val split: Array[String] = data.split(",")
        SensorReading(split(0).trim, split(1).trim.toLong, split(2).trim.toDouble)
      })

    // Split the stream: normal readings stay on the main stream, the rest go to side outputs
    val tmpStageStream: DataStream[SensorReading] = mapStream.process(new TempStageProcess())
    tmpStageStream.print("main")

    // The tag ids and element types must match the tags used inside TempStageProcess
    val lowStream: DataStream[(String, Double)] =
      tmpStageStream.getSideOutput(new OutputTag[(String, Double)]("low-tmp"))
    val highStream: DataStream[(String, Double)] =
      tmpStageStream.getSideOutput(new OutputTag[(String, Double)]("high-tmp"))
    lowStream.print("low")
    highStream.print("high")

    env.execute()
  }
}

class TempStageProcess() extends ProcessFunction[SensorReading, SensorReading] {
  // Define the side-output tags
  lazy val lowTmp: OutputTag[(String, Double)] = new OutputTag[(String, Double)]("low-tmp")
  lazy val highTmp: OutputTag[(String, Double)] = new OutputTag[(String, Double)]("high-tmp")

  // Process each element
  override def processElement(value: SensorReading,
                              context: ProcessFunction[SensorReading, SensorReading]#Context,
                              collector: Collector[SensorReading]): Unit = {
    if (value.temperature < 10) {
      // Below 10: emit (deviceNo, temperature) to the "low-tmp" side output
      context.output(lowTmp, (value.deviceNo, value.temperature))
    } else if (value.temperature > 70) {
      // Above 70: emit (deviceNo, temperature) to the "high-tmp" side output
      context.output(highTmp, (value.deviceNo, value.temperature))
    } else {
      // Everything else stays on the main output as a full SensorReading
      collector.collect(value)
    }
  }
}

// Contents of the test file:
/*
設備8,1610035289736,84.3
設備5,1610035371758,38.8
設備5,1610035458637,60.2
設備1,1610035543127,10.2
設備7,1610035623427,51.6
設備5,1610035705302,20.1
設備5,1610035787387,12.9
設備7,1610035877019,88.2
設備6,1610035960537,33.5
設備7,1610036043040,63.0
設備5,1610036125179,64.5
設備6,1610036214972,30.2
設備5,1610036296542,56.5
設備7,1610036377999,29.7
設備6,1610036467523,59.4
設備4,1610036557446,71.1
設備5,1610036641100,28.2
設備2,1610036725803,88.8
設備8,1610036808041,73.5
設備1,1610036897060,18.0
設備7,1610036980127,14.9
設備2,1610037069523,47.4
設備4,1610037154507,59.5
設備5,1610037235099,35.0
設備6,1610037317868,76.4
設備2,1610037403367,10.0
設備2,1610037484177,18.5
設備4,1610037571384,98.7
設備5,1610037653666,95.6
設備6,1610037735520,32.6
設備6,1610037823906,83.3
設備3,1610037913756,29.1
設備7,1610037994980,74.6
設備6,1610038081606,22.2
設備3,1610038163043,10.4
設備5,1610038244717,56.9
設備3,1610038326227,64.8
設備4,1610038411053,65.0
設備8,1610038500538,93.2
設備8,1610038583924,76.2
設備1,1610038670150,42.1
設備5,1610038756839,35.1
設備3,1610038840180,75.9
設備3,1610038929751,83.4
設備7,1610039019422,24.1
設備3,1610039101778,85.0
設備8,1610039183077,45.6
設備3,1610039264498,79.5
設備1,1610039351600,44.4
設備8,1610039434187,73.3
設備3,1610039518048,77.9
設備7,1610039598556,9.79
設備4,1610039679144,19.0
設備2,1610039761967,56.1
設備3,1610039847823,88.2
設備6,1610039933024,77.4
設備7,1610040014212,14.4
設備4,1610040101627,98.2
設備8,1610040182379,85.0
設備6,1610040265210,61.8
設備2,1610040345769,48.0
設備3,1610040432855,19.9
設備4,1610040515943,30.9
設備4,1610040601373,51.7
設備1,1610040681803,29.7
設備8,1610040770779,31.6
設備3,1610040851986,67.1
設備4,1610040941421,93.2
設備7,1610041022836,37.2
設備8,1610041105401,84.6
設備6,1610041189301,19.2
設備4,1610041270735,99.0
設備4,1610041354109,77.0
設備5,1610041435113,49.7
設備1,1610041521773,74.2
設備8,1610041603035,42.2
設備3,1610041687230,87.1
設備1,1610041767985,82.7
設備3,1610041848130,0.59
設備4,1610041933021,7.38
設備2,1610042016080,28.9
設備2,1610042103229,99.2
設備2,1610042190222,42.2
設備3,1610042277841,12.0
設備7,1610042364076,93.5
設備7,1610042444652,10.5
設備4,1610042530461,68.5
設備1,1610042615421,78.2
設備3,1610042702219,18.5
設備6,1610042787478,64.8
設備5,1610042874301,6.34
設備2,1610042956073,65.6
設備8,1610043038793,10.6
設備8,1610043122971,30.3
設備7,1610043203810,17.5
設備8,1610043291566,83.8
設備5,1610043373188,30.5
設備2,1610043456107,84.7
設備1,1610043545998,53.4
設備3,1610043627174,97.4
*/
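The routing rule in `TempStageProcess` uses strict comparisons, so a reading of exactly 10 or exactly 70 stays on the main stream. The sketch below restates that rule in plain Scala (no Flink), together with the same comma-separated parsing as the `map` step; `TempRouting` and `route` are hypothetical names for illustration. Its results agree with the printed output: 設備2's 10.0 appears under `main>` and 設備7's 9.79 under `low>`.

```scala
// Plain-Scala restatement of the TempStageProcess routing rule (no Flink dependency).
object TempRouting {
  final case class SensorReading(deviceNo: String, timestamp: Long, temperature: Double)

  // Same parsing as the article's map step: "device,timestamp,temperature".
  def parse(line: String): SensorReading = {
    val f = line.split(",")
    SensorReading(f(0).trim, f(1).trim.toLong, f(2).trim.toDouble)
  }

  // Name of the stream a reading is routed to; both comparisons are strict.
  def route(r: SensorReading): String =
    if (r.temperature < 10) "low-tmp"
    else if (r.temperature > 70) "high-tmp"
    else "main"
}
```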
Output:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/.m2/repository/org/slf4j/slf4j-log4j12/1.7.10/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/.m2/repository/ch/qos/logback/logback-classic/1.2.0/logback-classic-1.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17:19:42,659 WARN org.apache.flink.runtime.taskmanager.TaskManagerLocation - No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
17:19:42,725 WARN org.apache.flink.runtime.taskmanager.TaskManagerLocation - No hostname could be resolved for the IP address 127.0.0.1, using IP address as host name. Local input split assignment (such as for HDFS files) may be impacted.
17:19:43,088 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - Log file environment variable 'log.file' is not set.
17:19:43,089 WARN org.apache.flink.runtime.webmonitor.WebMonitorUtils - JobManager log files are unavailable in the web dashboard. Log file location not found in environment variable 'log.file' or configuration key 'Key: 'web.log.path' , default: null (fallback keys: [{key=jobmanager.web.log.path, isDeprecated=true}])'.
high> (設備8,84.3)
main> SensorReading(設備5,1610035371758,38.8)
main> SensorReading(設備5,1610035458637,60.2)
main> SensorReading(設備1,1610035543127,10.2)
main> SensorReading(設備7,1610035623427,51.6)
main> SensorReading(設備5,1610035705302,20.1)
main> SensorReading(設備5,1610035787387,12.9)
high> (設備7,88.2)
main> SensorReading(設備6,1610035960537,33.5)
main> SensorReading(設備7,1610036043040,63.0)
main> SensorReading(設備5,1610036125179,64.5)
main> SensorReading(設備6,1610036214972,30.2)
main> SensorReading(設備5,1610036296542,56.5)
main> SensorReading(設備7,1610036377999,29.7)
main> SensorReading(設備6,1610036467523,59.4)
high> (設備4,71.1)
main> SensorReading(設備5,1610036641100,28.2)
high> (設備2,88.8)
high> (設備8,73.5)
main> SensorReading(設備1,1610036897060,18.0)
main> SensorReading(設備7,1610036980127,14.9)
main> SensorReading(設備2,1610037069523,47.4)
main> SensorReading(設備4,1610037154507,59.5)
main> SensorReading(設備5,1610037235099,35.0)
high> (設備6,76.4)
main> SensorReading(設備2,1610037403367,10.0)
main> SensorReading(設備2,1610037484177,18.5)
high> (設備4,98.7)
high> (設備5,95.6)
main> SensorReading(設備6,1610037735520,32.6)
high> (設備6,83.3)
main> SensorReading(設備3,1610037913756,29.1)
high> (設備7,74.6)
main> SensorReading(設備6,1610038081606,22.2)
main> SensorReading(設備3,1610038163043,10.4)
main> SensorReading(設備5,1610038244717,56.9)
main> SensorReading(設備3,1610038326227,64.8)
main> SensorReading(設備4,1610038411053,65.0)
high> (設備8,93.2)
high> (設備8,76.2)
main> SensorReading(設備1,1610038670150,42.1)
main> SensorReading(設備5,1610038756839,35.1)
high> (設備3,75.9)
high> (設備3,83.4)
main> SensorReading(設備7,1610039019422,24.1)
high> (設備3,85.0)
main> SensorReading(設備8,1610039183077,45.6)
high> (設備3,79.5)
main> SensorReading(設備1,1610039351600,44.4)
high> (設備8,73.3)
high> (設備3,77.9)
low> (設備7,9.79)
main> SensorReading(設備4,1610039679144,19.0)
main> SensorReading(設備2,1610039761967,56.1)
high> (設備3,88.2)
high> (設備6,77.4)
main> SensorReading(設備7,1610040014212,14.4)
high> (設備4,98.2)
high> (設備8,85.0)
main> SensorReading(設備6,1610040265210,61.8)
main> SensorReading(設備2,1610040345769,48.0)
main> SensorReading(設備3,1610040432855,19.9)
main> SensorReading(設備4,1610040515943,30.9)
main> SensorReading(設備4,1610040601373,51.7)
main> SensorReading(設備1,1610040681803,29.7)
main> SensorReading(設備8,1610040770779,31.6)
main> SensorReading(設備3,1610040851986,67.1)
high> (設備4,93.2)
main> SensorReading(設備7,1610041022836,37.2)
high> (設備8,84.6)
main> SensorReading(設備6,1610041189301,19.2)
high> (設備4,99.0)
high> (設備4,77.0)
main> SensorReading(設備5,1610041435113,49.7)
high> (設備1,74.2)
main> SensorReading(設備8,1610041603035,42.2)
high> (設備3,87.1)
high> (設備1,82.7)
low> (設備3,0.59)
low> (設備4,7.38)
main> SensorReading(設備2,1610042016080,28.9)
high> (設備2,99.2)
main> SensorReading(設備2,1610042190222,42.2)
main> SensorReading(設備3,1610042277841,12.0)
high> (設備7,93.5)
main> SensorReading(設備7,1610042444652,10.5)
main> SensorReading(設備4,1610042530461,68.5)
high> (設備1,78.2)
main> SensorReading(設備3,1610042702219,18.5)
main> SensorReading(設備6,1610042787478,64.8)
low> (設備5,6.34)
main> SensorReading(設備2,1610042956073,65.6)
main> SensorReading(設備8,1610043038793,10.6)
main> SensorReading(設備8,1610043122971,30.3)
main> SensorReading(設備7,1610043203810,17.5)
high> (設備8,83.8)
main> SensorReading(設備5,1610043373188,30.5)
high> (設備2,84.7)
main> SensorReading(設備1,1610043545998,53.4)
high> (設備3,97.4)

Process finished with exit code 0
Thank you for reading. That covers how to split a stream in Flink with SideOutput and split; how best to apply it in a real project still needs to be verified in practice. This is Yisu Cloud (億速云), and more articles on related topics will follow.
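Disclaimer: content published on this site (images, videos and text) is mainly original, reposted or shared, and the views expressed do not represent the position of this site. To report infringement, email the site administrator at is@yisu.com with supporting evidence; content confirmed to infringe will be removed immediately.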