Extraction: raw data 추출
Transformation: feature를 다른 featur로 변환
Selection: a larger feature set에서 subset 선택
Locality Sensitive Hashing (LSH): feature 변환의 측면을 다른 알고리즘과 결합
Pipelines
ML Pipelines을 쉽게 만들기 위한 고수준의 API 제공 (DataFrame)
Persistence
saving and load algorithms, models, and Pipelines
Utilities
linear algebra(선형 대수), statistics(통계), data handling(데이터 처리), etc.
라이브러리 추가
//sbt Dependency 추가
libraryDependencies++=Seq("org.apache.spark"%"spark-mllib_2.11"%"2.1.0",)
Collaborate Filtering (ALS) - DataFrame
numBlocks
평가에 대해 병렬화 하기 위해 user-item의 분할될 블록 수 (기본값 10).
rank
모델의 잠재 요인 갯수. 즉 user-feature 행렬과 product-feature 행렬에서 열의 갯수 k (기본값 10)
maxIter
행렬 분해를 반복하는 횟수 (기본값 to 10)
regParam
ALS의 정규화 매개 변수 (기본값 to 1.0)
implicitPrefs
명시 적 피드백 ALS 변형을 사용할지 또는 암시 적 피드백 데이터에 적용 할지를 지정
(기본값은 명시 적 피드백 사용 (파라미터 값: false))
alpha
선호도 관측치에 대한 기본 신뢰도 (기본값 1.0)
ALS의 암시 적 피드백 변형에 적용 가능한 매개변수
nonnegative
최소 제곱에 대해 음수가 아닌 제약 조건을 사용할지 여부를 지정 (기본값 false).
Collaborate Filtering (DataFrame) 예제
importorg.apache.spark.ml.evaluation.RegressionEvaluatorimportorg.apache.spark.ml.recommendation.ALSimportorg.apache.spark.sql.SparkSessionimportspark.implicits._// Using DataFrame
caseclassRating(userId:Int,movieId:Int,rating:Float,timestamp:Long)defparseRating(str:String):Rating={/*
데이터 포맷 확인
head -n 2 ${SPARK_HOME}/data/mllib/als/sample_movielens_ratings.txt
0::2::3::1424380312
0::3::1::1424380312
*/valfields=str.split("::")Rating(fields(0).toInt,fields(1).toInt,fields(2).toFloat,fields(3).toLong)}valratings=spark.read.textFile("${SPARK_HOME}/data/mllib/als/sample_movielens_ratings.txt").map(parseRating).toDF()valArray(training,test)=ratings.randomSplit(Array(0.8,0.2))// ALS알고리즘 사용
valals=newALS().setMaxIter(5)// 다른 source로 부터 rating matrix가 파생된 경우
// setImplicitPrefs(true)을 통해 더 나은 결과를 얻을 수 있다.
// .setImplicitPrefs(true)
.setRegParam(0.01).setUserCol("userId").setItemCol("movieId").setRatingCol("rating")valmodel=als.fit(training)// test data에서 RMSE(평균 제곱근 편차)을 계산하여 모델을 평가한다.
valpredictions=model.transform(test)valevaluator=newRegressionEvaluator().setMetricName("rmse").setLabelCol("rating").setPredictionCol("prediction")valrmse=evaluator.evaluate(predictions)println(s"Root-mean-square error = $rmse")
Collaborate Filtering (RDD) 예제
importorg.apache.spark.mllib.recommendation.ALSimportorg.apache.spark.mllib.recommendation.Ratingvaldata=sc.textFile("${SPARK_HOME}/data/mllib/als/test.data")valratings=data.map(_.split(',')match{caseArray(user,item,rate)=>Rating(user.toInt,item.toInt,rate.toDouble)})valrank=10valnumIterations=10valmodel=ALS.train(ratings,rank,numIterations,0.01)// rating data을 통한 모델 평가
valusersProducts=ratings.map{caseRating(user,product,_)=>(user,product)}valpredictions=model.predict(usersProducts).map{caseRating(user,product,rate)=>((user,product),rate)}valratesAndPreds=ratings.map{caseRating(user,product,rate)=>((user,product),rate)}.join(predictions)valMSE=ratesAndPreds.map{case((user,product),(r1,r2))=>valerr=(r1-r2)err*err}.mean()println("Mean Squared Error = "+MSE)// Save and load model
// model.save(sc, "/tmp/myCollaborativeFilter")
//valsameModel=MatrixFactorizationModel.load(sc,"/tmp/myCollaborativeFilter")
실행 결과 - DataFrame
평균 제곱근 편차 오차율 1.7277440550109346
실행 결과 - RDD
평균 제곱근 편차 오차 = 5.193340946374788E-6
회고
CF를 RDD로 돌리는것 보다 DataFrame을 사용하는게 평균적으로 평균 제곱근 편차 오차율이 적었다.
DataFrame이 좀 더 나은 결과를 주는것 같다.
Spark2.x부터 RDD보단 DataFrame를 사용하는 쪽으로 가이드를 주고 있으며 참고로 MLlib 3.0부터는 RDD API는 제거 된다고 한다.