登录 |  注册 |  繁體中文


基于Spark的电影推荐系统

分类: Spark 颜色:橙色 默认  字号: 阅读(1958) | 评论(0)

基于Mahout的电影推荐系统 详见  http://www.php3.cn/a/178.html

下载示例数据

本工程所用到的数据来源于此处 http://grouplens.org/datasets/movielens/

有100k到10m的数据都有。我们这里选择100k的数据。

对下载的数据解压之后,会出现很多文件,我们需要使用u.data和u.user文件。详细的数据说明可以参见README。

u.data是用户对电影评分的数据,也是训练集。数据分别表示userId,moiveId,评分rate,时间戳。如下图所示
 
u.user是用户的个人信息数据,用以推荐使用,分别表示userId,age,sex,job,zip code。我们只使用userId即可。如下图所示

上传这两个文件到hdfs, 这里有10w条用户对电影的评分,从1-5分,1分表示差劲,5分表示非常好看。根据用户对电影的喜好,给用户推荐可能感兴趣的电影。

实现思路

代码实现如下:
1、加载u.data数据到rating RDD中
2、对rating RDD的数据进行分解,只需要userId,moiveId,rating
3、使用rating RDD训练ALS模型
4、使用ALS模型为u.user中的用户进行电影推荐,数据保存到HBase中  
5、评估模型的均方差

实现代码:

import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation._
import org.apache.spark.rdd.{ PairRDDFunctions, RDD }
import org.apache.spark.SparkContext
import scala.collection.mutable.HashMap
import java.util.List
import java.util.ArrayList
import scopt.OptionParser

import com.ml.util.HbaseUtil

/**
 * moivelens 电影推荐
 *
 */
object MoiveRecommender {

  val numRecommender = 10

  case class Params(
    input: String = null,
    numIterations: Int = 20,
    lambda: Double = 1.0,
    rank: Int = 10,
    numUserBlocks: Int = -1,
    numProductBlocks: Int = -1,
    implicitPrefs: Boolean = false,
    userDataInput: String = null)

  def main(args: Array[String]) {

    val defaultParams = Params()

    val parser = new OptionParser[Params]("MoiveRecommender") {
      head("MoiveRecommender: an example app for ALS on MovieLens data.")
      opt[Int]("rank")
        .text(s"rank, default: ${defaultParams.rank}}")
        .action((x, c) => c.copy(rank = x))
      opt[Int]("numIterations")
        .text(s"number of iterations, default: ${defaultParams.numIterations}")
        .action((x, c) => c.copy(numIterations = x))
      opt[Double]("lambda")
        .text(s"lambda (smoothing constant), default: ${defaultParams.lambda}")
        .action((x, c) => c.copy(lambda = x))
      opt[Int]("numUserBlocks")
        .text(s"number of user blocks, default: ${defaultParams.numUserBlocks} (auto)")
        .action((x, c) => c.copy(numUserBlocks = x))
      opt[Int]("numProductBlocks")
        .text(s"number of product blocks, default: ${defaultParams.numProductBlocks} (auto)")
        .action((x, c) => c.copy(numProductBlocks = x))
      opt[Unit]("implicitPrefs")
        .text("use implicit preference")
        .action((_, c) => c.copy(implicitPrefs = true))
      opt[String]("userDataInput")
        .required()
        .text("use data input path")
        .action((x, c) => c.copy(userDataInput = x))
      arg[String]("<input>")
        .required()
        .text("input paths to a MovieLens dataset of ratings")
        .action((x, c) => c.copy(input = x))
      note(
        """
          |For example, the following command runs this app on a synthetic dataset:
          |
          | bin/spark-submit --class com.zachary.ml.MoiveRecommender 
          |  examples/target/scala-*/spark-examples-*.jar 
          |  --rank 5 --numIterations 20 --lambda 1.0 
          |  data/mllib/u.data
        """.stripMargin)
    }

    parser.parse(args, defaultParams).map { params =>
      run(params)
    } getOrElse {
      System.exit(1)
    }

  }

  def run(params: Params) {

    //本地运行模式,读取本地的spark主目录
    var conf = new SparkConf().setAppName("Moive Recommendation")
      .setSparkHome("D:workhadoop_libspark-1.1.0-bin-hadoop2.4spark-1.1.0-bin-hadoop2.4")
    conf.setMaster("local[*]")

    //集群运行模式,读取spark集群的环境变量
    var conf = new SparkConf().setAppName("Moive Recommendation")
    val context = new SparkContext(conf)

    //加载数据
    val data = context.textFile(params.input)

    /**
     * *MovieLens ratings are on a scale of 1-5:
     * 5: Must see
     * 4: Will enjoy
     * 3: Its okay
     * 2: Fairly bad
     * 1: Awful
     */
    val ratings = data.map(_.split("	") match {
      case Array(user, item, rate, time) => Rating(user.toInt, item.toInt, rate.toDouble)
    })

    //使用ALS建立推荐模型    
    //也可以使用简单模式    val model = ALS.train(ratings, ranking, numIterations)
    val model = new ALS()
      .setRank(params.rank)
      .setIterations(params.numIterations)
      .setLambda(params.lambda)
      .setImplicitPrefs(params.implicitPrefs)
      .setUserBlocks(params.numUserBlocks)
      .setProductBlocks(params.numProductBlocks)
      .run(ratings)

    predictMoive(params, context, model)

    evaluateMode(ratings, model)

    //clean up
    context.stop()

  }

  /**
   * 模型评估
   */
  private def evaluateMode(ratings: RDD[Rating], model: MatrixFactorizationModel) {

    //使用训练数据训练模型
    val usersProducets = ratings.map(r => r match {
      case Rating(user, product, rate) => (user, product)
    })

    //预测数据
    val predictions = model.predict(usersProducets).map(u => u match {
      case Rating(user, product, rate) => ((user, product), rate)
    })

    //将真实分数与预测分数进行合并
    val ratesAndPreds = ratings.map(r => r match {
      case Rating(user, product, rate) =>
        ((user, product), rate)
    }).join(predictions)

    //计算均方差
    val MSE = ratesAndPreds.map(r => r match {
      case ((user, product), (r1, r2)) =>
        var err = (r1 - r2)
        err * err
    }).mean()

    //打印出均方差值
    println("Mean Squared Error = " + MSE)
  }

  /**
   * 预测数据并保存到HBase中
   */
  private def predictMoive(params: Params, context: SparkContext, model: MatrixFactorizationModel) {

    var recommenders = new ArrayList[java.util.Map[String, String]]();

    //读取需要进行电影推荐的用户数据
    val userData = context.textFile(params.userDataInput)

    userData.map(_.split("|") match {
      case Array(id, age, sex, job, x) => (id)
    }).collect().foreach(id => {
      //为用户推荐电影 
      var rs = model.recommendProducts(id.toInt, numRecommender)
      var value = ""
      var key = 0

      //保存推荐数据到hbase中
      rs.foreach(r => {
        key = r.user
        value = value + r.product + ":" + r.rating + ","
      })

      //成功,则封装put对象,等待插入到Hbase中
      if (!value.equals("")) {
        var put = new java.util.HashMap[String, String]()
        put.put("rowKey", key.toString)
        put.put("t:info", value)
        recommenders.add(put)
      }
    })

    //保存到到HBase的[recommender]表中
    //recommenders是返回的java的ArrayList,可以自己用Java或者Scala写HBase的操作工具类,这里我就不给出具体的代码了,应该可以很快的写出
    HbaseUtil.saveListMap("recommender", recommenders)
  }
}

3、运行

3.1 目录结构

build.sbt
src/main/scala/MovieRecommender.scala 
project/assembly.sbt

3.2 sbt打包 (sbt安装详见 http://www.php3.cn/a/172.html)

sbt assembly

3.3执行

./bin/spark-submit --class MovieRecommender  --master  spark://hd1:7077 /test/movieRecommend-assembly-1.0.jar  hdfs://hd1:9000/test/u.data --userDataInput   hdfs://hd1:9000/test/u.user

4、结果分析

HBase中推荐数据如下所示
比如 939 用户的推荐电影(格式 moivedID:rating)
516:7.574462241760971,1056:6.979575106203245,1278:6.918614235693566,1268:6.914693317049802,1169:6.881813878580957,1316:6.681612000425281,564:6.622223206958775,909:6.597412586878512,51:6.539969654136097,1385:6.503960660826889

总共有6040个用户,3706个电影(已经去重),1000209条评分数据;如程序,我们把所有数据分为三部分:60%用于训练、20%用户校验、 20%用户测试模型;接下来是模型在不同参数下的均方根误差(RMSE)值,以及对应的参数,最优的参数选择均方根误差(RMSE--- 0.8665911...)最小的参数值---即最优参数模型建立;接着,使用20%的测试模型数据来测试模型的好坏,也就是均方根误差(RMSE),这 里计算的结果为0.86493444...,在最优参数模型基础上提升了22.32%的准确率。

说明下,其实在数据的划分上(60%+20%+20%),最好随机划分数据,这样得到的结果更有说服力。

优化

1、可以调整这些参数,不断优化结果,使均方差变小。比如iterations越多,lambda较小,均方差会较小,推荐结果较优
 
numBlocks 是用于并行化计算的分块个数 (设置为-1为自动配置)。
rank 是模型中隐语义因子的个数。
iterations 是迭代的次数。
lambda 是ALS的正则化参数。
implicitPrefs 决定了是用显性反馈ALS的版本还是用适用隐性反馈数据集的版本。
alpha 是一个针对于隐性反馈 ALS 版本的参数,这个参数决定了偏好行为强度的基准。

 




姓 名: *
邮 箱:
内 容: *
验证码: 点击刷新 *   

回到顶部