您好,登錄后才能下訂單哦!
這篇文章主要介紹Spark DataFrame寫入HBase的常用方式有哪些,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
Spark是目前最流行的分布式計算框架,而HBase則是在HDFS之上的列式分布式存儲引擎,基于Spark做離線或者實時計算,數據結果保存在HBase中是目前很流行的做法。例如用戶畫像、單品畫像、推薦系統等都可以用HBase作為存儲媒介,供客戶端使用。
因此Spark如何向HBase中寫數據就成為很重要的一個環節了。
代碼在spark 2.2.0版本親測
第一種是最簡單的使用方式了,就是基于RDD的分區,由于在spark中一個partition總是存儲在一個excutor上,因此可以創建一個HBase連接,提交整個partition的內容。
大致的代碼是:
rdd.foreachPartition { records => val config = HBaseConfiguration.create config.set("hbase.zookeeper.property.clientPort", "2181") config.set("hbase.zookeeper.quorum", "a1,a2,a3") val connection = ConnectionFactory.createConnection(config) val table = connection.getTable(TableName.valueOf("rec:user_rec")) val list = new java.util.ArrayList[Put] for(i <- 0 until 10){ val put = new Put(Bytes.toBytes(i.toString)) put.addColumn(Bytes.toBytes("t"), Bytes.toBytes("aaaa"), Bytes.toBytes("1111")) list.add(put) } table.put(list) table.close() }
這樣每次寫的代碼很多,顯得不夠友好,如果能跟dataframe保存parquet、csv之類的就好了。下面就看看怎么實現dataframe直接寫入hbase吧!
由于這個插件是hortonworks提供的,maven的中央倉庫并沒有直接可下載的版本。需要用戶下載源碼自己編譯打包,如果有maven私庫,可以上傳到自己的maven私庫里面。具體的步驟可以參考如下:
去官網github下載即可:https://github.com/hortonworks-spark/shc
可以直接按照下面的readme說明來,也可以跟著我的筆記走。
下載完成后,如果有自己的私庫,可以修改shc中的distributionManagement。然后點擊旁邊的maven插件deploy發布工程,如果只想打成jar包,那就直接install就可以了。
在pom.xml中引入:
<dependency> <groupId>com.hortonworks</groupId> <artifactId>shc-core</artifactId> <version>1.1.2-2.2-s_2.11-SNAPSHOT</version></dependency>
首先創建應用程序,Application.scala
object Application { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().master("local").appName("normal").getOrCreate() spark.sparkContext.setLogLevel("warn") val data = (0 to 255).map { i => HBaseRecord(i, "extra")} val df:DataFrame = spark.createDataFrame(data) df.write .mode(SaveMode.Overwrite) .options(Map(HBaseTableCatalog.tableCatalog -> catalog)) .format("org.apache.spark.sql.execution.datasources.hbase") .save() } def catalog = s"""{ |"table":{"namespace":"rec", "name":"user_rec"}, |"rowkey":"key", |"columns":{ |"col0":{"cf":"rowkey", "col":"key", "type":"string"}, |"col1":{"cf":"t", "col":"col1", "type":"boolean"}, |"col2":{"cf":"t", "col":"col2", "type":"double"}, |"col3":{"cf":"t", "col":"col3", "type":"float"}, |"col4":{"cf":"t", "col":"col4", "type":"int"}, |"col5":{"cf":"t", "col":"col5", "type":"bigint"}, |"col6":{"cf":"t", "col":"col6", "type":"smallint"}, |"col7":{"cf":"t", "col":"col7", "type":"string"}, |"col8":{"cf":"t", "col":"col8", "type":"tinyint"} |} |}""".stripMargin }case class HBaseRecord( col0: String, col1: Boolean, col2: Double, col3: Float, col4: Int, col5: Long, col6: Short, col7: String, col8: Byte) object HBaseRecord { def apply(i: Int, t: String): HBaseRecord = { val s = s"""row${"%03d".format(i)}""" HBaseRecord(s, i % 2 == 0, i.toDouble, i.toFloat, i, i.toLong, i.toShort, s"String$i: $t", i.toByte) } }
然后再resources目錄下,添加hbase-site.xml、hdfs-site.xml、core-site.xml等配置文件。主要是獲取Hbase中的一些連接地址。
如果有瀏覽官網習慣的同學,一定會發現,HBase官網的版本已經到了3.0.0-SNAPSHOT,并且早就在2.0版本就增加了一個hbase-spark模塊,使用的方法跟上面hortonworks一樣,只是format的包名不同而已,猜想就是把hortonworks給拷貝過來了。
另外Hbase-spark 2.0.0-alpha4目前已經公開在maven倉庫中了。
http://mvnrepository.com/artifact/org.apache.hbase/hbase-spark
不過,內部的spark版本是1.6.0,太陳舊了!!!!真心等不起了...
期待hbase-spark官方能快點提供正式版吧。
hortonworks-spark/shc github:https://github.com/hortonworks-spark/shc
maven倉庫地址: http://mvnrepository.com/artifact/org.apache.hbase/hbase-spark
Hbase spark sql/ dataframe官方文檔:https://hbase.apache.org/book.html#_sparksql_dataframes
以上是“Spark DataFrame寫入HBase的常用方式有哪些”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。