Big data development: monitoring and alerting on table data volume fluctuation and code-value distribution fluctuation. This article covers the theory and design; the initial metric computation is already written, while the management platform will be completed later, so the focus here is the module design. The implementation consists of asynchronous modules over offline data sources, split into a metric batch module, a monitoring/alerting module, and a platform management module. The metric batch module and the monitoring/alerting module communicate through offline tables rather than being coupled to each other, and they run as two separately scheduled jobs.
Five tables are designed: a metric-run record table, an alert-run record table, and three MySQL configuration tables (the table monitoring config, the alert config, and the distribution-field config). The metric job and the monitoring job have no direct task dependency; instead, the monitoring job reads the metric-run record table and produces the alert-run records from it.
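The exact DDL of these tables is not reproduced in the article. Judging from the fields read and written by the code below, the monitoring config and the rule-run record correspond roughly to the following case classes (a sketch inferred from the code, not the project's actual definitions; the comments are assumptions):

// Sketch only: field names taken from the Monitor/SunRobot code below, meanings assumed.
// table_monitor_conf (MySQL): per-table monitoring switches and defaults
case class TableMonitorConf(
  db_table_key: String,                        // logical sub-key within the table
  db_table_name: String,                       // "db.table"
  table_charge_people: String,                 // owners to notify
  done_path: String,                           // HDFS root for done/undone flag files
  where_condition: String,                     // extra filter applied by the rules
  if_done: String,                             // "1" = write done/undone flag files
  if_check_partition: String,                  // rule 1 switch
  if_check_partition_count: String,            // rule 2 switch
  if_check_partition_count_fluctuates: String, // rule 3 switch
  if_check_distribute: String,                 // rule 4 switch
  if_check_distribute_fluctuates: String       // rule 5 switch
)

// table_monitor.table_rule_records (Hive): one row per table/rule/run
case class TableMonitorRecord(
  db_table_key: String,
  db_table_name: String,
  check_data_time: String,   // e.g. "yyyy-MM-dd HH:mm:ss"
  rule_name: String,
  rule_result: String,       // "1" = passed, "0" = failed (assumption)
  rule_error: String,        // description of the problem, null when passed
  checked_partition: String
)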
The overall project directory structure is as follows:
resource: configuration files
common: shared modules; Builder constructs the alert messages to send, and Father holds the common code shared by the Spark jobs
rules: the five metric rules: check whether the partition exists, check whether the partition's record count exceeds a given value, check partition-count fluctuation, check the value distribution, and check fluctuation of the distribution vector (a minimal sketch of one rule follows this listing)
utils: utility classes such as date handling, table-format handling, and SQL handling
Monitor: the main class of the metric batch job
SunRobot: the main entry class of the alerting batch job
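Each rule exposes the same contract, runRule(dbTableName, dbTableKey, whereCondition, runDay), returning a Dataset[TableMonitorRecord]. As an illustration only, a minimal partition-existence check in the spirit of Rule1 could look like the sketch below; the object name, rule name, the dt partition column, and the query are assumptions rather than the project's actual code:

import com.hoult.common.Father
import com.hoult.rules.TableMonitorRecord
import org.apache.spark.sql.Dataset

object Rule1Sketch extends Father {
  import spark.implicits._

  // Check that the runDay partition of the table exists and is non-empty.
  def runRule(dbTableName: String, dbTableKey: String,
              whereCondition: String, runDay: String): Dataset[TableMonitorRecord] = {
    val checkTime = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date())
    // Fall back to a no-op filter when no where_condition is configured
    val cond = Option(whereCondition).filter(_.trim.nonEmpty).getOrElse("1 = 1")
    val cnt = spark.sql(
      s"select 1 from $dbTableName where dt = '$runDay' and $cond limit 1").count()
    Seq(TableMonitorRecord(
      dbTableKey,
      dbTableName,
      checkTime,
      "rule1_check_partition",
      if (cnt > 0) "1" else "0",                                // rule_result
      if (cnt > 0) null else s"partition dt=$runDay not found", // rule_error
      s"dt=$runDay"                                             // checked_partition
    )).toDS()
  }
}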
The rule implementations themselves are not detailed further here; they are easy to follow from the source code. How Monitor applies these rules to generate the corresponding run records is shown in the main implementation below:
package com.hoult

import com.beust.jcommander.JCommander
import com.hoult.common.Father
import com.hoult.rules.{Rule1, Rule2, Rule3, Rule4, Rule5, TableMonitorConf, TableMonitorRecord}
import com.hoult.utils.{DateTool, PropertiesUtils}
import org.apache.spark.sql.Dataset

import scala.collection.mutable.ListBuffer

object Monitor extends Father {

  val mysqlProps = PropertiesUtils.getMysqlProps()
  var broadTableConfs: Dataset[TableMonitorConf] = null

  def main(args: Array[String]): Unit = {
    val info: ObserverArgs = new ObserverArgs
    println("Input args:: " + args.mkString(","))
    JCommander.newBuilder().addObject(info).build().parse(args.toArray: _*)

    // Cache the config tables on the cluster
    prepare()

    // Produce one dataframe per (table, rule) combination
    import spark.implicits._
    val tableConfArray: Array[TableMonitorConf] = spark.sql(
      "select * from table_monitor_conf where db_table_name != 'default.default'")
      .as[TableMonitorConf].collect()
    val defaultTableConf = spark.sql(
      "select * from table_monitor_conf where db_table_name = 'default.default'")
      .as[TableMonitorConf].collect().take(1)(0)
    var ll: ListBuffer[Dataset[TableMonitorRecord]] = ListBuffer[Dataset[TableMonitorRecord]]() // all rules run together

    // Fill null fields with the defaults from the default.default row
    val tConfs = tableConfArray.map(conf => {
      TableMonitorConf(
        if (conf.db_table_key == null) defaultTableConf.db_table_key else conf.db_table_key,
        conf.db_table_name,
        if (conf.table_charge_people == null) defaultTableConf.table_charge_people else conf.table_charge_people,
        if (conf.done_path == null) defaultTableConf.done_path else conf.done_path,
        if (conf.where_condition == null) defaultTableConf.where_condition else conf.where_condition,
        if (conf.if_done == null) defaultTableConf.if_done else conf.if_done,
        if (conf.if_check_partition == null) defaultTableConf.if_check_partition else conf.if_check_partition,
        if (conf.if_check_partition_count == null) defaultTableConf.if_check_partition_count else conf.if_check_partition_count,
        if (conf.if_check_partition_count_fluctuates == null) defaultTableConf.if_check_partition_count_fluctuates else conf.if_check_partition_count_fluctuates,
        if (conf.if_check_distribute == null) defaultTableConf.if_check_distribute else conf.if_check_distribute,
        if (conf.if_check_distribute_fluctuates == null) defaultTableConf.if_check_distribute_fluctuates else conf.if_check_distribute_fluctuates
      )
    })

    // Run every enabled rule for every table
    for (elem <- tConfs) {
      // Rule 1: partition exists
      if ("1".equals(elem.if_check_partition)) {
        ll +:= Rule1.runRule(elem.db_table_name, elem.db_table_key, elem.where_condition, info.runDay)
      }
      // Rule 2: partition record count above a given value
      if ("1".equals(elem.if_check_partition_count)) {
        ll +:= Rule2.runRule(elem.db_table_name, elem.db_table_key, elem.where_condition, info.runDay)
      }
      // Rule 3: partition count fluctuation
      if ("1".equals(elem.if_check_partition_count_fluctuates)) {
        ll +:= Rule3.runRule(elem.db_table_name, elem.db_table_key, elem.where_condition, info.runDay)
      }
      // Rule 4: value distribution
      if ("1".equals(elem.if_check_distribute)) {
        ll +:= Rule4.runRule(elem.db_table_name, elem.db_table_key, elem.where_condition, info.runDay)
      }
      // Rule 5: distribution fluctuation
      if ("1".equals(elem.if_check_distribute_fluctuates)) {
        ll +:= Rule5.runRule(elem.db_table_name, elem.db_table_key, elem.where_condition, info.runDay)
      }
    }

    if (ll.size == 0) return

    ll.reduce(_.union(_)).select(
      "db_table_key",
      "db_table_name",
      "check_data_time",
      "rule_name",
      "rule_result",
      "rule_error",
      "checked_partition"
    ).createOrReplaceTempView("temp_table_rule_records")

    val partition = DateTool.getLatest30minutePatition
    spark.sql("set hive.reduce.tasks=1")
    spark.sql(s"insert overwrite table table_monitor.table_rule_records partition(dt=${info.runDay},hm=$partition) select * from temp_table_rule_records")
  }

  def prepare(): Unit = {
    import spark.implicits._

    // 1. Cache the base config table on the cluster: table_monitor_conf
    val tableConfs: Dataset[TableMonitorConf] = spark.read
      .jdbc(mysqlProps.getProperty("url"), "table_monitor_conf", mysqlProps)
      .as[TableMonitorConf].cache()
    tableConfs.createOrReplaceTempView("table_monitor_conf")

    // 2. Cache the distribution config table: table_monitor_distribute_conf
    spark.read
      .jdbc(mysqlProps.getProperty("url"), "table_monitor_distribute_conf", mysqlProps)
      .cache().createOrReplaceTempView("table_monitor_distribute_conf")
  }
}
The overall flow is to read the configuration tables, fill in the default parameters, and then run each enabled rule with the resulting per-table configuration.
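Both Monitor and SunRobot parse their command-line arguments into an ObserverArgs bean with JCommander. The class itself is not shown in the article; since only info.runDay is referenced, it is presumably something close to the following (the parameter name and format are assumptions):

import com.beust.jcommander.Parameter

// Sketch: JCommander argument holder; only runDay is used by the two jobs.
class ObserverArgs {
  @Parameter(names = Array("--runDay"), description = "partition day to check, e.g. 20210301", required = true)
  var runDay: String = _
}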
This module does the alerting and generates the gating done files. Its main job is to compare the metric run records produced above with the table metadata in the configuration tables and decide whether the check requirements are met: if they are, it writes a done file; if not, an undone file. If alerting is configured, it alerts according to the notify rules; for now the only channel is a message sent to the chat tool. The main code is as follows:
import com.beust.jcommander.JCommander
import com.hoult.common.{Father, Message, RobotConf}
import com.hoult.rules.TableMonitorConf
import com.hoult.utils.{DgsFileSystem, PropertiesUtils}
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.Dataset
import org.joda.time.DateTime
import scalaj.http.Http

/**
 * Alerts based on the rule-run results and the notify configuration.
 */
object SunRobot extends Father {

  import spark.implicits._

  val mysqlProps = PropertiesUtils.getMysqlProps()

  def main(args: Array[String]): Unit = {
    val info: ObserverArgs = new ObserverArgs
    println("Input args:: " + args.mkString(","))
    JCommander.newBuilder().addObject(info).build().parse(args.toArray: _*)

    // 1. Cache the base config table on the cluster: table_monitor_conf
    val tableConfs: Dataset[TableMonitorConf] = spark.read
      .jdbc(mysqlProps.getProperty("url"), "table_monitor_conf", mysqlProps)
      .as[TableMonitorConf].cache()

    // 2. Pull the default config row to the driver
    val defaultConf: TableMonitorConf = tableConfs.where("db_table_name='default.default'").as[TableMonitorConf].take(1)(0)

    // 3. Cache the notify config table: table_monitor_notify_conf
    import org.apache.spark.sql.functions.broadcast
    val tableNotifyConf = spark.read
      .jdbc(mysqlProps.getProperty("url"), "table_monitor_notify_conf", mysqlProps)
      .as[TableNotifyConf].cache()
    broadcast(tableNotifyConf).createOrReplaceTempView("table_monitor_notify_conf")

    import spark.implicits._
    // Read what is needed from the config table and fill defaults
    val tConfs = tableConfs.map(conf => {
      TableMonitorConf(
        if (conf.db_table_key == null) defaultConf.db_table_key else conf.db_table_key,
        conf.db_table_name,
        if (conf.table_charge_people == null) defaultConf.table_charge_people else conf.table_charge_people,
        if (conf.done_path == null) defaultConf.done_path else conf.done_path,
        if (conf.where_condition == null) defaultConf.where_condition else conf.where_condition,
        if (conf.if_done == null) defaultConf.if_done else conf.if_done,
        if (conf.if_check_partition == null) defaultConf.if_check_partition else conf.if_check_partition,
        if (conf.if_check_partition_count == null) defaultConf.if_check_partition_count else conf.if_check_partition_count,
        if (conf.if_check_partition_count_fluctuates == null) defaultConf.if_check_partition_count_fluctuates else conf.if_check_partition_count_fluctuates,
        if (conf.if_check_distribute == null) defaultConf.if_check_distribute else conf.if_check_distribute,
        if (conf.if_check_distribute_fluctuates == null) defaultConf.if_check_distribute_fluctuates else conf.if_check_distribute_fluctuates
      )
    })
    broadcast(tConfs).createOrReplaceTempView("table_monitor_conf")

    val confUnions: Dataset[ConfUnion] = spark.sql(
      """
        |SELECT
        |  a.db_table_key,
        |  a.db_table_name,
        |  notify_enable,
        |  check_count_threshold,
        |  normal_produce_datetime,
        |  check_distribute_json_threshold,
        |  check_count_fluctuates_threshold,
        |  b.if_done,
        |  b.done_path,
        |  b.table_charge_people
        |FROM table_monitor_notify_conf a
        |LEFT JOIN table_monitor_conf b ON a.db_table_name = b.db_table_name AND a.db_table_key = b.db_table_key
        |WHERE notify_enable = '1'
        |""".stripMargin).as[ConfUnion]

    excute(confUnions, info.runDay)
  }

  def excute(confUnions: Dataset[ConfUnion], runDay: String): Unit = {
    // Latest record per (table, key, rule) for the run day
    val record = spark.sql(
      s"""
         |SELECT db_table_key,
         |       db_table_name,
         |       check_data_time,
         |       rule_name,
         |       rule_result,
         |       checked_partition,
         |       rule_error
         |FROM
         |  (SELECT db_table_key,
         |          db_table_name,
         |          check_data_time,
         |          rule_name,
         |          rule_result,
         |          rule_error,
         |          checked_partition,
         |          row_number() over(partition by db_table_key, db_table_name, rule_name
         |                            order by check_data_time desc) rn
         |   FROM table_monitor.table_rule_records
         |   WHERE dt='$runDay') tmp
         |WHERE rn = 1
         |""".stripMargin)
    record
      .join(confUnions, Seq("db_table_name", "db_table_key"), "left")
      .filter("notify_enable='1'")
      .createOrReplaceTempView("tmp_records")
    val now = DateTime.now().toString("yyyy-MM-dd HH:mm:ss")

    // NOTE: the SQL that turns tmp_records into the final result set (deciding if_ok and
    // trouble_description against the configured thresholds) is omitted in the original post.
    val result = spark.sql("")

    // done files
    val notifyRecords = result
      .as[NotifyRecord]
      .collect()
      .map(r => NotifyRecord(
        r.db_table_name,
        r.view_url,
        r.db_table_key,
        r.rule_name,
        r.if_ok,
        if (r.db_table_key != null) s"${r.done_path}/${r.db_table_name}/${r.db_table_key}/"
        else s"${r.done_path}/${r.db_table_name}/default/",
        r.table_charge_people,
        r.trouble_description,
        r.check_data_time,
        r.checked_partition
      ))
    sc.makeRDD(notifyRecords).toDS().createOrReplaceTempView("tmp_notify_records")
    val dgs = DgsFileSystem.getFileSystem

    // Write the notify log records
    spark.sql("set hive.reduce.tasks = 1")
    spark.sql(s"insert overwrite table table_monitor.table_monitor_notify_records partition(dt=${runDay}) select * from tmp_notify_records")

    // Whether to write done/undone files (tables that are only monitored for late output have no
    // downstream dependency, so no done file is needed)
    val ifDoneMap = confUnions
      .selectExpr("concat(db_table_key, db_table_name) AS key_name", "if_done")
      .rdd
      .map(row => row.getAs("key_name").toString -> row.getAs("if_done").toString)
      .collectAsMap()

    // 1. Write a done or undone flag for every record
    for (elem <- notifyRecords) {
      if (ifDoneMap.getOrElse(elem.db_table_key + elem.db_table_name, "0").equals("1")) {
        if ("0".equals(elem.if_ok)) {
          dgs.createNewFile(new Path(s"${elem.done_path}$runDay/${elem.rule_name}.undone"))
        } else {
          dgs.deleteOnExit(new Path(s"${elem.done_path}$runDay/${elem.rule_name}.undone"))
          dgs.createNewFile(new Path(s"${elem.done_path}$runDay/${elem.rule_name}.done"))
        }
      }
    }

    // 2. Alert only on records with problems
    val notifyRecordsTmp = notifyRecords.filter(_.trouble_description != null)

    // 2.1 Fetch each table's expected production time
    val normalTimeMap = spark.sql(
      s"""
         |SELECT concat(db_table_key, db_table_name) AS key_name,
         |       normal_produce_datetime
         |FROM table_monitor_notify_conf
         |WHERE normal_produce_datetime IS NOT NULL
         |GROUP BY db_table_key,
         |         db_table_name,
         |         normal_produce_datetime
         |""".stripMargin)
      .rdd
      .map(row => row.getAs("key_name").toString -> row.getAs("normal_produce_datetime").toString)
      .collectAsMap()

    for (elem <- notifyRecordsTmp) {
      // Alert only if the check failed and the expected production time has already passed
      if ("0".equals(elem.if_ok) && normalTimeMap.getOrElse(elem.db_table_key + elem.db_table_name, "08:00").compareTo(elem.check_data_time.substring(11, 16)) < 0) {
        resp(Message.build(RobotConf.GROUP_ID, elem.table_charge_people,
          s"""
             |[Table monitor alert - ${elem.db_table_key}/${elem.db_table_name}]
             |Partition: ${elem.checked_partition}
             |Check time: ${elem.check_data_time}
             |Rule: ${elem.rule_name}
             |Problem: ${elem.trouble_description}
             |Details: ${elem.view_url}""".stripMargin))
      }
    }
    // println("alert: " + Message.build(RobotConf.GROUP_ID, elem.table_charge_people, elem.db_table_name + "\n" + elem.trouble_description + "\n" + elem.view_url))
    // println("done: " + elem)
    // println("write log: " + elem)
  }

  def resp(msg: String) = {
    Http(RobotConf.API_URL)
      .header("Content-Type", "application/json")
      .postData(msg).asBytes
  }

  case class NotifyRecord(
    db_table_name: String,
    view_url: String,
    db_table_key: String,
    rule_name: String,
    if_ok: String,
    done_path: String,
    table_charge_people: String,
    trouble_description: String,
    check_data_time: String,
    checked_partition: String
  )

  case class TableNotifyConf(
    var db_table_key: String,
    var db_table_name: String,
    var notify_enable: String,
    var check_count_threshold: String,
    var normal_produce_datetime: String,
    var check_distribute_json_threshold: String,
    var check_count_fluctuates_threshold: String
  )

  case class ConfUnion(
    var db_table_key: String,
    var db_table_name: String,
    var notify_enable: String,
    var check_count_threshold: String,
    var normal_produce_datetime: String,
    var check_distribute_json_threshold: String,
    var check_count_fluctuates_threshold: String,
    var if_done: String,
    var done_path: String,
    var table_charge_people: String
  )
}
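Message.build and RobotConf live in the common package and are not shown. Since resp simply POSTs a JSON body to the robot webhook, Message.build presumably assembles a payload along these lines; the URL, group id, and JSON field names below are placeholders for illustration, not the real chat tool's API:

object RobotConf {
  // Placeholder values; the real webhook URL and group id come from the project's configuration.
  val API_URL = "https://chat.example.com/robot/send?key=xxx"
  val GROUP_ID = "data-monitor-group"
}

object Message {
  // Escape a string so it can be embedded as a JSON value.
  private def esc(s: String): String =
    Option(s).getOrElse("").replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n")

  // Sketch: build a JSON payload with the target group, the people to mention, and the alert text.
  def build(groupId: String, chargePeople: String, text: String): String = {
    val mentioned = Option(chargePeople).getOrElse("")
      .split(",").filter(_.nonEmpty)
      .map(p => "\"" + esc(p) + "\"").mkString(",")
    s"""{"group_id":"${esc(groupId)}","mentioned":[$mentioned],"text":"${esc(text)}"}"""
  }
}

With the done files in place, downstream jobs do not need to depend on the monitoring job directly: a scheduler can simply wait for <done_path>/<db.table>/<db_table_key>/<runDay>/<rule_name>.done to appear before starting.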