# Fetch sample data
cd
wget http://files.grouplens.org/datasets/movielens/ml-latest-small.zip
unzip ml-latest-small.zip

# Configure the environment
export SPARK_HOME=/home/ubuntu/spark-1.6.1-bin-hadoop2.6/
cd $SPARK_HOME

# Start pyspark and run the Python commands below in its shell
bin/pyspark

# Read the data into memory, drop the CSV header, and parse each line
# into a (userId, movieId, rating) tuple, ignoring the timestamp column
text = sc.textFile("/home/ubuntu/ml-latest-small/ratings.csv")
header = text.take(1)[0]
data = (text.filter(lambda line: line != header)
            .map(lambda line: line.split(','))
            .map(lambda value: (value[0], value[1], value[2]))
            .cache())
data.take(5)

# Split into training, validation, and test sets (60/20/20), and strip the
# ratings off the validation and test sets to get prediction inputs
trn_X_y, val_X_y, tst_X_y = data.randomSplit([0.6, 0.2, 0.2], seed=17)
val_X = val_X_y.map(lambda key: (key[0], key[1]))
tst_X = tst_X_y.map(lambda key: (key[0], key[1]))

# Select the model: train ALS at several ranks and keep the rank with the
# lowest root mean square error (RMSE) on the validation set
from pyspark.mllib.recommendation import ALS
import math

bestRMSE = float("inf")
bestRank = -1
for rank in [4, 8, 12]:
    model = ALS.train(trn_X_y, rank, iterations=10, lambda_=0.1, seed=17)
    predictions = model.predictAll(val_X).map(lambda value: ((value[0], value[1]), value[2]))
    ratings_and_predictions = val_X_y.map(lambda value: ((int(value[0]), int(value[1])), float(value[2]))).join(predictions)
    rmse = math.sqrt(ratings_and_predictions.map(lambda value: (value[1][0] - value[1][1])**2).mean())
    print("rank " + str(rank) + ": rmse = " + str(rmse))
    if rmse < bestRMSE:
        bestRMSE = rmse
        bestRank = rank
print("best rank " + str(bestRank) + ": best rmse = " + str(bestRMSE))

# Construct the final model on the combined training and validation data
# using the best rank, then assess its performance on the held-out test
# set using RMSE (iterations matched to the selection loop above)
fin_trn_X_y = trn_X_y.union(val_X_y)
final = ALS.train(fin_trn_X_y, bestRank, iterations=10, lambda_=0.1, seed=17)
predictions = final.predictAll(tst_X).map(lambda value: ((value[0], value[1]), value[2]))
ratings_and_predictions = tst_X_y.map(lambda value: ((int(value[0]), int(value[1])), float(value[2]))).join(predictions)
rmse = math.sqrt(ratings_and_predictions.map(lambda value: (value[1][0] - value[1][1])**2).mean())
print("rmse for the trained model: " + str(rmse))
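
# The validation and test evaluations above repeat the same join-and-RMSE
# logic. As an optional sketch, it can be factored into a helper; the name
# compute_rmse is ours, not part of the walkthrough above.
def compute_rmse(model, X, X_y):
    # Predict ratings for the (user, movie) pairs and key them for the join
    predictions = model.predictAll(X).map(lambda value: ((value[0], value[1]), value[2]))
    # Pair each true rating with its prediction, then average the squared error
    joined = X_y.map(lambda value: ((int(value[0]), int(value[1])), float(value[2]))).join(predictions)
    return math.sqrt(joined.map(lambda value: (value[1][0] - value[1][1])**2).mean())

# Example: compute_rmse(final, tst_X, tst_X_y) reproduces the final RMSE above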
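
# Once trained, the model can serve recommendations directly via MLlib's
# recommendProducts. A minimal sketch; the user id 1 and the top-5 count
# are arbitrary example values, not taken from the results above.
for rating in final.recommendProducts(1, 5):
    # Each result is a Rating(user, product, rating) with a predicted score
    print("movie " + str(rating.product) + ": predicted " + str(rating.rating))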
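
# To reuse the model without retraining, MLlib models can be persisted and
# reloaded. A sketch, assuming an example path of your choosing:
from pyspark.mllib.recommendation import MatrixFactorizationModel
final.save(sc, "/home/ubuntu/als_model")  # example path, must not already exist
same_model = MatrixFactorizationModel.load(sc, "/home/ubuntu/als_model")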