From e2c15515238e9968f085554802221c12e001ce5e Mon Sep 17 00:00:00 2001
From: Michael Tu
Date: Fri, 15 Dec 2017 19:15:11 +0000
Subject: [PATCH] reindent

---
 src/main/scala/largelsh/PairwiseNaive.scala |  1 +
 src/main/scala/largelsh/SparkLSHv2.scala    | 19 +++++++++----------
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/src/main/scala/largelsh/PairwiseNaive.scala b/src/main/scala/largelsh/PairwiseNaive.scala
index 8789f26..9da09d0 100644
--- a/src/main/scala/largelsh/PairwiseNaive.scala
+++ b/src/main/scala/largelsh/PairwiseNaive.scala
@@ -70,6 +70,7 @@ object PairwiseNaive {
     val spark = SparkSession
       .builder()
       .appName("Naive All Pairs Implementation")
+      .config("spark.driver.maxResultSize", 0)
       .getOrCreate()
 
     import spark.implicits._
diff --git a/src/main/scala/largelsh/SparkLSHv2.scala b/src/main/scala/largelsh/SparkLSHv2.scala
index b98cd80..b906f8c 100644
--- a/src/main/scala/largelsh/SparkLSHv2.scala
+++ b/src/main/scala/largelsh/SparkLSHv2.scala
@@ -86,23 +86,22 @@ object SparkLSHv2 {
         model.approxSimilarityJoin(transformedA, transformedB, threshold, "EuclideanDistance")
 
         val predictionPoints = transformedB.select("label", "features")
-                                           .rdd
-                                           .zipWithIndex
+          .rdd
+          .zipWithIndex
 
         val seqop = (s: (Double, Double), t: (Double, Double)) => if (t._1 == t._2) (s._1 + 1, s._2 + 1) else (s._1, s._2 + 1)
         val combop = (s1: (Double, Double), s2: (Double, Double)) => (s1._1 + s2._1, s1._2 + s2._2)
         val groups = testingCount / 1000
         val overallAccAndCount = (0L until groups).toList.par.map(mod => {
-          val predictionsSubset = predictionPoints.filter { case (row, idx) => idx % groups == mod}
-                                                  .collect.par
+          val predictionsSubset = predictionPoints.filter { case (row, idx) => idx % groups == mod }.collect.par
           val accAndCount = predictionsSubset.map { case (row, idx) => {
-              val key = row.getAs[org.apache.spark.ml.linalg.SparseVector](1)
-              val ann = model.approxNearestNeighbors(transformedA, key, k)
-              val prediction = ann.select("label").groupBy("label").count.sort(desc("label")).first.getDouble(0)
-              (row.getDouble(0), prediction) // label, prediction
-            }}.aggregate((0.0, 0.0))(seqop, combop)
+            val key = row.getAs[org.apache.spark.ml.linalg.SparseVector](1)
+            val ann = model.approxNearestNeighbors(transformedA, key, k)
+            val prediction = ann.select("label").groupBy("label").count.sort(desc("label")).first.getDouble(0)
+            (row.getDouble(0), prediction) // label, prediction
+          }}.aggregate((0.0, 0.0))(seqop, combop)
 
-            accAndCount
+          accAndCount
         }).aggregate((0.0, 0.0))(combop, combop)
         val accuracy = overallAccAndCount._1 / overallAccAndCount._2
         println("bl:", bl, "nht:", nht, "k:", k, "accuracy:", accuracy)
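
Note (reviewer sketch, not part of the patch): two things in this diff are worth calling out.

1. PairwiseNaive.scala sets spark.driver.maxResultSize to 0, which Spark treats as "unlimited"; this keeps the driver from rejecting the large results that collect() pulls back in this workload.

2. The SparkLSHv2 hunk is whitespace-only, but the code it reindents folds per-point (label, prediction) pairs into a running (correct, total) count with seqop and merges partial counts with combop, so the same function pair works for both aggregate() calls. Below is a minimal standalone Scala sketch of that aggregation with made-up labels and predictions; the object name AccuracyAggregationSketch and the sample data are hypothetical, not from the repo.

// Sketch only: mirrors the seqop/combop pair used in SparkLSHv2 above.
// Uses TraversableOnce.aggregate, which exists in the Scala 2.11/2.12
// era this project dates from.
object AccuracyAggregationSketch {
  def main(args: Array[String]): Unit = {
    // seqop: fold one (label, prediction) pair into a (correct, total) count.
    val seqop = (s: (Double, Double), t: (Double, Double)) =>
      if (t._1 == t._2) (s._1 + 1, s._2 + 1) else (s._1, s._2 + 1)
    // combop: merge two partial (correct, total) counts.
    val combop = (s1: (Double, Double), s2: (Double, Double)) =>
      (s1._1 + s2._1, s1._2 + s2._2)

    // Hypothetical (label, prediction) pairs standing in for the LSH kNN output.
    val pairs = List((1.0, 1.0), (0.0, 1.0), (1.0, 1.0), (0.0, 0.0))
    val (correct, total) = pairs.aggregate((0.0, 0.0))(seqop, combop)
    println(s"accuracy: ${correct / total}") // prints: accuracy: 0.75
  }
}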