[SPARK-13166][SQL] Remove DataStreamReader/Writer
They seem redundant and we can simply use DataFrameReader/Writer. The new usage looks like:

```scala
val df = sqlContext.read.stream("...")
val handle = df.write.stream("...")
handle.stop()
```
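
For a slightly fuller sketch of the same flow, combining the new `stream()` methods with the existing `format`/`option` builders (the provider names `"events"` and `"console"` below are placeholders for illustration, not sources or sinks added by this change):

```scala
// Placeholder provider names; substitute a real streaming data source/sink.
val df = sqlContext.read
  .format("events")
  .option("host", "localhost")
  .stream()                      // streaming DataFrame

val handle = df.write
  .format("console")
  .stream("/tmp/stream-out")     // returns a ContinuousQuery handle

handle.stop()                    // stop the continuous query
```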

Author: Reynold Xin <rxin@databricks.com>

Closes apache#11062 from rxin/SPARK-13166.
rxin authored and marmbrus committed Feb 4, 2016
1 parent 3221edd commit 915a753
Showing 8 changed files with 86 additions and 315 deletions.
10 changes: 1 addition & 9 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1682,22 +1682,14 @@ class DataFrame private[sql](

/**
* :: Experimental ::
- * Interface for saving the content of the [[DataFrame]] out into external storage.
+ * Interface for saving the content of the [[DataFrame]] out into external storage or streams.
*
* @group output
* @since 1.4.0
*/
@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)

- /**
-  * :: Experimental ::
-  * Interface for starting a streaming query that will continually output results to the specified
-  * external sink as new data arrives.
-  */
- @Experimental
- def streamTo: DataStreamWriter = new DataStreamWriter(this)

/**
* Returns the content of the [[DataFrame]] as a RDD of JSON strings.
* @group rdd
29 changes: 26 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -29,17 +29,17 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
- import org.apache.spark.sql.catalyst.{CatalystQl}
import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.execution.datasources.json.JSONRelation
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
+ import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.types.StructType

/**
* :: Experimental ::
* Interface used to load a [[DataFrame]] from external storage systems (e.g. file systems,
- * key-value stores, etc). Use [[SQLContext.read]] to access this.
+ * key-value stores, etc) or data streams. Use [[SQLContext.read]] to access this.
*
* @since 1.4.0
*/
@@ -136,6 +136,30 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
option("paths", paths.map(StringUtils.escapeString(_, '\\', ',')).mkString(",")).load()
}

+ /**
+  * Loads an input data stream as a [[DataFrame]], for data streams that don't require a path
+  * (e.g. external key-value stores).
+  *
+  * @since 2.0.0
+  */
+ def stream(): DataFrame = {
+   val resolved = ResolvedDataSource.createSource(
+     sqlContext,
+     userSpecifiedSchema = userSpecifiedSchema,
+     providerName = source,
+     options = extraOptions.toMap)
+   DataFrame(sqlContext, StreamingRelation(resolved))
+ }
+
+ /**
+  * Loads input as a [[DataFrame]], for data streams that read from some path.
+  *
+  * @since 2.0.0
+  */
+ def stream(path: String): DataFrame = {
+   option("path", path).stream()
+ }
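
A minimal usage sketch of the two reader overloads added above. The provider name `"events"` and the schema are assumptions for illustration; `stream()` resolves the source via `ResolvedDataSource.createSource` and wraps it in a `StreamingRelation`, while `stream(path)` is shorthand for `option("path", path).stream()`.

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Assumed schema and provider name, shown only to illustrate the builder calls.
val schema = StructType(Seq(
  StructField("key", StringType),
  StructField("value", StringType)))

// Path-less source (e.g. an external key-value store).
val kvStream = sqlContext.read
  .format("events")
  .schema(schema)
  .option("host", "localhost")
  .stream()

// Path-based source; equivalent to option("path", "/data/incoming").stream().
val fileStream = sqlContext.read
  .format("events")
  .stream("/data/incoming")
```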

/**
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table and connection properties.
@@ -165,7 +189,6 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* @param connectionProperties JDBC database connection arguments, a list of arbitrary string
* tag/value. Normally at least a "user" and "password" property
* should be included.
- *
* @since 1.4.0
*/
def jdbc(
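For the JDBC reader documented above, a representative call looks like the following; the URL, table name, and credentials are placeholders.

```scala
import java.util.Properties

// Placeholder connection details; normally at least "user" and "password" are required.
val connectionProperties = new Properties()
connectionProperties.put("user", "spark")
connectionProperties.put("password", "secret")

val people = sqlContext.read.jdbc(
  "jdbc:postgresql://db.example.com:5432/mydb", "people", connectionProperties)
```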
36 changes: 32 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -22,17 +22,18 @@ import java.util.Properties
import scala.collection.JavaConverters._

import org.apache.spark.annotation.Experimental
- import org.apache.spark.sql.catalyst.{CatalystQl, TableIdentifier}
+ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
import org.apache.spark.sql.execution.datasources.{BucketSpec, CreateTableUsingAsSelect, ResolvedDataSource}
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+ import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.sources.HadoopFsRelation

/**
* :: Experimental ::
* Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
- * key-value stores, etc). Use [[DataFrame.write]] to access this.
+ * key-value stores, etc) or data streams. Use [[DataFrame.write]] to access this.
*
* @since 1.4.0
*/
@@ -183,6 +184,34 @@ final class DataFrameWriter private[sql](df: DataFrame) {
df)
}

+ /**
+  * Starts the execution of the streaming query, which will continually output results to the
+  * given path as new data arrives. The returned [[ContinuousQuery]] object can be used to
+  * interact with the stream.
+  *
+  * @since 2.0.0
+  */
+ def stream(path: String): ContinuousQuery = {
+   option("path", path).stream()
+ }
+
+ /**
+  * Starts the execution of the streaming query, which will continually output results to the
+  * configured sink as new data arrives. The returned [[ContinuousQuery]] object can be used to
+  * interact with the stream.
+  *
+  * @since 2.0.0
+  */
+ def stream(): ContinuousQuery = {
+   val sink = ResolvedDataSource.createSink(
+     df.sqlContext,
+     source,
+     extraOptions.toMap,
+     normalizedParCols.getOrElse(Nil))
+
+   new StreamExecution(df.sqlContext, df.logicalPlan, sink)
+ }
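
A sketch of how the two writer overloads might be used. The sink name `"events"` and the path are hypothetical, and `streamingDf` is assumed to be a streaming DataFrame obtained via `read...stream()`; `partitionBy` is shown because the normalized partition columns are forwarded to `ResolvedDataSource.createSink` (whether a given sink honors them is up to that sink).

```scala
// Hypothetical sink name; stream(path) is shorthand for option("path", path).stream().
val query: ContinuousQuery = streamingDf.write
  .format("events")
  .partitionBy("date")           // normalized partition columns are passed to createSink
  .stream("/data/out")

// The returned handle is backed by a StreamExecution; stop it when done.
query.stop()
```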

/**
* Inserts the content of the [[DataFrame]] to the specified table. It requires that
* the schema of the [[DataFrame]] is the same as the schema of the table.
@@ -255,7 +284,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {

/**
* The given column name may not be equal to any of the existing column names if we were in
- * case-insensitive context. Normalize the given column name to the real one so that we don't
+ * case-insensitive context. Normalize the given column name to the real one so that we don't
* need to care about case sensitivity afterwards.
*/
private def normalize(columnName: String, columnType: String): String = {
@@ -339,7 +368,6 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* @param connectionProperties JDBC database connection arguments, a list of arbitrary string
* tag/value. Normally at least a "user" and "password" property
* should be included.
- *
* @since 1.4.0
*/
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
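And for the JDBC writer documented above, a representative (placeholder) call:

```scala
import java.util.Properties

// Placeholder URL, table name, and credentials; resultDf is an assumed batch DataFrame.
val props = new Properties()
props.put("user", "spark")
props.put("password", "secret")

resultDf.write.jdbc("jdbc:postgresql://db.example.com:5432/mydb", "people_out", props)
```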
127 changes: 0 additions & 127 deletions sql/core/src/main/scala/org/apache/spark/sql/DataStreamReader.scala

This file was deleted.

