반응형

Spark ML을 이용한 CF 분석 및 추천 예제 실습


오늘은 스파크 기계학습 MLlib를 이용한 (CF활용) 추천 예제를 실습한 내용을 올립니다.


생각보다 간단하고 쉬운(?) 개발/조작으로 빅데이터 분석을 할 수 있음에 놀랐습니다.




아래 내용은 기계학습을 이용한 추천 시스템을 만들기위해 Spark ML을 이용을 위한 예제를 가지고 실습해본 내용입니다.

먼저 데이터가 있어야하고 Spark ML소스가 있어야합니다. 기본적인 기계학습, CF에 대한 이해가 있으면 더욱 쉽게 이해하실 것 같습니다.




아래는 스파크 사이트의 예제 소스를 제플린에 입력/수정한 스칼라 코드

=========================================================================================

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating

// Load and parse the data
val data = sc.textFile("/tmp/ratings/ratings.csv")
val ratings = data.map(_.split(',') match { case Array(user, item, rate, time) =>
  Rating(user.toInt, item.toInt, rate.toDouble)
})

// Build the recommendation model using ALS
val rank = 10
val numIterations = 10
val model = ALS.train(ratings, rank, numIterations, 0.01)

// Evaluate the model on rating data
val usersProducts = ratings.map { case Rating(user, product, rate) =>
  (user, product)
}
val predictions =
  model.predict(usersProducts).map { case Rating(user, product, rate) =>
    ((user, product), rate)
  }
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
  ((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
  val err = (r1 - r2)
  err * err
}.mean()
println("Mean Squared Error = " + MSE)

// Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter_user1")
val sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter_user1")

=========================================================================================





그리고 제플린에서 실행한 결과 로그 입니다.

=========================================================================================

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating
data: org.apache.spark.rdd.RDD[String] = /tmp/ratings/ratings.csv MapPartitionsRDD[459] at textFile at <console>:40
ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating] = MapPartitionsRDD[460] at map at <console>:42
rank: Int = 10
numIterations: Int = 10
model: org.apache.spark.mllib.recommendation.MatrixFactorizationModel = org.apache.spark.mllib.recommendation.MatrixFactorizationModel@1dd01d39
usersProducts: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[668] at map at <console>:44
predictions: org.apache.spark.rdd.RDD[((Int, Int), Double)] = MapPartitionsRDD[678] at map at <console>:53
ratesAndPreds: org.apache.spark.rdd.RDD[((Int, Int), (Double, Double))] = MapPartitionsRDD[682] at join at <console>:56
MSE: Double = 0.5161888311659686
Mean Squared Error = 0.5161888311659686
sameModel: org.apache.spark.mllib.recommendation.MatrixFactorizationModel = org.apache.spark.mllib.recommendation.MatrixFactorizationModel@3334459
Took 2 min 31 sec. Last updated by anonymous at December 29 2016, 5:13:24 PM. (outdated)

=========================================================================================

이후 실행을 통해 생성된 model 객체를 아래 URL에 있는 메소드를 이용하여 데이터를 후처리 할 수 있습니다.

=========================================================================================

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.recommendation.MatrixFactorizationModel

Value Members

  1. defpredict(usersProducts: JavaPairRDD[IntegerInteger])JavaRDD[Rating]

    Java-friendly version of MatrixFactorizationModel.predict.

  2. defpredict(usersProducts: RDD[(IntInt)])RDD[Rating]

    Predict the rating of many users for many products.

  3. defpredict(user: Intproduct: Int)Double

    Predict the rating of one user for one product.

  4. valproductFeaturesRDD[(IntArray[Double])]

    RDD of tuples where each tuple represents the productId and the features computed for this product.

  5. valrankInt

    Rank for the features in this model.

  6. defrecommendProducts(user: Intnum: Int)Array[Rating]

    Recommends products to a user.

  7. defrecommendProductsForUsers(num: Int)RDD[(IntArray[Rating])]

    Recommends top products for all users.

  8. defrecommendUsers(product: Intnum: Int)Array[Rating]

    Recommends users to a product.

  9. defrecommendUsersForProducts(num: Int)RDD[(IntArray[Rating])]

    Recommends top users for all products.

  10. defsave(sc: SparkContextpath: String)Unit

    Save this model to the given path.

  11. valuserFeaturesRDD[(IntArray[Double])]

    RDD of tuples where each tuple represents the userId and the features computed for this user.

=========================================================================================







드디어, 실제로 추천된 내용을 찾아보는 순서입니다.

=========================================================================================

model.recommendUsers(1,2)   ==> 이 명령어를 실행해서 아래의 결과를 얻음. 유서 ID 1번에게 2개의 최적 상품을 추천한다면 내용은....

res9: Array[org.apache.spark.mllib.recommendation.Rating] = Array(Rating(26896,1,8.876352553234511), Rating(102611,1,8.303550899210796))

==> 결론은 Array 뒤에있는 내용, 첫번째로 26896번 상품을 추천 점수는 8.876352553234511, 두번째 상품은 102611번으로 .....




​ 


너무 심플해서 놀랍습니다.

물론 상세한 내용으로 들어가자면 모르는 부분이 많지만 말그대로 빅데이터를 쉽게 분석할 수 있다는 생각이 들었습니다.



반응형

+ Recent posts