您好,登錄后才能下訂單哦!
背景介紹:
今天接到老板分配的一個小任務:開發一個程序,實現從數據庫中抽取數據并生成報表(這是我們數據庫審計平臺準備上線的一個功能)。既然是要生成報表,那么首先得有數據,于是便想到從該業務系統的測試環境抽取業務表的數據,然后裝載至自己云主機上的Mysql中。
本來以為只要"select ...into outfile"和"load data infile..."兩個命令就可以搞定的,可是還是出了意外。測試環境導出的
txt文件在云主機load時,報了"Row 1 doesn't contain data for all columns"這樣的warning,表中的數據自然也是凌亂且不完整的。
仔細分析,感覺可能是兩個方面出了問題:
1.由于測試環境的網段是隔離的,所以為了拿到"select ...into outfile"時生成的數據,我是打開CRT的日志,然后執行
"cat xxx.txt",變相地將數據獲取到了本地,然后上傳至云主機的;
2.測試環境的Mysql和云主機上Mysql的小版本不一致。
這兩個問題看似都沒法解決,現在只有文本文件,怎么辦?使用Spark不就得了!
之前也寫過一篇使用Spark分析Mysql慢日志的博文,自己對Spark core的各種算子比較熟悉,所以決定試一試。
實戰演練:
表結構如下:
mysql> desc claims_case_loss_document;
+---------------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+---------------+-------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| case_id | varchar(22) | NO | | NULL | |
| case_times | varchar(2) | NO | | NULL | |
| document_list | text | NO | | NULL | |
| create_time | timestamp | YES | | NULL | |
| update_time | timestamp | YES | | NULL | |
+---------------+-------------+------+-----+---------+----------------+
6 rows in set (0.00 sec)
文本結構如下:
1147 90100002700021437455 1 100100_收款方賬戶信息;001003_事故證明;001001_駕駛證;100000_收款方×××明;001002_索賠申請書 2017-11-16 12:08:08 2017-11-16 12:08:08
觀察文本結構可知,每個字段間都有數個空格,而且兩兩字段間的空格數并不一致,所以得先使用Spark core將文本中字段提取出來,以便后續插入。
閑話少說,直接上程序!(以下程序均使用scala在eclipse ide for scala中編寫和執行)
package cn.spark.study.sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.collection.mutable.ArrayBuffer
import java.sql.DriverManager
object insert2Mysql {
def main(args: Array[String]): Unit = {
val t1=System.nanoTime()
val conf = new SparkConf()
.setAppName("insert2Mysql")
.setMaster("local")
val sc = new SparkContext(conf)
//textFile方法只能讀取字符集為utf-8的文件,否則中文會亂碼。windows下,將文件另存為時,可以選擇utf-8字符集
//也可在代碼中實施轉換,但比較繁瑣
val lines = sc.textFile("D://Users//GAOZHONGZHENG186//Desktop//text001.txt", 1);
val words = lines.map { line => line.split(" ") }
val wordsNotNull = words.map{ word =>
val wordArray_raw = new ArrayBuffer[String]()
val wordArray = new ArrayBuffer[String]()
for(i<-0 until word.length){
if (word(i)!=""){
wordArray_raw+=word(i)
}
}
for(i<-0 until wordArray_raw.length-4){
wordArray+=wordArray_raw(i)
}
wordArray+=wordArray_raw(4)+" "+wordArray_raw(5)
wordArray+=wordArray_raw(6)+" "+wordArray_raw(7)
wordArray
}
wordsNotNull.foreach { word =>
Class.forName("com.mysql.cj.jdbc.Driver")
val conn = DriverManager.getConnection("jdbc:mysql://10.25.80.7:3306/db1", "root", "123456")
try {
val statement = conn.createStatement()
val sql="insert into claims_case_loss_document values ("+
word(0)+","+
"'"+word(1)+"'"+","+
"'"+word(2)+"'"+","+
"'"+word(3)+"'"+","+
"'"+word(4)+"'"+","+
"'"+word(5)+"'"+")"
//執行插入
//println(sql)
statement.executeUpdate(sql)
} catch{
case e:Exception =>e.printStackTrace
}
finally {
conn.close
}
}
val t2=System.nanoTime()
//打印程序運行時間
println((t2-t1)/1000000000 +"s")
}
}
在插入的過程中,第一條記錄總是會報錯(后續語句插入正常),將eclipse中打印出的報錯的insert語句手工粘貼至mysql執行時,仍報相同錯誤:
從報錯看是遇到了bug,并且1147這個值有問題,將相鄰語句放入Notepad對比:
從圖中可看出,1147的千位上的1確實發生了異常改變,而第二條語句中的1148是正常的,猜測可能是某個未知bug導致了第一條記錄發生了異常改變。這個猜測在后續得到了證實:當把1147所在行從文本中刪除后(此時1148所在行為第一條記錄),1148所在行也報出同樣的錯誤,而后續語句均可正常插入。
由于數據是作分析用的,所以丟失一條無傷大雅,而且這個bug實在詭異,這里就不再深究了。
細心的童鞋在看了代碼后應該會問:數據插入的效率如何?實不相瞞,效率很差!5000條的數據足足用了近半個小時,即使是在這樣的OLAP場景下,這樣的效率也是不可容忍的!
仔細研究代碼可發現,在對RDD調用foreach方法進行插入的時候,每一條記錄都要創建一個連接,并且每一次insert都會在Mysql中觸發一次commit操作(autocommit參數默認是打開的),這些都是很消耗資源的操作,插入效率自然很差。
發現這些問題后,針對代碼進行了修改:
package cn.spark.study.sql
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.sql.DriverManager
import scala.collection.mutable.ArrayBuffer
object insert2Mysql {
def main(args: Array[String]): Unit = {
val t1=System.nanoTime()
val conf = new SparkConf()
.setAppName("insert2Mysql")
.setMaster("local")
val sc = new SparkContext(conf)
//textFile方法只能讀取字符集為utf-8的文件,否則中文會亂碼。windows下,將文件另存為時,可以選擇utf-8字符集
//也可在代碼中實施轉換,但比較繁瑣
val lines = sc.textFile("D://Users//GAOZHONGZHENG186//Desktop//text01.txt", 1);
val words = lines.map { line => line.split(" ") }
val wordsNotNull = words.map{ word =>
val wordArray_raw = new ArrayBuffer[String]()
val wordArray = new ArrayBuffer[String]()
for(i<-0 until word.length){
if (word(i)!=""){
wordArray_raw+=word(i)
}
}
for(i<-0 until wordArray_raw.length-4){
wordArray+=wordArray_raw(i)
}
wordArray+=wordArray_raw(4)+" "+wordArray_raw(5)
wordArray+=wordArray_raw(6)+" "+wordArray_raw(7)
wordArray
}
val sqlRDD=wordsNotNull.map{ word =>
val sql="insert into claims_case_loss_document values ("+
word(0)+","+
"'"+word(1)+"'"+","+
"'"+word(2)+"'"+","+
"'"+word(3)+"'"+","+
"'"+word(4)+"'"+","+
"'"+word(5)+"'"+")"
sql
}
val sqlArray=sqlRDD.toArray()
//加載驅動
Class.forName("com.mysql.cj.jdbc.Driver")
val conn = DriverManager.getConnection("jdbc:mysql://10.25.80.7:3306/db1", "root", "123456")
try {
conn.setAutoCommit(false)
val statement = conn.createStatement()
//這里有bug,處理出來的第一行格式都會報ERROR 1054 (42S22): Unknown column '?1147' in 'field list'
//為了避免程序跳出循環,所以循環從1開始,即從第2條開始插入
for(i<-1 until sqlArray.length){
//執行插入
println(sqlArray(i))
statement.executeUpdate(sqlArray(i))
}
conn.commit()
}
catch{
case e:Exception =>e.printStackTrace
}
finally{
conn.close
}
val t2=System.nanoTime()
println((t2-t1)/1000000000 +"s")
}
}
修改后的代碼規避了上述缺陷,在同樣插入5000條數據的情況下,只用了221s!效率大大提升!
到Mysql驗證數據:
mysql> select count(*) from claims_case_loss_document;
+----------+
| count(*) |
+----------+
| 4999 | --插入時跳過了第一條,所以為4999條
+----------+
1 row in set (0.00 sec)
mysql> select * from claims_case_loss_document limit 1\G
*************************** 1. row ***************************
id: 1148
case_id: 90100002700021437450
case_times: 1
document_list: 100100_收款方賬戶信息;001003_事故證明;001001_駕駛證;100000_收款方×××明;001002_索賠申請書
create_time: 2017-11-16 12:08:08
update_time: 2017-11-16 12:08:08
1 row in set (0.00 sec)
至此,問題圓滿解決!整個過程和數據倉庫領域的ETL很接近,抽取-轉換-裝載,三個環節都有涉及,只是沒有使用
kettle之類的工具罷了。
總結:
在大數據時代,DBA應該積極做出改變,掌握一定開發技能,以便更好地適應時代變化,切不可固守自己的一畝三分地!
最后,給我們上海分組自研的數據庫審計平臺打個廣告 ^.^
數據庫審計平臺是我們分組歷時兩年打造的產品,可用于Mysql、Oracle、Postgres等多種數據庫,具備以下核心工能:
1.審計違規sql,前端一鍵生成報告
2.對相同功能點的sql可實現自動歸類,方便后續統一整改
3.內嵌Percona toolkit,前端一鍵調用
4.一鍵抓取低效sql,并自動給出優化建議
還有很多很酷的功能就不一一介紹了,總之,誰用誰說好!感興趣的DBA童鞋可以留言,可免費試用哦!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。