MAHOUT-1660 MAHOUT-1713 MAHOUT-1714 MAHOUT-1715 MAHOUT-1716 MAHOUT-1717 MAHOUT-1718 MAHOUT-1719 MAHOUT-1720 MAHOUT-1721 MAHOUT-1722 MAHOUT-1723 MAHOUT-1724 MAHOUT-1725 MAHOUT-1726 MAHOUT-1727 MAHOUT-1728 MAHOUT-1729 MAHOUT-1730 MAHOUT-1731 MAHOUT-1732

Cumulative patch for the above issues. Closes apache#135

Squashed commit of the following:

commit c59bf8a
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Mon Jun 8 18:11:57 2015 -0700

    handling degenerate matrix cases for rbind, cbind, and serialization (0 columns or rows)

commit 56b735e
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Mon Jun 8 16:58:34 2015 -0700

    Inserting the testing framework artifact back into the build. Need it as a dependency
    in subordinate projects that do method testing as well.

commit 7e6ce76
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Mon Jun 8 10:22:53 2015 -0700

    adding "final" for logger per comment on public PR

commit e42bced
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Tue Jun 2 12:24:30 2015 -0700

    final fixes in h2o.
    fixing @deprecated warnings in atb

commit 00fb618
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Tue Jun 2 12:08:13 2015 -0700

    h2o stuff

commit f4e1550
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Tue Jun 2 11:55:30 2015 -0700

    repairing merge errors in the h2o module; nothing else is touched here.

commit 1b892de
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Mon Jun 1 18:44:21 2015 -0700

    Picking up missing changes on both sides in spark module.
    TODO: Pat's similarity driver tests fail, it seems, on some degenerate splitting in the optimizer. Need to take a look.

commit 3422046
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Mon Jun 1 18:13:03 2015 -0700

    Adding missing change: uncommenting in-core performance tests.

commit 7aa5de5
Author: Dmitriy Lyubimov <dlyubimov@apache.org>
Date:   Mon Jun 1 17:54:17 2015 -0700

    Initial merge with the ora private-review branch. Everything compiles up to h2o (which still needs some unimplemented pieces added), and ssvd tests
    are failing in the math-scala module due to the lack of matrix flavor on mmul. They are not failing in the private branch though -- some changes still
    have not been merged?

    Most changes I care about seem to be there, though.
dlyubimov committed Jun 11, 2015
1 parent e6d24b9 commit 8a6b805
Showing 107 changed files with 4,251 additions and 643 deletions.
41 changes: 41 additions & 0 deletions CHANGELOG
@@ -2,6 +2,47 @@ Mahout Change Log

Release 0.10.2 - unreleased

MAHOUT-1660: Hadoop1HDFSUtil.readDRMHeader should be taking Hadoop conf (dlyubimov)

MAHOUT-1713: Performance and parallelization improvements for AB', A'B, A'A spark physical operators (dlyubimov)

MAHOUT-1714: Add MAHOUT_OPTS environment when running Spark shell (dlyubimov)

MAHOUT-1715: Closeable API for broadcast tensors (dlyubimov)

MAHOUT-1716: Scala logging style (dlyubimov)

MAHOUT-1717: allreduceBlock() operator api and Spark implementation (dlyubimov)

MAHOUT-1718: Support for conversion of any type-keyed DRM into ordinally-keyed DRM (dlyubimov)

MAHOUT-1719: Unary elementwise function operator and function fusions (dlyubimov)

MAHOUT-1720: Support 1 cbind X, X cbind 1 etc. for both Matrix and DRM (dlyubimov)

MAHOUT-1721: rowSumsMap() summary for non-int-keyed DRMs (dlyubimov)

MAHOUT-1722: DRM row sampling api (dlyubimov)

MAHOUT-1723: Optional structural "flavor" abstraction for in-core matrices (dlyubimov)

MAHOUT-1724: Optimizations of matrix-matrix in-core multiplication based on structural flavors (dlyubimov)

MAHOUT-1725: elementwise power operator ^ (dlyubimov)

MAHOUT-1726: R-like vector concatenation operator (dlyubimov)

MAHOUT-1727: Elementwise analogues of scala.math functions for tensor types (dlyubimov)

MAHOUT-1728: In-core functional assignments (dlyubimov)

MAHOUT-1729: Straighten out behavior of Matrix.iterator() and iterateNonEmpty() (dlyubimov)

MAHOUT-1730: New mutable transposition view for in-core matrices (dlyubimov)

MAHOUT-1731: Deprecate SparseColumnMatrix (dlyubimov)

MAHOUT-1732: Native support for kryo serialization of tensor types (dlyubimov)

Release 0.10.1 - 2015-05-31

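A few of the new math-scala DSL features above, sketched in-core. This is a hedged sketch: the operator spellings follow the R-like Scala bindings, and the toy matrix is illustrative, not taken from the patch.

    import org.apache.mahout.math.scalabindings._
    import RLikeOps._

    val mxA = dense((1.0, 2.0), (3.0, 4.0))

    // MAHOUT-1725: elementwise power operator.
    val mxASquared = mxA ^ 2

    // MAHOUT-1720: `1 cbind X` prepends a column of ones (e.g. a bias column).
    val mxBiased = 1 cbind mxA

    // MAHOUT-1728: in-core functional assignment -- rewrite every cell in place.
    mxA := { (row, col, x) => x * 2 }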
4 changes: 1 addition & 3 deletions bin/mahout
@@ -254,12 +254,10 @@ fi
# restore ordinary behaviour
unset IFS



case "$1" in
(spark-shell)
save_stty=$(stty -g 2>/dev/null);
"$JAVA" $JAVA_HEAP_MAX -classpath "$CLASSPATH" "org.apache.mahout.sparkbindings.shell.Main" $@
"$JAVA" $JAVA_HEAP_MAX $MAHOUT_OPTS -classpath "$CLASSPATH" "org.apache.mahout.sparkbindings.shell.Main" $@
stty sane; stty $save_stty
;;
# Spark CLI drivers go here
12 changes: 12 additions & 0 deletions h2o/src/main/java/org/apache/mahout/h2obindings/drm/H2OBCast.java
@@ -118,4 +118,16 @@ private T deserialize(byte buf[]) {
}
return ret;
}

/**
 * Stop broadcasting when called on the driver side; release any network resources.
 */
@Override
public void close() throws IOException {

// TODO: review this. It looks like this is not really a broadcast mechanism but rather just a
// serialization wrapper, in which case it doesn't hold any network resources.

}
}
66 changes: 47 additions & 19 deletions h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
@@ -26,9 +26,13 @@ import org.apache.mahout.math.drm.logical._
import org.apache.mahout.h2obindings.ops._
import org.apache.mahout.h2obindings.drm._
import org.apache.mahout.h2o.common.{Hadoop1HDFSUtil, HDFSUtil}
import org.apache.mahout.logging._

/** H2O specific non-DRM operations */
object H2OEngine extends DistributedEngine {

private final implicit val log = getLog(H2OEngine.getClass)

// By default, use Hadoop 1 utils
var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil

@@ -119,40 +123,64 @@ object H2OEngine extends DistributedEngine {
abstract class IndexedDatasetH2O(val matrix: CheckpointedDrm[Int], val rowIDs: BiDictionary, val columnIDs: BiDictionary)
extends IndexedDataset {}

-/**
- * reads an IndexedDatasetH2O from default text delimited files
+/**
+ * Reads an IndexedDatasetH2O from default text delimited files
+ * @todo unimplemented
 * @param src a comma separated list of URIs to read from
 * @param schema how the text file is formatted
 * @return
 */
def indexedDatasetDFSRead(src: String,
-    schema: Schema = DefaultIndexedDatasetReadSchema,
-    existingRowIDs: Option[BiDictionary] = None)
-    (implicit sc: DistributedContext):
-    IndexedDatasetH2O = {
-  // should log a warning when this is built but no logger here, can an H2O contributor help with this
-  println("Warning: unimplemented indexedDatasetDFSReadElements." )
-  throw new UnsupportedOperationException("IndexedDatasetH2O is not implemented so can't be read.")
-  null.asInstanceOf[IndexedDatasetH2O]
+    schema: Schema = DefaultIndexedDatasetReadSchema,
+    existingRowIDs: Option[BiDictionary] = None)
+    (implicit sc: DistributedContext):
+    IndexedDatasetH2O = {
+
+  error("Unimplemented indexedDatasetDFSRead.")
+
+  ???
}

/**
- * reads an IndexedDatasetH2O from default text delimited files
+ * Reads an IndexedDatasetH2O from default text delimited files
+ * @todo unimplemented
 * @param src a comma separated list of URIs to read from
 * @param schema how the text file is formatted
 * @return
 */
def indexedDatasetDFSReadElements(src: String,
-    schema: Schema = DefaultIndexedDatasetReadSchema,
-    existingRowIDs: Option[BiDictionary] = None)
-    (implicit sc: DistributedContext):
-    IndexedDatasetH2O = {
-  // should log a warning when this is built but no logger here, can an H2O contributor help with this
-  println("Warning: unimplemented indexedDatasetDFSReadElements." )
-  throw new UnsupportedOperationException("IndexedDatasetH2O is not implemented so can't be read by elements.")
-  null.asInstanceOf[IndexedDatasetH2O]
+    schema: Schema = DefaultIndexedDatasetReadSchema,
+    existingRowIDs: Option[BiDictionary] = None)
+    (implicit sc: DistributedContext): IndexedDatasetH2O = {
+
+  error("Unimplemented indexedDatasetDFSReadElements.")
+
+  ???
}

/**
 * Optional engine-specific all-reduce tensor operation.
 *
 * TODO: implement this please.
 */
override def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc)
: Matrix = ???

/**
* TODO: implement this please.
*/
override def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples: Int, replacement: Boolean): Matrix = ???

/**
 * (Optional) Sampling operation, consistent with Spark's semantics for the same.
 * TODO: implement this please.
 */
override def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean): DrmLike[K] = ???

/**
* TODO: implement this please.
*/
override def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean)
: (DrmLike[Int], Option[DrmLike[K]]) = ???
}
73 changes: 73 additions & 0 deletions math-scala/src/main/scala/org/apache/mahout/logging/package.scala
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.mahout

import org.apache.log4j.{Level, Logger}

package object logging {

/** Compute `expr` only if debug is enabled */
def debugDo[T](expr: => T)(implicit log: Logger): Option[T] = {
if (log.isDebugEnabled) Some(expr)
else None
}

/** Compute `expr` only if trace is enabled */
def traceDo[T](expr: => T)(implicit log: Logger): Option[T] = {
if (log.isTraceEnabled) Some(expr) else None
}

/** Shorter, and lazy, versions of logging methods. Just declare log implicit. */
def debug(msg: => AnyRef)(implicit log: Logger) { if (log.isDebugEnabled) log.debug(msg) }

def debug(msg: => AnyRef, t: Throwable)(implicit log: Logger) { if (log.isDebugEnabled()) log.debug(msg, t) }

/** Shorter, and lazy, versions of logging methods. Just declare log implicit. */
def trace(msg: => AnyRef)(implicit log: Logger) { if (log.isTraceEnabled) log.trace(msg) }

def trace(msg: => AnyRef, t: Throwable)(implicit log: Logger) { if (log.isTraceEnabled()) log.trace(msg, t) }

def info(msg: => AnyRef)(implicit log: Logger) { if (log.isInfoEnabled) log.info(msg) }

def info(msg: => AnyRef, t: Throwable)(implicit log: Logger) { if (log.isInfoEnabled) log.info(msg, t) }

def warn(msg: => AnyRef)(implicit log: Logger) { if (log.isEnabledFor(Level.WARN)) log.warn(msg) }

def warn(msg: => AnyRef, t: Throwable)(implicit log: Logger) { if (log.isEnabledFor(Level.WARN)) log.warn(msg, t) }

def error(msg: => AnyRef)(implicit log: Logger) { if (log.isEnabledFor(Level.ERROR)) log.error(msg) }

def error(msg: => AnyRef, t: Throwable)(implicit log: Logger) { if (log.isEnabledFor(Level.ERROR)) log.error(msg, t) }

def fatal(msg: => AnyRef)(implicit log: Logger) { if (log.isEnabledFor(Level.FATAL)) log.fatal(msg) }

def fatal(msg: => AnyRef, t: Throwable)(implicit log: Logger) { if (log.isEnabledFor(Level.FATAL)) log.fatal(msg, t) }

def getLog(name: String): Logger = Logger.getLogger(name)

def getLog(clazz: Class[_]): Logger = Logger.getLogger(clazz)

def mahoutLog: Logger = getLog("org.apache.mahout")

def setLogLevel(l: Level)(implicit log: Logger) = log.setLevel(l)

def setAdditivity(a: Boolean)(implicit log: Logger) = log.setAdditivity(a)

}
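A quick usage sketch of this logging package. The object name MyJob is hypothetical; the pattern mirrors how the implicit logger is declared elsewhere in this patch.

    import org.apache.mahout.logging._

    object MyJob {
      // Declare the implicit logger once per enclosing scope.
      private final implicit val log = getLog(MyJob.getClass)

      def run(): Unit = {
        info("Starting job.")
        // By-name message: the string below is built only if DEBUG is enabled.
        debug(s"Expensive diagnostic: ${(1 to 1000000).sum}")
      }
    }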
@@ -18,6 +18,7 @@
package org.apache.mahout.math.decompositions

import scala.reflect.ClassTag
import org.apache.mahout.logging._
import org.apache.mahout.math.Matrix
import org.apache.mahout.math.scalabindings._
import RLikeOps._
@@ -27,7 +28,7 @@ import org.apache.log4j.Logger

object DQR {

-private val log = Logger.getLogger(DQR.getClass)
+private final implicit val log = getLog(DQR.getClass)

/**
* Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be pretty
Expand All @@ -41,19 +42,19 @@ object DQR {
def dqrThin[K: ClassTag](drmA: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {

if (drmA.ncol > 5000)
log.warn("A is too fat. A'A must fit in memory and easily broadcasted.")
warn("A is too fat. A'A must fit in memory and easily broadcasted.")

implicit val ctx = drmA.context

val AtA = (drmA.t %*% drmA).checkpoint()
val inCoreAtA = AtA.collect

if (log.isDebugEnabled) log.debug("A'A=\n%s\n".format(inCoreAtA))
trace("A'A=\n%s\n".format(inCoreAtA))

val ch = chol(inCoreAtA)
val inCoreR = (ch.getL cloned) t

if (log.isDebugEnabled) log.debug("R=\n%s\n".format(inCoreR))
trace("R=\n%s\n".format(inCoreR))

if (checkRankDeficiency && !ch.isPositiveDefinite)
throw new IllegalArgumentException("R is rank-deficient.")
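For intuition, the thin-QR-via-Cholesky identity that dqrThin relies on can be sketched in-core. A hedged sketch using only scalabindings calls (chol, solve, RLikeOps); the toy matrix is illustrative.

    import org.apache.mahout.math.scalabindings._
    import RLikeOps._

    // A = QR with R'R = A'A: R is the transposed Cholesky factor of A'A,
    // and Q is recovered as A * R^{-1}.
    val mxA = dense((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
    val mxAtA = mxA.t %*% mxA
    val ch = chol(mxAtA)
    val mxR = (ch.getL cloned).t     // upper-triangular R, as in dqrThin
    val mxQ = mxA %*% solve(mxR)     // columns of Q are orthonormal when A'A is positive-definite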
@@ -7,9 +7,12 @@ import RLikeOps._
import org.apache.mahout.math.drm._
import RLikeDrmOps._
import org.apache.mahout.common.RandomUtils
import org.apache.mahout.logging._

object DSSVD {

private final implicit val log = getLog(DSSVD.getClass)

/**
* Distributed Stochastic Singular Value decomposition algorithm.
*
@@ -43,32 +46,40 @@ object DSSVD {
case (keys, blockA) =>
val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
keys -> blockY
-}
+}.checkpoint()

-var drmQ = dqrThin(drmY.checkpoint())._1
+var drmQ = dqrThin(drmY)._1
// Checkpoint Q if last iteration
if (q == 0) drmQ = drmQ.checkpoint()

trace(s"dssvd:drmQ=${drmQ.collect}.")

// This actually should be optimized as identically partitioned map-side A'B since A and Q should
// still be identically partitioned.
var drmBt = drmAcp.t %*% drmQ
// Checkpoint B' if last iteration
if (q == 0) drmBt = drmBt.checkpoint()

trace(s"dssvd:drmB'=${drmBt.collect}.")

for (i <- 0 until q) {
drmY = drmAcp %*% drmBt
drmQ = dqrThin(drmY.checkpoint())._1
// Checkpoint Q if last iteration
if (i == q - 1) drmQ = drmQ.checkpoint()

// This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
// identically partitioned anymore.
drmBt = drmAcp.t %*% drmQ
// Checkpoint B' if last iteration
if (i == q - 1) drmBt = drmBt.checkpoint()
}

-val (inCoreUHat, d) = eigen(drmBt.t %*% drmBt)
+val mxBBt: Matrix = drmBt.t %*% drmBt
+
+trace(s"dssvd: BB'=$mxBBt.")
+
+val (inCoreUHat, d) = eigen(mxBBt)
val s = d.sqrt

// Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
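For reference, the randomized-SVD recipe the code above implements, written out. This recap is inferred from the code shown; q is the number of power iterations.

    Y_0 = A\Omega,\qquad Q_0 = \mathrm{qr}(Y_0).Q,\qquad B_0 = Q_0^\top A,
    Y_i = A B_{i-1}^\top,\qquad Q_i = \mathrm{qr}(Y_i).Q,\qquad B_i = Q_i^\top A,\qquad i = 1,\dots,q,
    B B^\top \hat U = \hat U \Lambda,\qquad \Sigma = \Lambda^{1/2},\qquad U = Q \hat U,\qquad V = B^\top \hat U \, \Sigma^{-1}.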
@@ -150,7 +150,7 @@ private[math] object SSVD {
val c = s_q cross s_b

// BB' computation becomes
-val bbt = bt.t %*% bt -c - c.t + (s_q cross s_q) * (xi dot xi)
+val bbt = bt.t %*% bt - c - c.t + (s_q cross s_q) * (xi dot xi)

val (uhat, d) = eigen(bbt)

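The identity behind this BB' computation, assuming A is mean-centered by \xi. This reading is inferred from the code above, with \tilde B = Q^\top A the uncentered projection, s_q = Q^\top \mathbf{1}, s_b = \tilde B \, \xi, and c = s_q s_b^\top:

    B = Q^\top (A - \mathbf{1}\,\xi^\top)
    \;\Longrightarrow\;
    B B^\top = \tilde B \tilde B^\top - c - c^\top + (\xi^\top \xi)\,(s_q s_q^\top).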
@@ -18,6 +18,7 @@
package org.apache.mahout.math.drm

/** Broadcast variable abstraction */
-trait BCast[T] {
+trait BCast[T] extends java.io.Closeable {
def value:T

}
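With BCast now Closeable, callers can release broadcasts deterministically. A minimal hedged sketch, assuming an implicit DistributedContext in scope and the standard drmBroadcast entry point; the vector is illustrative.

    import org.apache.mahout.math._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.scalabindings._

    // Broadcast a small in-core vector, read it back, then close it.
    val bcastV: BCast[Vector] = drmBroadcast(dvec(1.0, 2.0, 3.0))
    try {
      val v: Vector = bcastV.value  // worker-side code reads .value the same way
      println(s"broadcast cardinality = ${v.size()}")
    } finally {
      bcastV.close()  // release any engine-side resources held by the broadcast
    }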
@@ -20,6 +20,7 @@ package org.apache.mahout.math.drm
import scala.reflect.ClassTag
import org.apache.mahout.math._

import org.apache.mahout.math.scalabindings.RLikeOps._

/**
* Additional experimental operations over CheckpointedDRM implementation. I will possibly move them up to
@@ -38,6 +39,12 @@ class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) {
/** Column Means */
def colMeans(): Vector = drm.context.colMeans(drm)

/** Optional engine-specific all-reduce tensor operation. */
def allreduceBlock(bmf: BlockMapFunc2[K], rf: BlockReduceFunc = _ += _): Matrix =
  drm.context.allreduceBlock(drm, bmf, rf)

def norm(): Double = drm.context.norm(drm)
}
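A hedged usage sketch of the new allreduceBlock summary: each block is mapped to a partial-result matrix, and the reducer (defaulting to +=) folds the partials on the driver. Column sums are illustrative; drmA is assumed to be a checkpointed DRM in an active distributed context.

    import org.apache.mahout.math._
    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.scalabindings._
    import RLikeOps._

    // Each block contributes a 1 x ncol matrix of its column sums;
    // the default reducer (+=) adds the per-block partials together.
    val mxColSums: Matrix = drmA.allreduceBlock({ case (keys, block) =>
      val acc = new DenseMatrix(1, block.ncol)
      acc(0, ::) := block.colSums()
      acc
    })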
