中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spark ALS實現的步驟是什么

發布時間:2021-10-28 09:18:20 來源:億速云 閱讀:180 作者:iii 欄目:云計算

這篇文章主要講解了“Spark ALS實現的步驟是什么”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Spark ALS實現的步驟是什么”吧!

spark ALS算法是做個性推薦用的,它所需要的數據集是類似用戶對商品的打分表之類的數據集。實現步驟主要以下幾步:

1、定義輸入數據

2、輸入數據轉換成評分數據格式,如case class Rating(user: Int, movie: Int, rating: Float)

3、設計ALS模型訓練數據

4、計算推薦數據,存儲起來供業務系統直接使用。

下面看看具體的代碼:

package recommend
import org.apache.spark.sql.SparkSession
import java.util.Properties
import org.apache.spark.rdd.RDD

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.ml.feature.IndexToString
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.TaskContext
import org.apache.spark.ml.Pipeline
import org.apache.spark.sql.SaveMode

/**
 * 個性化推薦ALS算法
 * 用戶對資源的點擊率作為評分
 *
 */
object Recommend {

  case class Rating(user: Int, movie: Int, rating: Float)
 
  
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("Java Spark MYSQL Recommend")
      .master("local")
      .config("es.nodes", "127.0.0.1")
      .config("es.port", "9200")
      .config("es.mapping.date.rich", "false") //不解析日期類型
      .getOrCreate()

    trainModel(spark)

    spark.close()
  }

  def trainModel(spark: SparkSession): Unit = {
    import spark.implicits._

    val MAX = 3 // 最大推薦數目
    val rank = 10 // 向量大小,默認10
    val iterations = 10 // 迭代次數,默認10


    val url = "jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8"
    val table = "clicks"
    val user = "root"
    val pass = "123456"

    val props = new Properties()
    props.setProperty("user", user) // 設置用戶名
    props.setProperty("password", pass) // 設置密碼

    val clicks = spark.read.jdbc(url, table, props).repartition(4)
    clicks.createOrReplaceGlobalTempView("clicks")

    val agg = spark.sql("SELECT userId ,resId ,COUNT(id)  AS clicks FROM global_temp.clicks GROUP BY userId,resId")
   
    val userIndexer = new StringIndexer()
      .setInputCol("userId")
      .setOutputCol("userIndex")

    val resIndexer = new StringIndexer()
      .setInputCol("resId")
      .setOutputCol("resIndex")

    val indexed1 = userIndexer.fit(agg).transform(agg)
    val indexed2 = resIndexer.fit(indexed1).transform(indexed1)
    indexed2.show()

    val ratings = indexed2.map(x => Rating(x.getDouble(3).toInt, x.getDouble(4).toInt, x.getLong(2).toFloat))
    ratings.show()

    val Array(training, test) = ratings.randomSplit(Array(0.9, 0.1))
    println("training:")
    training.show()
    println("test:")
    test.show()

    //隱性反饋和顯示反饋
    val als = new ALS()
      .setMaxIter(iterations)
      .setRegParam(0.01)
      .setImplicitPrefs(false)
      .setUserCol("user")
      .setItemCol("movie")
      .setRatingCol("rating")

    val model = als.fit(ratings)

    // Evaluate the model by computing the RMSE on the test data
    // Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
    model.setColdStartStrategy("drop")
    val predictions = model.transform(test)

    val r2 = model.recommendForAllUsers(MAX)

    println(r2.schema)

    val result = r2.rdd.flatMap(row => {
      val userId = row.getInt(0)
      val arrayPredict: Seq[Row] = row.getSeq(1)

      var result = ArrayBuffer[Rating]()
      arrayPredict.foreach(rowPredict => {
        val p = rowPredict(0).asInstanceOf[Int]
        val score = rowPredict(1).asInstanceOf[Float]
        val sql = "insert into recommends(userId,resId,score) values (" +
          userId + "," +
          rowPredict(0) + "," +
          rowPredict(1) +
          ")"
        println("sql:" + sql)
        result.append(Rating(userId, p, score))
      })
      for (i <- result) yield {
        i
      }
    })

    println("推薦結果RDD已展開")
    result.toDF().show()

    //資源id隱射
    val resInt2Index = new IndexToString()
      .setInputCol("movie")
      .setOutputCol("resId")
      .setLabels(resIndexer.fit(indexed1).labels)

    //userId映射
    val userInt2Index = new IndexToString()
      .setInputCol("user")
      .setOutputCol("userId")
      .setLabels(userIndexer.fit(agg).labels)

    val rc = userInt2Index.transform(resInt2Index.transform(result.toDF()))
    rc.show()

    rc.withColumnRenamed("rating","score").select("userId", "resId","score").write.mode(SaveMode.Overwrite)
      .format("jdbc")
      .option("url", url)
      .option("dbtable", "recommends")
      .option("user", user)
      .option("password", pass)
      .option("batchsize", "5000")
      .option("truncate", "true")
      .save

      println("finished!!!")
  }

}

DataFrame寫入mysql還有另一種寫法,就是原生寫入:

    //分區寫推薦結果到mysql
        r2.foreachPartition(p => {
          @transient val conn = ConnectionPool.getConnection
          p.foreach(row => {
            val userId = row.getInt(0)
            val arrayPredict: Seq[Row] = row.getSeq(1)
            arrayPredict.foreach(rowPredict => {
              println(rowPredict(0) + "@" + rowPredict(1))
              val sql = "insert into recommends(userId,resId,score) values (" +
                  userId+"," +
                rowPredict(0)+","+
               rowPredict(1) +
                ")"
               println("sql:"+sql)
              val stmt = conn.createStatement
              stmt.executeUpdate(sql)
            })
          })
          ConnectionPool.returnConnection(conn)
        })

感謝各位的閱讀,以上就是“Spark ALS實現的步驟是什么”的內容了,經過本文的學習后,相信大家對Spark ALS實現的步驟是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

柏乡县| 白河县| 铜川市| 唐河县| 军事| 东源县| 盐边县| 霍邱县| 沙雅县| 丰镇市| 株洲县| 秦皇岛市| 灌阳县| 关岭| 塘沽区| 广宗县| 慈溪市| 萍乡市| 铜陵市| 石首市| 延吉市| 龙南县| 土默特右旗| 麻城市| 普陀区| 乐东| 普宁市| 哈尔滨市| 沿河| 通渭县| 平顶山市| 宜宾市| 黄浦区| 南昌市| 汾西县| 珠海市| 磴口县| 长丰县| 江陵县| 介休市| 高要市|