[SPARK-10185] [SQL] Feat sql comma separated paths
Make sure comma-separated paths get processed correctly in ResolvedDataSource for a HadoopFsRelationProvider.

Author: Koert Kuipers <koert@tresata.com>

Closes apache#8416 from koertkuipers/feat-sql-comma-separated-paths.
koertkuipers authored and davies committed Oct 17, 2015
1 parent 2549374 commit 57f83e3
Showing 5 changed files with 81 additions and 11 deletions.
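
In short: DataFrameReader gains a load(paths: Array[String]) overload, the Python reader accepts a list of paths, and ResolvedDataSource splits the resulting comma-separated "paths" option while leaving escaped commas inside an individual path intact. A minimal usage sketch of the new API, assuming a Spark 1.6-era sqlContext is already in scope (e.g. in spark-shell); the JSON directories below are made-up placeholders:

    // Read two hypothetical JSON directories into a single DataFrame with one load call.
    val df = sqlContext.read
      .format("json")
      .load(Array("/data/events/2015-10-01", "/data/events/2015-10-02"))
    df.printSchema()

    // A directory whose name contains a literal comma still resolves as one path,
    // because load(paths) escapes commas before joining the entries.
    val single = sqlContext.read.format("json").load(Array("/data/dir,1"))
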
14 changes: 13 additions & 1 deletion python/pyspark/sql/readwriter.py
@@ -116,14 +116,26 @@ def load(self, path=None, format=None, schema=None, **options):
... opt2=1, opt3='str')
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
>>> df = sqlContext.read.format('json').load(['python/test_support/sql/people.json',
... 'python/test_support/sql/people1.json'])
>>> df.dtypes
[('age', 'bigint'), ('aka', 'string'), ('name', 'string')]
"""
if format is not None:
self.format(format)
if schema is not None:
self.schema(schema)
self.options(**options)
if path is not None:
return self._df(self._jreader.load(path))
if type(path) == list:
paths = path
gateway = self._sqlContext._sc._gateway
jpaths = gateway.new_array(gateway.jvm.java.lang.String, len(paths))
for i in range(0, len(paths)):
jpaths[i] = paths[i]
return self._df(self._jreader.load(jpaths))
else:
return self._df(self._jreader.load(path))
else:
return self._df(self._jreader.load())

2 changes: 2 additions & 0 deletions python/test_support/sql/people1.json
@@ -0,0 +1,2 @@
{"name":"Jonathan", "aka": "John"}

11 changes: 11 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -22,6 +22,7 @@ import java.util.Properties
import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.apache.hadoop.util.StringUtils

import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
@@ -123,6 +124,16 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
DataFrame(sqlContext, LogicalRelation(resolved.relation))
}

/**
* Loads input in as a [[DataFrame]], for data sources that support multiple paths.
* Only works if the source is a HadoopFsRelationProvider.
*
* @since 1.6.0
*/
def load(paths: Array[String]): DataFrame = {
option("paths", paths.map(StringUtils.escapeString(_, '\\', ',')).mkString(",")).load()
}

/**
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table and connection properties.
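
To make the comma handling concrete, here is a small self-contained sketch (not part of the commit) of the escape/unescape round trip that the new load(paths) overload and ResolvedDataSource perform; the paths are invented:

    import org.apache.hadoop.util.StringUtils

    object CommaEscapingSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical input paths; the first contains a literal comma.
        val paths = Array("/data/dir,1", "/data/dir2")

        // What load(paths) stores in the "paths" option: escape commas, then join.
        val joined = paths.map(StringUtils.escapeString(_, '\\', ',')).mkString(",")
        println(joined) // /data/dir\,1,/data/dir2

        // What ResolvedDataSource does later: split on unescaped commas, then unescape.
        val restored = joined.split("(?<!\\\\),").map(StringUtils.unEscapeString(_, '\\', ','))
        println(restored.mkString(" | ")) // /data/dir,1 | /data/dir2
      }
    }
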
47 changes: 37 additions & 10 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -24,6 +24,7 @@ import scala.language.{existentials, implicitConversions}
import scala.util.{Success, Failure, Try}

import org.apache.hadoop.fs.Path
import org.apache.hadoop.util.StringUtils

import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
@@ -89,7 +90,11 @@ object ResolvedDataSource extends Logging {
val relation = userSpecifiedSchema match {
case Some(schema: StructType) => clazz.newInstance() match {
case dataSource: SchemaRelationProvider =>
dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
if (caseInsensitiveOptions.contains("paths")) {
throw new AnalysisException(s"$className does not support paths option.")
}
dataSource.createRelation(sqlContext, caseInsensitiveOptions, schema)
case dataSource: HadoopFsRelationProvider =>
val maybePartitionsSchema = if (partitionColumns.isEmpty) {
None
@@ -99,10 +104,19 @@ object ResolvedDataSource extends Logging {

val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val paths = {
val patternPath = new Path(caseInsensitiveOptions("path"))
val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
if (caseInsensitiveOptions.contains("paths") &&
caseInsensitiveOptions.contains("path")) {
throw new AnalysisException(s"Both path and paths options are present.")
}
caseInsensitiveOptions.get("paths")
.map(_.split("(?<!\\\\),").map(StringUtils.unEscapeString(_, '\\', ',')))
.getOrElse(Array(caseInsensitiveOptions("path")))
.flatMap{ pathString =>
val hdfsPath = new Path(pathString)
val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualified).map(_.toString)
}
}

val dataSchema =
@@ -122,14 +136,27 @@ object ResolvedDataSource extends Logging {

case None => clazz.newInstance() match {
case dataSource: RelationProvider =>
dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
if (caseInsensitiveOptions.contains("paths")) {
throw new AnalysisException(s"$className does not support paths option.")
}
dataSource.createRelation(sqlContext, caseInsensitiveOptions)
case dataSource: HadoopFsRelationProvider =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val paths = {
val patternPath = new Path(caseInsensitiveOptions("path"))
val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
if (caseInsensitiveOptions.contains("paths") &&
caseInsensitiveOptions.contains("path")) {
throw new AnalysisException(s"Both path and paths options are present.")
}
caseInsensitiveOptions.get("paths")
.map(_.split("(?<!\\\\),").map(StringUtils.unEscapeString(_, '\\', ',')))
.getOrElse(Array(caseInsensitiveOptions("path")))
.flatMap{ pathString =>
val hdfsPath = new Path(pathString)
val fs = hdfsPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualified).map(_.toString)
}
}
dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
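
One behavioral consequence of the checks above, sketched rather than taken from the commit (assumes the same sqlContext and made-up file paths): setting both the path and paths options on a HadoopFsRelationProvider now fails analysis instead of silently preferring one of them, and sources that are not HadoopFsRelationProviders reject the paths option outright.

    import org.apache.spark.sql.AnalysisException

    // "path" is set explicitly and "paths" is set implicitly by load(paths),
    // so this should fail with "Both path and paths options are present."
    try {
      sqlContext.read
        .format("json")
        .option("path", "/data/people.json")   // hypothetical path
        .load(Array("/data/people1.json"))     // hypothetical path
    } catch {
      case e: AnalysisException => println(e.getMessage)
    }
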
18 changes: 18 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -890,6 +890,24 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
.collect()
}

test("SPARK-10185: Read multiple Hadoop Filesystem paths and paths with a comma in it") {
withTempDir { dir =>
val df1 = Seq((1, 22)).toDF("a", "b")
val dir1 = new File(dir, "dir,1").getCanonicalPath
df1.write.format("json").save(dir1)

val df2 = Seq((2, 23)).toDF("a", "b")
val dir2 = new File(dir, "dir2").getCanonicalPath
df2.write.format("json").save(dir2)

checkAnswer(sqlContext.read.format("json").load(Array(dir1, dir2)),
Row(1, 22) :: Row(2, 23) :: Nil)

checkAnswer(sqlContext.read.format("json").load(dir1),
Row(1, 22) :: Nil)
}
}

test("SPARK-10034: Sort on Aggregate with aggregation expression named 'aggOrdering'") {
val df = Seq(1 -> 2).toDF("i", "j")
val query = df.groupBy('i)
