[SPARK-7186] [SQL] Decouple internal Row from external Row
Currently, we use o.a.s.sql.Row both internally and externally. The external interface is wider than what the internal code needs, because it is designed to make end-user programming easier. This design has proven to be very error-prone and cumbersome for internal Row implementations.

As a first step, we create an InternalRow interface in the catalyst module that is identical to the current Row interface, and we switch all internal operators and expressions to use InternalRow instead. When we need to expose a Row, we convert the InternalRow implementation into Row for users.
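
As a rough sketch of what that switch looks like (the trait and class names below are illustrative, not the real Expression hierarchy), internal evaluation is now expressed against InternalRow, mirroring the eval(input: InternalRow) signatures in the diff:

import org.apache.spark.sql.catalyst.InternalRow

// Minimal sketch: internal expressions evaluate against InternalRow.
trait ExprSketch {
  def eval(input: InternalRow = null): Any
}

// A trivial expression that ignores its input row.
case class LiteralSketch(value: Any) extends ExprSketch {
  override def eval(input: InternalRow = null): Any = value
}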

For all public APIs (for example, the data source APIs) we keep using Row, which is converted to and from InternalRow by CatalystTypeConverters.
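
A minimal sketch of that boundary, assuming the converters are callable from Spark SQL internals (ConversionSketch and toInternal are illustrative names; convertToCatalyst is the helper visible in the diff below, while the schema-based reverse helper is only hinted at in a comment):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}

object ConversionSketch {
  // An external Row coming from user code is converted to the internal
  // representation at the API boundary.
  def toInternal(external: Row): InternalRow =
    CatalystTypeConverters.convertToCatalyst(external).asInstanceOf[InternalRow]

  // The reverse direction goes through a converter derived from the schema,
  // roughly: CatalystTypeConverters.convertToScala(internal, schema)
  // (exact helper name and signature are not shown in this diff).
}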

For all internal data sources (JSON, Parquet, JDBC, Hive) we use InternalRow for better performance; it is cast to Row in buildScan() without changing the public API. When creating a PhysicalRDD, we cast the rows back to InternalRow.
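
The sketch below illustrates that flow (ScanSketch and its method names are illustrative); because InternalRow extends Row, the cast described above keeps buildScan()'s public signature unchanged without copying rows:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow

object ScanSketch {
  // The source produces InternalRows internally for performance.
  def internalScan(): RDD[InternalRow] = ???

  // InternalRow extends Row, so the public RDD[Row] signature is kept
  // with a cast instead of converting every row.
  def buildScan(): RDD[Row] = internalScan().asInstanceOf[RDD[Row]]

  // When the planner creates a PhysicalRDD, it casts back to InternalRow.
  def toPhysical(): RDD[InternalRow] = buildScan().asInstanceOf[RDD[InternalRow]]
}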

cc rxin marmbrus JoshRosen

Author: Davies Liu <davies@databricks.com>

Closes apache#6792 from davies/internal_row and squashes the following commits:

f2abd13 [Davies Liu] fix scalastyle
a7e025c [Davies Liu] move InternalRow into catalyst
30db8ba [Davies Liu] Merge branch 'master' of github.com:apache/spark into internal_row
7cbced8 [Davies Liu] separate Row and InternalRow
Davies Liu authored and rxin committed Jun 13, 2015
1 parent 6e9c3ff commit d46f8e5
Showing 132 changed files with 1,160 additions and 973 deletions.
@@ -25,10 +25,11 @@
import scala.collection.Seq;
import scala.collection.mutable.ArraySeq;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.types.StructType;

public abstract class BaseRow implements Row {
public abstract class BaseRow extends InternalRow {

@Override
final public int length() {
@@ -176,7 +177,7 @@ public boolean equals(Object other) {
}

@Override
public Row copy() {
public InternalRow copy() {
final int n = size();
Object[] arr = new Object[n];
for (int i = 0; i < n; i++) {
@@ -20,7 +20,7 @@
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.PlatformDependent;
@@ -107,7 +107,7 @@ public static boolean supportsAggregationBufferSchema(StructType schema) {
* @param enablePerfMetrics if true, performance metrics will be recorded (has minor perf impact)
*/
public UnsafeFixedWidthAggregationMap(
Row emptyAggregationBuffer,
InternalRow emptyAggregationBuffer,
StructType aggregationBufferSchema,
StructType groupingKeySchema,
TaskMemoryManager memoryManager,
@@ -125,7 +125,7 @@
/**
* Convert a Java object row into an UnsafeRow, allocating it into a new long array.
*/
private static long[] convertToUnsafeRow(Row javaRow, StructType schema) {
private static long[] convertToUnsafeRow(InternalRow javaRow, StructType schema) {
final UnsafeRowConverter converter = new UnsafeRowConverter(schema);
final long[] unsafeRow = new long[converter.getSizeRequirement(javaRow)];
final long writtenLength =
@@ -138,7 +138,7 @@ private static long[] convertToUnsafeRow(Row javaRow, StructType schema) {
* Return the aggregation buffer for the current group. For efficiency, all calls to this method
* return the same object.
*/
public UnsafeRow getAggregationBuffer(Row groupingKey) {
public UnsafeRow getAggregationBuffer(InternalRow groupingKey) {
final int groupingKeySize = groupingKeyToUnsafeRowConverter.getSizeRequirement(groupingKey);
// Make sure that the buffer is large enough to hold the key. If it's not, grow it:
if (groupingKeySize > groupingKeyConversionScratchSpace.length) {
@@ -26,7 +26,7 @@
import scala.collection.Seq;
import scala.collection.mutable.ArraySeq;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.BaseMutableRow;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;
@@ -334,7 +334,7 @@ public String getString(int i) {


@Override
public Row copy() {
public InternalRow copy() {
throw new UnsupportedOperationException();
}

@@ -27,6 +27,7 @@ import scala.collection.mutable.HashMap

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -105,7 +106,7 @@ object CatalystTypeConverters {
/**
* Given a Catalyst row, convert the value at column `column` to its Scala equivalent.
*/
final def toScala(row: Row, column: Int): ScalaOutputType = {
final def toScala(row: InternalRow, column: Int): ScalaOutputType = {
if (row.isNullAt(column)) null.asInstanceOf[ScalaOutputType] else toScalaImpl(row, column)
}

@@ -125,20 +126,20 @@
* Given a Catalyst row, convert the value at column `column` to its Scala equivalent.
* This method will only be called on non-null columns.
*/
protected def toScalaImpl(row: Row, column: Int): ScalaOutputType
protected def toScalaImpl(row: InternalRow, column: Int): ScalaOutputType
}

private object IdentityConverter extends CatalystTypeConverter[Any, Any, Any] {
override def toCatalystImpl(scalaValue: Any): Any = scalaValue
override def toScala(catalystValue: Any): Any = catalystValue
override def toScalaImpl(row: Row, column: Int): Any = row(column)
override def toScalaImpl(row: InternalRow, column: Int): Any = row(column)
}

private case class UDTConverter(
udt: UserDefinedType[_]) extends CatalystTypeConverter[Any, Any, Any] {
override def toCatalystImpl(scalaValue: Any): Any = udt.serialize(scalaValue)
override def toScala(catalystValue: Any): Any = udt.deserialize(catalystValue)
override def toScalaImpl(row: Row, column: Int): Any = toScala(row(column))
override def toScalaImpl(row: InternalRow, column: Int): Any = toScala(row(column))
}

/** Converter for arrays, sequences, and Java iterables. */
@@ -170,7 +171,7 @@ object CatalystTypeConverters {
}
}

override def toScalaImpl(row: Row, column: Int): Seq[Any] =
override def toScalaImpl(row: InternalRow, column: Int): Seq[Any] =
toScala(row(column).asInstanceOf[Seq[Any]])
}

@@ -209,16 +210,16 @@ object CatalystTypeConverters {
}
}

override def toScalaImpl(row: Row, column: Int): Map[Any, Any] =
override def toScalaImpl(row: InternalRow, column: Int): Map[Any, Any] =
toScala(row(column).asInstanceOf[Map[Any, Any]])
}

private case class StructConverter(
structType: StructType) extends CatalystTypeConverter[Any, Row, Row] {
structType: StructType) extends CatalystTypeConverter[Any, Row, InternalRow] {

private[this] val converters = structType.fields.map { f => getConverterForType(f.dataType) }

override def toCatalystImpl(scalaValue: Any): Row = scalaValue match {
override def toCatalystImpl(scalaValue: Any): InternalRow = scalaValue match {
case row: Row =>
val ar = new Array[Any](row.size)
var idx = 0
@@ -239,7 +240,7 @@ object CatalystTypeConverters {
new GenericRowWithSchema(ar, structType)
}

override def toScala(row: Row): Row = {
override def toScala(row: InternalRow): Row = {
if (row == null) {
null
} else {
@@ -253,7 +254,8 @@ object CatalystTypeConverters {
}
}

override def toScalaImpl(row: Row, column: Int): Row = toScala(row(column).asInstanceOf[Row])
override def toScalaImpl(row: InternalRow, column: Int): Row =
toScala(row(column).asInstanceOf[InternalRow])
}

private object StringConverter extends CatalystTypeConverter[Any, String, Any] {
@@ -266,14 +268,14 @@ object CatalystTypeConverters {
case str: String => str
case utf8: UTF8String => utf8.toString()
}
override def toScalaImpl(row: Row, column: Int): String = row(column).toString
override def toScalaImpl(row: InternalRow, column: Int): String = row(column).toString
}

private object DateConverter extends CatalystTypeConverter[Date, Date, Any] {
override def toCatalystImpl(scalaValue: Date): Int = DateUtils.fromJavaDate(scalaValue)
override def toScala(catalystValue: Any): Date =
if (catalystValue == null) null else DateUtils.toJavaDate(catalystValue.asInstanceOf[Int])
override def toScalaImpl(row: Row, column: Int): Date = toScala(row.getInt(column))
override def toScalaImpl(row: InternalRow, column: Int): Date = toScala(row.getInt(column))
}

private object TimestampConverter extends CatalystTypeConverter[Timestamp, Timestamp, Any] {
@@ -282,7 +284,8 @@ object CatalystTypeConverters {
override def toScala(catalystValue: Any): Timestamp =
if (catalystValue == null) null
else DateUtils.toJavaTimestamp(catalystValue.asInstanceOf[Long])
override def toScalaImpl(row: Row, column: Int): Timestamp = toScala(row.getLong(column))
override def toScalaImpl(row: InternalRow, column: Int): Timestamp =
toScala(row.getLong(column))
}

private object BigDecimalConverter extends CatalystTypeConverter[Any, JavaBigDecimal, Decimal] {
@@ -292,10 +295,11 @@ object CatalystTypeConverters {
case d: Decimal => d
}
override def toScala(catalystValue: Decimal): JavaBigDecimal = catalystValue.toJavaBigDecimal
override def toScalaImpl(row: Row, column: Int): JavaBigDecimal = row.get(column) match {
case d: JavaBigDecimal => d
case d: Decimal => d.toJavaBigDecimal
}
override def toScalaImpl(row: InternalRow, column: Int): JavaBigDecimal =
row.get(column) match {
case d: JavaBigDecimal => d
case d: Decimal => d.toJavaBigDecimal
}
}

private abstract class PrimitiveConverter[T] extends CatalystTypeConverter[T, Any, Any] {
Expand All @@ -304,31 +308,31 @@ object CatalystTypeConverters {
}

private object BooleanConverter extends PrimitiveConverter[Boolean] {
override def toScalaImpl(row: Row, column: Int): Boolean = row.getBoolean(column)
override def toScalaImpl(row: InternalRow, column: Int): Boolean = row.getBoolean(column)
}

private object ByteConverter extends PrimitiveConverter[Byte] {
override def toScalaImpl(row: Row, column: Int): Byte = row.getByte(column)
override def toScalaImpl(row: InternalRow, column: Int): Byte = row.getByte(column)
}

private object ShortConverter extends PrimitiveConverter[Short] {
override def toScalaImpl(row: Row, column: Int): Short = row.getShort(column)
override def toScalaImpl(row: InternalRow, column: Int): Short = row.getShort(column)
}

private object IntConverter extends PrimitiveConverter[Int] {
override def toScalaImpl(row: Row, column: Int): Int = row.getInt(column)
override def toScalaImpl(row: InternalRow, column: Int): Int = row.getInt(column)
}

private object LongConverter extends PrimitiveConverter[Long] {
override def toScalaImpl(row: Row, column: Int): Long = row.getLong(column)
override def toScalaImpl(row: InternalRow, column: Int): Long = row.getLong(column)
}

private object FloatConverter extends PrimitiveConverter[Float] {
override def toScalaImpl(row: Row, column: Int): Float = row.getFloat(column)
override def toScalaImpl(row: InternalRow, column: Int): Float = row.getFloat(column)
}

private object DoubleConverter extends PrimitiveConverter[Double] {
override def toScalaImpl(row: Row, column: Int): Double = row.getDouble(column)
override def toScalaImpl(row: InternalRow, column: Int): Double = row.getDouble(column)
}

/**
@@ -382,7 +386,7 @@ object CatalystTypeConverters {
case d: BigDecimal => BigDecimalConverter.toCatalyst(d)
case d: JavaBigDecimal => BigDecimalConverter.toCatalyst(d)
case seq: Seq[Any] => seq.map(convertToCatalyst)
case r: Row => Row(r.toSeq.map(convertToCatalyst): _*)
case r: Row => InternalRow(r.toSeq.map(convertToCatalyst): _*)
case arr: Array[Any] => arr.toSeq.map(convertToCatalyst).toArray
case m: Map[Any, Any] =>
m.map { case (k, v) => (convertToCatalyst(k), convertToCatalyst(v)) }.toMap
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRow

/**
 * An abstract class for rows used internally in Spark SQL; it contains only columns with
 * internal types.
*/
abstract class InternalRow extends Row {
// A default implementation to change the return type
override def copy(): InternalRow = {this}
}

object InternalRow {
def unapplySeq(row: InternalRow): Some[Seq[Any]] = Some(row.toSeq)

/**
* This method can be used to construct a [[Row]] with the given values.
*/
def apply(values: Any*): InternalRow = new GenericRow(values.toArray)

/**
* This method can be used to construct a [[Row]] from a [[Seq]] of values.
*/
def fromSeq(values: Seq[Any]): InternalRow = new GenericRow(values.toArray)

def fromTuple(tuple: Product): InternalRow = fromSeq(tuple.productIterator.toSeq)

/**
* Merge multiple rows into a single row, one after another.
*/
def merge(rows: InternalRow*): InternalRow = {
// TODO: Improve the performance of this if used in performance critical part.
new GenericRow(rows.flatMap(_.toSeq).toArray)
}

/** Returns an empty row. */
val empty = apply()
}
@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.{errors, trees}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
@@ -67,7 +68,7 @@ case class UnresolvedAttribute(nameParts: Seq[String])
override def withName(newName: String): UnresolvedAttribute = UnresolvedAttribute.quoted(newName)

// Unresolved attributes are transient at compile time and don't get evaluated during execution.
override def eval(input: Row = null): Any =
override def eval(input: catalyst.InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")

override def toString: String = s"'$name"
@@ -85,7 +86,7 @@ case class UnresolvedFunction(name: String, children: Seq[Expression]) extends E
override lazy val resolved = false

// Unresolved functions are transient at compile time and don't get evaluated during execution.
override def eval(input: Row = null): Any =
override def eval(input: catalyst.InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")

override def toString: String = s"'$name(${children.mkString(",")})"
@@ -107,7 +108,7 @@ trait Star extends NamedExpression with trees.LeafNode[Expression] {
override lazy val resolved = false

// Star gets expanded at runtime so we never evaluate a Star.
override def eval(input: Row = null): Any =
override def eval(input: catalyst.InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")

def expand(input: Seq[Attribute], resolver: Resolver): Seq[NamedExpression]
@@ -166,7 +167,7 @@ case class MultiAlias(child: Expression, names: Seq[String])

override lazy val resolved = false

override def eval(input: Row = null): Any =
override def eval(input: catalyst.InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")

override def toString: String = s"$child AS $names"
@@ -200,7 +201,7 @@ case class UnresolvedExtractValue(child: Expression, extraction: Expression)
override def nullable: Boolean = throw new UnresolvedException(this, "nullable")
override lazy val resolved = false

override def eval(input: Row = null): Any =
override def eval(input: catalyst.InternalRow = null): Any =
throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")

override def toString: String = s"$child[$extraction]"
@@ -21,7 +21,7 @@ import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.errors.attachTree
import org.apache.spark.sql.catalyst.expressions.codegen.{GeneratedExpressionCode, CodeGenContext}
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.trees
import org.apache.spark.sql.catalyst.{InternalRow, trees}

/**
* A bound reference points to a specific slot in the input tuple, allowing the actual value
Expand All @@ -33,7 +33,7 @@ case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean)

override def toString: String = s"input[$ordinal]"

override def eval(input: Row): Any = input(ordinal)
override def eval(input: InternalRow): Any = input(ordinal)

override def name: String = s"i[$ordinal]"
