中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

第97課:Spark Streaming 結合Spark SQL 案例

發布時間:2020-08-11 00:10:12 來源:網絡 閱讀:4345 作者:lqding1980 欄目:大數據

代碼如下:

package com.dt.spark.streaming

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.streaming.{StreamingContext, Duration}

/**
 * 使用SparkStreaming結合SparkSQL對日志進行分析。
 * 假設電商網站點擊日志格式(簡化)如下:
 * userid,itemId,clickTime
 * 需求:處理10分鐘內item點擊次數排序Top10,并且將商品名稱顯示出來。商品itemId與商品名稱的對應關系存放在MySQL數據庫中
 * Created by dinglq on 2016/5/4.
 */
object LogAnalyzerStreamingSQL {
  val WINDOW_LENGTH = new Duration(600 * 1000)
  val SLIDE_INTERVAL = new Duration(10 * 1000)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("LogAnalyzerStreamingSQL").setMaster("local[4]")

    val sc = new SparkContext(sparkConf)

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    //從數據庫中加載itemInfo表
    val itemInfoDF = sqlContext.read.format("jdbc").options(Map(
      "url"-> "jdbc:mysql://spark-master:3306/spark",
      "driver"->"com.mysql.jdbc.Driver",
      "dbtable"->"iteminfo",
      "user"->"root",
      "password"-> "vincent"
      )).load()

    itemInfoDF.registerTempTable("itemInfo")

    val streamingContext = new StreamingContext(sc, SLIDE_INTERVAL)

    val logLinesDStream = streamingContext.textFileStream("D:/logs_incoming")

    val accessLogsDStream = logLinesDStream.map(AccessLog.parseLogLine).cache()

    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)

    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.isEmpty()) {
        println("No logs received in this time interval")
      } else {
        accessLogs.toDF().registerTempTable("accessLogs")
        val sqlStr = "SELECT a.itemid,a.itemname,b.cnt FROM itemInfo a JOIN " +
          " (SELECT itemId,COUNT(*) cnt FROM accessLogs GROUP BY itemId) b " +
          " ON (a.itemid=b.itemId) ORDER BY cnt DESC LIMIT 10 "
        val topTenClickItemLast10Minus = sqlContext.sql(sqlStr)

        // Persist top ten table for this window to HDFS as parquet file

        topTenClickItemLast10Minus.show()
      }
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

case class AccessLog(userId: String, itemId: String, clickTime: String) {
}

object AccessLog {

  def parseLogLine(log: String): AccessLog = {
    val logInfo = log.split(",")
    if (logInfo.length == 3) {
      AccessLog(logInfo(0),logInfo(1), logInfo(2))
    }
    else {
      AccessLog("0","0","0")
    }
  }
}


MySQL中表的內容如下:

mysql> select * from spark.iteminfo;
+--------+----------+
| itemid | itemname |
+--------+----------+
| 001    | phone    |
| 002    | computer |
| 003    | TV       |
+--------+----------+
3 rows in set (0.00 sec)


在D創建目錄logs_incoming


運行Spark Streaming 程序。


新建文件,內容如下:

0001,001,2016-05-04 22:10:20
0002,001,2016-05-04 22:10:21
0003,001,2016-05-04 22:10:22
0004,002,2016-05-04 22:10:23
0005,002,2016-05-04 22:10:24
0006,001,2016-05-04 22:10:25
0007,002,2016-05-04 22:10:26
0008,001,2016-05-04 22:10:27
0009,003,2016-05-04 22:10:28
0010,003,2016-05-04 22:10:29
0011,001,2016-05-04 22:10:30
0012,003,2016-05-04 22:10:31
0013,003,2016-05-04 22:10:32

將文件保存到目錄logs_incoming 中,觀察Spark程序的輸出:

+------+--------+---+
|itemid|itemname|cnt|
+------+--------+---+
|   001|   phone|  6|
|   003|      TV|  4|
|   002|computer|  3|
+------+--------+---+



備注:

1、DT大數據夢工廠微信公眾號DT_Spark 
2、IMF晚8點大數據實戰YY直播頻道號:68917580
3、新浪微博: http://www.weibo.com/ilovepains


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

巢湖市| 郑州市| 石家庄市| 黄骅市| 牡丹江市| 永济市| 水城县| 江都市| 集安市| 巴塘县| 黑河市| 平武县| 柳林县| 汉川市| 弥勒县| 潞城市| 保康县| 临西县| 福鼎市| 中牟县| 阳西县| 东兰县| 凤翔县| 澄城县| 龙陵县| 新野县| 南康市| 宜昌市| 永和县| 海原县| 腾冲县| 临沭县| 高台县| 灌云县| 黄大仙区| 临朐县| 明光市| 镇坪县| 阳信县| 吴堡县| 峨山|