中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

大數據SparkSQl指的是什么呢

發布時間:2021-12-17 14:19:57 來源:億速云 閱讀:202 作者:柒染 欄目:大數據

這期內容當中小編將會給大家帶來有關大數據SparkSQl指的是什么呢,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。

Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了一個編程抽象叫做DataFrame并且作為分布式SQL查詢引擎的作用。SparkSql中返回的數據類型是DataFrame

1.1.1.   為什么要學習Spark SQL

我們已經學習了Hive,它是將Hive SQL轉換成MapReduce然后提交到集群上執行,大大簡化了編寫MapReduce的程序的復雜性,由于MapReduce這種計算模型執行效率比較慢。所有Spark SQL的應運而生,它是將Spark SQL轉換成RDD,然后提交到集群執行,執行效率非常快!

HIVE:簡化編寫MapReduce的程序的復雜性

Spark SQL轉換成RDD:替代MapReduce,提高效率

Spark1.0版本開始就推出了SparkSQL,最早是叫Shark

1、內存列存儲--可以大大優化內存使用效率,減少了內存消耗,避免了gc對大量數據的性能開銷

2、字節碼生成技術(byte-code generation)--可以使用動態字節碼生成技術來優化性能

3、Scala代碼的優化

  結構化數據是指任何有結構信息的數據。所謂結構信息,就是每條記錄共用的已知的字段集合。當數據符合 這樣的條件時,Spark SQL 就會使得針對這些數據的讀取和查詢變得更加簡單高效。具體 來說,Spark SQL 提供了以下三大功能(見圖 9-1)。

(1) Spark SQL 可以從各種結構化數據源(例如 JSON、Hive、Parquet 等)中讀取數據。

(2) Spark SQL 不僅支持在 Spark 程序內使用 SQL 語句進行數據查詢,也支持從類似商業 智能軟件 Tableau 這樣的外部工具中通過標準數據庫連接器(JDBC/ODBC)連接 Spark SQL 進行查詢。

(3) 當在 Spark 程序內使用 Spark SQL 時,Spark SQL 支持 SQL 與常規的 Python/Java/Scala 代碼高度整合,包括連接 RDD 與 SQL 表、公開的自定義 SQL 函數接口等。這樣一來, 許多工作都更容易實現了。

為了實現這些功能,Spark SQL 提供了一種特殊的 RDD,叫作 SchemaRDD。SchemaRDD 是存放 Row 對象的 RDD,每個 Row 對象代表一行記錄。SchemaRDD 還包含記錄的結構信 息(即數據字段)。SchemaRDD 看起來和普通的 RDD 很像,但是在內部,SchemaRDD 可 以利用結構信息更加高效地存儲數據。此外,SchemaRDD 還支持 RDD 上所沒有的一些新 操作,比如運行 SQL 查詢。SchemaRDD 可以從外部數據源創建,也可以從查詢結果或普 通 RDD 中創建。

什么是DataFrames

(SparkSql中返回的數據類型它在概念上等同于關系數據庫中的表,但在查詢上進行了優化)

與RDD類似,DataFrame也是一個分布式數據容器。然而DataFrame更像傳統數據庫的二維表格,除了數據以外,還記錄數據的結構信息,即schema。

1.1.1.   創建DataFrames

 在Spark SQL中SQLContext是創建DataFrames和執行SQL的入口,在spark-1.6.1中已經內置了一個sqlContext

1.在本地創建一個文件,有三列,分別是id、name、age,用空格分隔,然后上傳到hdfs上

hdfs dfs -put person.txt /

2.在spark shell執行下面命令,讀取數據,將每一行的數據使用列分隔符分割

val lineRDD = sc.textFile("hdfs://node01:9000/person.txt").map(_.split(" "))

3.定義case class(相當于表的schema

case class Person(id:Int, name:String, age:Int)

4.RDDcase class關聯

val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

  (里面的數據是在Array中)

5.RDD轉換成DataFrame

val personDF = personRDD.toDF

6.對DataFrame進行處理

personDF.show

val seq1 = Seq(("1","bingbing",35),("2","yuanyuan",34),("3","mimi",33))

val rdd1 =sc.parallelize(seq1)

val df = rdd1.toDF("id","name","age")

df.show

DSL:領域特定語言

////查看DataFrame中的內容

大數據SparkSQl指的是什么呢

//查看DataFrame部分列中的內容

1.

大數據SparkSQl指的是什么呢

2.

大數據SparkSQl指的是什么呢

3.

大數據SparkSQl指的是什么呢

//打印DataFrame的Schema信息

大數據SparkSQl指的是什么呢

//查詢所有的name和age,并將age+1

1.df.select(col("id"),col("name"),col("age")+1).show

大數據SparkSQl指的是什么呢

2.df.select(df("id"), df("name"), df("age") + 1).show

大數據SparkSQl指的是什么呢

//過濾age大于等于18的

df.filter(col("age") >= 35).show

大數據SparkSQl指的是什么呢

//按年齡進行分組并統計相同年齡的人數

df.groupBy("age").count().show()

大數據SparkSQl指的是什么呢

SQL風格語法

//查詢年齡最大的前兩名

1.如果想使用SQL風格的語法,需要將DataFrame注冊成表

df.registerTempTable("t_person")

2.sqlContext.sql("select * from t_person order by age desc limit 2").show

大數據SparkSQl指的是什么呢

//顯示表的Schema信息

大數據SparkSQl指的是什么呢

以編程方式執行Spark SQL查詢

1.編寫Spark SQL查詢程序

1.通過反射推斷Schema

=======================================================

package com.qf.gp1708.day06

//通過反射獲取用戶信息

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{DataFrame, SQLContext}

import org.apache.spark.{SparkConf, SparkContext}

object InferSchema {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()

      .setMaster("local")

      .setAppName("inferschema")

    val sc = new SparkContext(conf)

    val sqlContext:SQLContext = new SQLContext(sc)

  1.  //獲取數據并切分

    val line = sc.textFile("C://Users/Song/Desktop/person.txt").map(_.split(","))

   3 //將獲取的數據和Person樣例類進行關聯

    val personRdd: RDD[Godness] = line.map(arr=>Godness(arr(0).toLong,arr(1),arr(2).toInt,arr(3).toInt))

    //引入隱式轉換函數,這樣才可以調用到toDF方法

    import sqlContext.implicits._

   4 //將personRDD轉換成DataFrame

    val dF: DataFrame = personRdd.toDF

  5.  //注冊一張臨時表

    dF.registerTempTable("t_person")

    val sql = "select * from t_person where fv > 70 order by age"

    //查詢

    val res: DataFrame = sqlContext.sql(sql)

    res.show()

    sc.stop()

  }

}

2//創建樣例類

case class Godness(id:Long,name:String,age:Int,fv:Int)

=========================================================

2.通過StructType直接指定Schema

===========================================

package com.qf.gp1708.day06

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{DataFrame, Row, SQLContext}

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

import org.apache.spark.{SparkConf, SparkContext}

/**

  * 通過StructType類型直接指定Schema

  */

object StructTypeSchema {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()

      .setAppName("str")

      .setMaster("local")

    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)

    //獲取數據并切分

    val lines = sc.textFile("hdfs://...").map(_.split(","))

    //指定schema信息

    StructType{

      List(

        StructField("id",IntegerType,false),

        StructField("name",StringType,true),

        StructField("age",IntegerType,true),

        StructField("fv",IntegerType,true),

      )

    }

    //開始映射

    val rowRDD: RDD[Row] = lines.map(arr =>Row(arr(0).toInt,arr(1),arr(2).toInt,arr(3).toInt))

    //把RDD轉換為DataFrame

    val personDF: DataFrame = sqlContext.createDataFrame(rowRDD,schema)

    //生成臨時表

    personDF.registerTempTable("t_person")

    val sql = "select name,age,fv from t_person where age >30 order by age desc"

    val res = sqlContext.sql(sql)

    res.write.mode("append").json("c://out-20180903-1")

    sc.stop()

  }

}

=================================================================

1.   數據源

1.1. JDBC

Spark SQL可以通過JDBC從關系型數據庫中讀取數據的方式創建DataFrame,通過對DataFrame一系列的計算后,還可以將數據再寫回關系型數據庫中。

1.1.1.   MySQL中加載數據(Spark Shell方式)

1.啟動Spark Shell,必須指定mysql連接驅動jar包

/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-shell \

--master spark://node01:7077 \

--jars /usr/local/spark-1.6.1-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \

  (指定MySQL包)

--driver-class-path /usr/local/spark-1.6.1-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar (指定驅動類)

2.從mysql中加載數據

val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:mysql://node03:3306/bigdata", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "person", "user" -> "root", "password" -> "root")).load()

3.執行查詢

jdbcDF.show()

1.1.2.   將數據寫入到MySQL中(打jar包方式)

package com.qf.gp1708.day06

import java.util.Properties

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.{Row, SQLContext}

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

import org.apache.spark.{SparkConf, SparkContext}

/**

  * 寫入數據到MySQL

  */

object InsertData2MySQLDemo {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("").setMaster("local[2]")

    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)

    val lines= sc.textFile("").map(_.split(","))

    //生成Schema

    val schema = StructType {

      Array(

        StructField("name", StringType, true),

        StructField("age", IntegerType, true),

        StructField("fv", StringType, true),

      )

    }

    //映射

    val personRDD = lines.map(arr =>Row(arr(1).toString,arr(2).toInt,arr(3).toInt))

    //生成DataFrame

    val personDF = sqlContext.createDataFrame(personRDD,schema)

    //生成用于寫入MySQL的配置信息

    val prop = new Properties()

    prop.put("user","root")

    prop.put("password","root")

    prop.put("driver","com.mysql.jdbc.Driver")

    val jdbcUrl="jdbc:mysql://hadoop03:3306/bigdata"

    val table="person"

    //把數據寫入MySQL

    personDF.write.mode("append").jdbc(jdbcUrl,table,prop)

    sc.stop()

  }

}

/usr/local/spark-1.6.3-bin-hadoop2.6/spark-submit \

--class com.qf..... \

--master spark://hadoop01:7077 \

--executor-memory 512m \

--total-executor-cores 2 \

--jars /usr/.../mysql-connector-java-5.1.35-bin.jar \

--driver-class-path /usr/.../mysql-connector-java-5.1.35-bin.jar \

/root/1.jar

=======================================================

kafka:消息中間件(緩存數據)---解耦

  為處理實時數據提供一個統一、高吞吐量、低等待的平臺

  3、為什么需要消息隊列(重要、了解)

  消息系統的核心作用就是三點:解耦,異步和并行

  Kafka對消息保存時根據Topic進行歸類

  Topic:底層就是隊列,將不同的消息放在不同的隊列中進行分類

上述就是小編為大家分享的大數據SparkSQl指的是什么呢了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

巫溪县| 阜康市| 辉县市| 庆安县| 西乌| 昔阳县| 金川县| 海晏县| 城市| 青海省| 绵阳市| 阳江市| 德庆县| 敦化市| 永修县| 马鞍山市| 社会| 阳春市| 秦安县| 文登市| 镇雄县| 虎林市| 靖西县| 南投市| 云梦县| 昭觉县| 买车| 察隅县| 留坝县| 乾安县| 上杭县| 永丰县| 枣庄市| 东辽县| 凉城县| 墨玉县| 江油市| 龙海市| 吕梁市| 元氏县| 白水县|