from pyspark.mllib.recommendation import ALS
import math

# Load the training file and drop its header row.
# Each row is (user, item, rating); cast to int/int/float up front.
text = sc.textFile("trn_X_y.csv")
header = text.take(1)[0]
trnData = (text.filter(lambda line: line != header)
               .map(lambda line: line.split(','))
               .map(lambda value: (int(value[0]), int(value[1]), float(value[2])))
               .cache())
trnData.take(5)  # peek at a few parsed rows

# Hold out 20% of the ratings for choosing the rank.
trn_X_y, val_X_y = trnData.randomSplit([0.8, 0.2], seed=17)
val_X = val_X_y.map(lambda row: (row[0], row[1]))

# Grid-search the rank by RMSE on the validation split.
bestRMSE = float("inf")
bestRank = -1
for rank in [4, 8, 16]:
    model = ALS.train(trn_X_y, rank, iterations=10, lambda_=0.1, seed=17)
    predictions = model.predictAll(val_X).map(lambda r: ((r[0], r[1]), r[2]))
    ratings_and_predictions = (val_X_y.map(lambda row: ((row[0], row[1]), row[2]))
                                      .join(predictions))
    rmse = math.sqrt(ratings_and_predictions
                     .map(lambda kv: (kv[1][0] - kv[1][1]) ** 2)
                     .mean())
    print("rank " + str(rank) + ": rmse = " + str(rmse))
    if rmse < bestRMSE:
        bestRMSE = rmse
        bestRank = rank

# Global mean rating, used as a fallback for pairs ALS cannot score.
meanRating = trnData.map(lambda row: row[2]).mean()

# Retrain on the full training set with the best rank
# (iterations=10 matches the iteration count used in the search).
model = ALS.train(trnData, bestRank, iterations=10, lambda_=0.1, seed=17)

# Load the test file; each row is (index, user, item).
tstText = sc.textFile("tst_X.csv")
tstHeader = tstText.take(1)[0]
tstData = (tstText.filter(lambda line: line != tstHeader)
                  .map(lambda line: line.split(','))
                  .map(lambda value: (int(value[0]), int(value[1]), int(value[2])))
                  .cache())

# Score every (user, item) pair, then join predictions back to the row index.
predictions = (model.predictAll(tstData.map(lambda row: (row[1], row[2])))
                    .map(lambda r: ((r[0], r[1]), r[2])))
predictionsAndIndex = (tstData.map(lambda row: ((row[1], row[2]), row[0]))
                              .join(predictions))
index2rating = dict(predictionsAndIndex.map(lambda kv: (kv[1][0], kv[1][1])).collect())

# Write one prediction per test row, falling back to the mean rating for
# users or items ALS never saw in training (predictAll skips those pairs).
output = open("predictions.csv", "w")
output.write("index,prediction\n")
for row in tstData.collect():
    index = row[0]
    prediction = index2rating.get(index, meanRating)
    output.write(str(index) + "," + str(prediction) + "\n")
output.close()
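
To see what the join-based RMSE above is actually computing, here is a toy check on two hand-made RDDs (the keys and ratings are invented purely for illustration):

# Ground truth and predictions keyed by (user, item); values are ratings.
truth = sc.parallelize([((1, 10), 4.0), ((1, 11), 3.0)])
preds = sc.parallelize([((1, 10), 3.5), ((1, 11), 2.0)])
# join pairs up matching keys: ((user, item), (actual, predicted))
joined = truth.join(preds)
rmse = math.sqrt(joined.map(lambda kv: (kv[1][0] - kv[1][1]) ** 2).mean())
# sqrt(((4.0 - 3.5)**2 + (3.0 - 2.0)**2) / 2) = sqrt(0.625) ≈ 0.79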
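
The training RDD can also be expressed with mllib's Rating namedtuple, which makes the (user, product, rating) roles explicit; ALS.train accepts an RDD of Rating as well as plain 3-tuples. A minimal sketch, assuming trnData is the parsed RDD from above:

from pyspark.mllib.recommendation import ALS, Rating

# Wrap each parsed row in Rating(user, product, rating) before training.
ratings = trnData.map(lambda row: Rating(int(row[0]), int(row[1]), float(row[2])))
model = ALS.train(ratings, rank=8, iterations=10, lambda_=0.1, seed=17)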
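
If the retrained model needs to be reused later without rerunning the search, mllib's MatrixFactorizationModel can be persisted and reloaded; a sketch, where "als_model" is a hypothetical output directory:

from pyspark.mllib.recommendation import MatrixFactorizationModel

model.save(sc, "als_model")  # "als_model" is a placeholder path
reloaded = MatrixFactorizationModel.load(sc, "als_model")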