Support running Pandas UDFs on GPUs in Python processes. #640

Merged
merged 72 commits into from
Sep 11, 2020
Changes from 1 commit
Commits
72 commits
4063857
Support Pandas UDF on GPU
firestarman Jul 21, 2020
94f3b22
Fix an error when running rapids.worker.
firestarman Jul 21, 2020
96e35aa
Pack python files
firestarman Jul 22, 2020
dccf977
Add API to init GPU context in python process
firestarman Jul 27, 2020
ef36d5e
Support limiting the number of python workers
firestarman Jul 31, 2020
93966b4
Support memory limitation for Python processes
firestarman Aug 4, 2020
3b2e527
Improve the memory computation for Python workers
firestarman Aug 5, 2020
1e4895a
Support setting max size of RMM pool
firestarman Aug 6, 2020
c877506
Support more types of Pandas UDF
firestarman Aug 11, 2020
d0d15c2
Use maxsize for max pool size when not specified.
firestarman Aug 18, 2020
61b3589
Support two more types of Pandas UDF
firestarman Aug 27, 2020
65e497d
Add tests for udfs and basic support for accelerated arrow exchange with
revans2 Aug 12, 2020
ec2cec6
Support Pandas UDF on GPU
firestarman Jul 21, 2020
d18ff4e
Fix an error when running rapids.worker.
firestarman Jul 21, 2020
616d1da
Pack python files
firestarman Jul 22, 2020
c5557ca
Add API to init GPU context in python process
firestarman Jul 27, 2020
2fcc2aa
Support limiting the number of python workers
firestarman Jul 31, 2020
31a9fb3
Support memory limitation for Python processes
firestarman Aug 4, 2020
f71f4de
Improve the memory computation for Python workers
firestarman Aug 5, 2020
d4791af
Support setting max size of RMM pool
firestarman Aug 6, 2020
4b88939
Support more types of Pandas UDF
firestarman Aug 11, 2020
d5d156e
Use maxsize for max pool size when not specified.
firestarman Aug 18, 2020
37dc856
Support two more types of Pandas UDF
firestarman Aug 27, 2020
29e39ea
Use the columnar version rule for Scalar Pandas UDF
firestarman Sep 1, 2020
1033f23
Updates the RapidsMeta of plans for Pandas UDF
firestarman Sep 2, 2020
5e28772
Remove the unnecessary env variable
firestarman Sep 2, 2020
3f94ac8
Correct some doc styles to pass mvn verification
firestarman Sep 3, 2020
6f24ca2
add udf test
shotai Sep 3, 2020
5ea3ab6
Merge branch 'pandas-udf' of https://github.com/firestarman/spark-rap…
shotai Sep 3, 2020
69d5b54
Support process pool for Python workers
firestarman Sep 3, 2020
9b25c2e
merge udftest
shotai Sep 3, 2020
c8ab681
add cudf test
shotai Sep 3, 2020
b81fe94
Add a config to disable/enable Pandas UDF on GPU.
firestarman Sep 4, 2020
908bb93
add more test case with cudf
shotai Sep 4, 2020
963c821
refactor udf test
shotai Sep 4, 2020
868ca3a
Python: Not init GPU if no cuda device specified
firestarman Sep 4, 2020
92c10a6
resolve conflict
shotai Sep 4, 2020
e05e4b6
resolve conflict
shotai Sep 4, 2020
69b1ec0
Update the config doc
firestarman Sep 7, 2020
1f56cd9
skip udf in premerge
shotai Sep 7, 2020
e400f5a
add pyarrow in docker
shotai Sep 7, 2020
6435eaa
disable udf test in premerge
shotai Sep 7, 2020
c33b41c
Merge pull request #4 from firestarman/pandas-test-mg
firestarman Sep 8, 2020
53959db
Merge branch 'branch-0.2' into pandas-udf-col
firestarman Sep 8, 2020
732505b
Move gpu init to `try...catch`
firestarman Sep 8, 2020
5a074ac
Remove numpy, it will include in pandas installation. Update readme.
shotai Sep 8, 2020
b80a201
update doc with pandas udf support
shotai Sep 8, 2020
7514ac1
update integration dockerfile
shotai Sep 8, 2020
db06504
Merge branch 'pandas-udf' of https://github.com/firestarman/spark-rap…
shotai Sep 8, 2020
7817e5e
Update getting-started-on-prem.md
shotai Sep 8, 2020
0f72025
Update getting-started-on-prem.md
shotai Sep 8, 2020
35a3008
Update getting-started-on-prem.md
shotai Sep 8, 2020
ec7848e
Update getting-started-on-prem.md
shotai Sep 8, 2020
be65f58
Add warning log when python worker reuse enabled
firestarman Sep 8, 2020
1757afa
Replace GpuSemaphore with PythonWorkerSemaphore
firestarman Sep 8, 2020
537e594
Remove the warning log for python worker reuse enabled
firestarman Sep 9, 2020
60cf951
remove udf marker, add comment, update jenkins script for udf_cudf test
shotai Sep 9, 2020
6c9e86b
update doc in pandas udf section
shotai Sep 9, 2020
58fab52
update dockerfile for integration test
shotai Sep 9, 2020
283b6a2
Merge branch 'pandas-udf' of https://github.com/firestarman/spark-rap…
shotai Sep 9, 2020
eee4d05
Update the name of conf for python gpu enabled.
firestarman Sep 10, 2020
8924ad8
add marker for cudf udf test
shotai Sep 10, 2020
7eba830
update comment in test start script
shotai Sep 10, 2020
f838ae0
remove old config
shotai Sep 10, 2020
803fcf4
Not init gpu memory when python on gpu is disabled
firestarman Sep 10, 2020
984082b
remove old config
shotai Sep 10, 2020
127ab08
Merge branch 'pandas-udf' of https://github.com/firestarman/spark-rap…
shotai Sep 10, 2020
6156298
import cudf lib normally
shotai Sep 10, 2020
beabf8b
update import cudf
shotai Sep 10, 2020
47ffc98
Check python module conf only when python gpu enabled
firestarman Sep 10, 2020
b1c9be5
update dynamic config for udf enable
shotai Sep 10, 2020
9860ee6
Merge branch 'pandas-udf' of https://github.com/firestarman/spark-rap…
shotai Sep 10, 2020
Correct some doc styles to pass mvn verification
Also include the Python configs in config doc generation, to update the file `configs.md`.
firestarman committed Sep 3, 2020
commit 3f94ac8b608e311c181892fc72756d894627037f
12 changes: 11 additions & 1 deletion docs/configs.md
Expand Up @@ -37,6 +37,11 @@ Name | Description | Default Value
<a name="memory.host.spillStorageSize"></a>spark.rapids.memory.host.spillStorageSize|Amount of off-heap host memory to use for buffering spilled GPU data before spilling to local disk|1073741824
<a name="memory.pinnedPool.size"></a>spark.rapids.memory.pinnedPool.size|The size of the pinned memory pool in bytes unless otherwise specified. Use 0 to disable the pool.|0
<a name="memory.uvm.enabled"></a>spark.rapids.memory.uvm.enabled|UVM or universal memory can allow main host memory to act essentially as swap for device(GPU) memory. This allows the GPU to process more data than fits in memory, but can result in slower processing. This is an experimental feature.|false
<a name="python.concurrentPythonWorkers"></a>spark.rapids.python.concurrentPythonWorkers|Set the number of Python worker processes that can execute concurrently per GPU. Python worker processes may temporarily block when the number of concurrent Python worker processes started by the same executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors. >0 means enabled, while <=0 means unlimited|0
<a name="python.memory.gpu.allocFraction"></a>spark.rapids.python.memory.gpu.allocFraction|The fraction of total GPU memory that should be initially allocated for pooled memory for all the Python workers. It supposes to be less than (1 - $(spark.rapids.memory.gpu.allocFraction)), since the executor will share the GPU with its owning Python workers.|None
<a name="python.memory.gpu.maxAllocFraction"></a>spark.rapids.python.memory.gpu.maxAllocFraction|The fraction of total GPU memory that limits the maximum size of the RMM pool for all the Python workers. It supposes to be less than (1 - $(spark.rapids.memory.gpu.maxAllocFraction)), since the executor will share the GPU with its owning Python workers. when setting to 0 means no limit.|0.0
<a name="python.memory.gpu.pooling.enabled"></a>spark.rapids.python.memory.gpu.pooling.enabled|Should RMM in Python workers act as a pooling allocator for GPU memory, or should it just pass through to CUDA memory allocation directly.|None
<a name="python.memory.uvm.enabled"></a>spark.rapids.python.memory.uvm.enabled|Similar with `spark.rapids.python.memory.uvm.enabled`, but this conf is for python workers. This is an experimental feature.|None
<a name="shuffle.transport.enabled"></a>spark.rapids.shuffle.transport.enabled|When set to true, enable the Rapids Shuffle Transport for accelerated shuffle.|false
<a name="shuffle.transport.maxReceiveInflightBytes"></a>spark.rapids.shuffle.transport.maxReceiveInflightBytes|Maximum aggregate amount of bytes that be fetched at any given time from peers during shuffle|1073741824
<a name="shuffle.ucx.managementServerHost"></a>spark.rapids.shuffle.ucx.managementServerHost|The host to be used to start the management server|null
Expand Down Expand Up @@ -252,7 +257,12 @@ Name | Description | Default Value | Notes
<a name="sql.exec.CartesianProductExec"></a>spark.rapids.sql.exec.CartesianProductExec|Implementation of join using brute force|false|This is disabled by default because large joins can cause out of memory errors|
<a name="sql.exec.ShuffledHashJoinExec"></a>spark.rapids.sql.exec.ShuffledHashJoinExec|Implementation of join using hashed shuffled data|true|None|
<a name="sql.exec.SortMergeJoinExec"></a>spark.rapids.sql.exec.SortMergeJoinExec|Sort merge join, replacing with shuffled hash join|true|None|
<a name="sql.exec.ArrowEvalPythonExec"></a>spark.rapids.sql.exec.ArrowEvalPythonExec|Runs python UDFs. The python code does not actually run on the GPU, but the transfer of data between the python process and the java process is accelerated.|false|This is disabled by default because Performance is not ideal for UDFs that take a long time.|
<a name="sql.exec.AggregateInPandasExec"></a>spark.rapids.sql.exec.AggregateInPandasExec|The backend for Grouped Aggregation Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.ArrowEvalPythonExec"></a>spark.rapids.sql.exec.ArrowEvalPythonExec|The backend of the Scalar Pandas UDFs, it supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF, also accelerates the data transfer between the Java process and Python process|false|This is disabled by default because Performance is not ideal for UDFs that take a long time|
<a name="sql.exec.FlatMapCoGroupsInPandasExec"></a>spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec|The backend for CoGrouped Aggregation Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.FlatMapGroupsInPandasExec"></a>spark.rapids.sql.exec.FlatMapGroupsInPandasExec|The backend for Grouped Map Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.MapInPandasExec"></a>spark.rapids.sql.exec.MapInPandasExec|The backend for Map Pandas Iterator UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.WindowInPandasExec"></a>spark.rapids.sql.exec.WindowInPandasExec|The backend for Pandas UDF with window functions, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now|
<a name="sql.exec.WindowExec"></a>spark.rapids.sql.exec.WindowExec|Window-operator backend|true|None|

### Scans
Expand Down
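The new `spark.rapids.python.*` entries above are the knobs an application sets to size and limit the GPU memory pool shared with Python workers. The sketch below shows how they might be combined in PySpark; the fractions, the worker count, and the executor-side `spark.rapids.memory.gpu.allocFraction` value are illustrative assumptions, not values taken from this PR.

```python
# Hypothetical sizing sketch: executor pool plus a separate pool for Python workers.
# Config keys come from the table above; every value here is an illustrative assumption.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("pandas-udf-on-gpu-sketch")
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    # Executor-side pool takes half the GPU ...
    .config("spark.rapids.memory.gpu.allocFraction", "0.5")
    # ... leaving headroom for the Python workers' pool, which must stay below
    # (1 - spark.rapids.memory.gpu.allocFraction).
    .config("spark.rapids.python.memory.gpu.allocFraction", "0.3")
    .config("spark.rapids.python.memory.gpu.maxAllocFraction", "0.4")
    .config("spark.rapids.python.memory.gpu.pooling.enabled", "true")
    # Allow at most two concurrent Python workers per GPU (>0 enables the limit).
    .config("spark.rapids.python.concurrentPythonWorkers", "2")
    .getOrCreate()
)
```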
3 changes: 1 addition & 2 deletions python/rapids/worker.py
Expand Up @@ -51,9 +51,8 @@ def initialize_gpu_mem():
base_t = rmm.mr.ManagedMemoryResource if uvm_enabled else rmm.mr.CudaMemoryResource
rmm.mr.set_current_device_resource(rmm.mr.PoolMemoryResource(base_t(), pool_size, pool_max_size))
elif uvm_enabled:
# Will this really be needed for Python ?
from cudf import rmm
rmm.mr.set_default_resource(rmm.mr.ManagedMemoryResource())
rmm.mr.set_current_device_resource(rmm.mr.ManagedMemoryResource())
else:
# Do nothing, whether to use RMM (default mode) or not depends on UDF definition.
pass
Expand Down
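For context on what `initialize_gpu_mem` is configuring, here is a self-contained sketch of the same RMM calls this hunk touches. The `pool_enabled`, `uvm_enabled`, `pool_size`, and `pool_max_size` parameters are hypothetical stand-ins for the values the worker reads from its Spark configuration.

```python
# Standalone sketch mirroring the RMM setup in python/rapids/worker.py.
import rmm

def init_rmm(pool_enabled, uvm_enabled, pool_size, pool_max_size):
    if pool_enabled:
        # Managed (UVM) or plain CUDA memory as the upstream resource...
        base_t = rmm.mr.ManagedMemoryResource if uvm_enabled else rmm.mr.CudaMemoryResource
        # ...wrapped in a pool so UDF allocations come from pre-reserved GPU memory.
        rmm.mr.set_current_device_resource(
            rmm.mr.PoolMemoryResource(base_t(), pool_size, pool_max_size))
    elif uvm_enabled:
        # UVM only, no pooling.
        rmm.mr.set_current_device_resource(rmm.mr.ManagedMemoryResource())
    # Otherwise leave RMM in its default mode; whether to use it is up to the UDF.
```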
Expand Up @@ -1672,8 +1672,9 @@ object GpuOverrides {
GpuLocalLimitExec(localLimitExec.limit, childPlans(0).convertIfNeeded())
}),
exec[ArrowEvalPythonExec](
"Runs python UDFs. The python code does not actually run on the GPU, but the " +
"transfer of data between the python process and the java process is accelerated.",
"The backend of the Scalar Pandas UDFs, it supports running the Python UDFs code on GPU" +
" when calling cuDF APIs in the UDF, also accelerates the data transfer between the" +
" Java process and Python process",
(e, conf, p, r) =>
new SparkPlanMeta[ArrowEvalPythonExec](e, conf, p, r) {
val udfs: Seq[BaseExprMeta[PythonUDF]] =
Expand Down Expand Up @@ -1772,23 +1773,28 @@ object GpuOverrides {
}
}),
exec[MapInPandasExec](
"The backend for Map Pandas Iterator UDF",
"The backend for Map Pandas Iterator UDF, it runs on CPU itself now but supports running" +
" the Python UDFs code on GPU when calling cuDF APIs in the UDF",
(mapPy, conf, p, r) => new GpuMapInPandasExecMeta(mapPy, conf, p, r))
.disabledByDefault("Performance is not ideal now"),
exec[FlatMapGroupsInPandasExec](
"The backend for Grouped Map Pandas UDF",
"The backend for Grouped Map Pandas UDF, it runs on CPU itself now but supports running" +
" the Python UDFs code on GPU when calling cuDF APIs in the UDF",
(flatPy, conf, p, r) => new GpuFlatMapGroupsInPandasExecMeta(flatPy, conf, p, r))
.disabledByDefault("Performance is not ideal now"),
exec[AggregateInPandasExec](
"The backend for Grouped Aggregation Pandas UDF",
"The backend for Grouped Aggregation Pandas UDF, it runs on CPU itself now but supports" +
" running the Python UDFs code on GPU when calling cuDF APIs in the UDF",
(aggPy, conf, p, r) => new GpuAggregateInPandasExecMeta(aggPy, conf, p, r))
.disabledByDefault("Performance is not ideal now"),
exec[FlatMapCoGroupsInPandasExec](
"The backend for CoGrouped Aggregation Pandas UDF",
"The backend for CoGrouped Aggregation Pandas UDF, it runs on CPU itself now but supports" +
" running the Python UDFs code on GPU when calling cuDF APIs in the UDF",
(flatCoPy, conf, p, r) => new GpuFlatMapCoGroupsInPandasExecMeta(flatCoPy, conf, p, r))
.disabledByDefault("Performance is not ideal now"),
exec[WindowInPandasExec](
"The backend for Pandas UDF with window functions",
"The backend for Pandas UDF with window functions, it runs on CPU itself now but supports" +
" running the Python UDFs code on GPU when calling cuDF APIs in the UDF",
(winPy, conf, p, r) => new GpuWindowInPandasExecMeta(winPy, conf, p, r))
.disabledByDefault("Performance is not ideal now")
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
Expand Down
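The descriptions above all hinge on "running the Python UDF code on GPU when calling cuDF APIs in the UDF". As a rough user-side illustration of what such a Scalar Pandas UDF could look like (the function and column names are made up, not part of this PR):

```python
# Illustrative Scalar Pandas UDF that does its work through cuDF, so the Python
# side of the computation also runs on the GPU. Names are hypothetical.
import pandas as pd
import cudf
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType())
def gpu_times_two(values: pd.Series) -> pd.Series:
    gpu_series = cudf.Series(values)   # move the batch to the GPU
    result = gpu_series * 2.0          # computed by cuDF on the GPU
    return result.to_pandas()          # hand a pandas Series back to Spark

# df.select(gpu_times_two("value")) is planned as ArrowEvalPythonExec, which this
# PR can replace with the GPU-accelerated version described above.
```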
Expand Up @@ -781,6 +781,8 @@ object RapidsConf {
}
}
def main(args: Array[String]): Unit = {
// Include the configs in PythonConfEntries
com.nvidia.spark.rapids.python.PythonConfEntries.init()
val out = new FileOutputStream(new File(args(0)))
Console.withOut(out) {
Console.withErr(out) {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,4 +59,8 @@ object PythonConfEntries {
"python workers. This is an experimental feature.")
.booleanConf
.createOptional

// An empty function called by RapidsConf to initialize the config definitions above for
// doc generation
def init(): Unit = {}
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,19 +22,19 @@ import com.nvidia.spark.rapids.python.PythonConfEntries.CONCURRENT_PYTHON_WORKER
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.internal.Logging

/**
* PythonWorkerSemaphore is used to limit the number of Python workers(processes) to be started
* by an executor.
*
* This PythonWorkerSemaphore will not initialize the GPU, different from GpuSemaphore. Since
* tasks calling the API `acquireIfNecessary` are supposed not to use the GPU directly, but
* delegate the permits to the Python workers respectively.
*
* Call `acquireIfNecessary` or `releaseIfNecessary` directly when needed, since the inner
* semaphore will be initialized implicitly, but need to call `shutdown` explicitly to release
* the inner semaphore when no longer needed.
*
*/
/*
* PythonWorkerSemaphore is used to limit the number of Python workers(processes) to be started
* by an executor.
*
* This PythonWorkerSemaphore will not initialize the GPU, different from GpuSemaphore. Since
* tasks calling the API `acquireIfNecessary` are supposed not to use the GPU directly, but
* delegate the permits to the Python workers respectively.
*
* Call `acquireIfNecessary` or `releaseIfNecessary` directly when needed, since the inner
* semaphore will be initialized implicitly, but need to call `shutdown` explicitly to release
* the inner semaphore when no longer needed.
*
*/
object PythonWorkerSemaphore extends Logging {

private lazy val workersPerGpu = new RapidsConf(SparkEnv.get.conf)
Expand All @@ -57,32 +57,32 @@ object PythonWorkerSemaphore extends Logging {
instance
}

/**
* Tasks must call this when they begin to start a Python worker who will use GPU.
* If the task has not already acquired the GPU semaphore then it is acquired,
* blocking if necessary.
* NOTE: A task completion listener will automatically be installed to ensure
* the semaphore is always released by the time the task completes.
*/
/*
* Tasks must call this when they begin to start a Python worker who will use GPU.
* If the task has not already acquired the GPU semaphore then it is acquired,
* blocking if necessary.
* NOTE: A task completion listener will automatically be installed to ensure
* the semaphore is always released by the time the task completes.
*/
def acquireIfNecessary(context: TaskContext): Unit = {
if (enabled && context != null) {
getInstance.acquireIfNecessary(context)
}
}

/**
* Tasks must call this when they are finished using the GPU.
*/
/*
* Tasks must call this when they are finished using the GPU.
*/
def releaseIfNecessary(context: TaskContext): Unit = {
if (enabled && context != null) {
getInstance.releaseIfNecessary(context)
}
}

/**
* Release the inner semaphore.
* NOTE: This does not wait for active tasks to release!
*/
/*
* Release the inner semaphore.
* NOTE: This does not wait for active tasks to release!
*/
def shutdown(): Unit = synchronized {
if (instance != null) {
instance.shutdown()
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,7 +23,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.python.PandasGroupUtils
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
/*
* This is to expose the APIs of PandasGroupUtils to rapids Execs
*/
private[sql] object GpuPandasUtils {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,7 +20,6 @@ import java.io.File

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.python.PythonWorkerSemaphore

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SparkEnv, TaskContext}
Expand Down Expand Up @@ -61,7 +60,7 @@ class GpuAggregateInPandasExecMeta(
)
}

/**
/*
* This GpuAggregateInPandasExec aims at supporting running Pandas UDF code
* on GPU at Python side.
*
Expand All @@ -77,8 +76,7 @@ case class GpuAggregateInPandasExec(

override def supportsColumnar = false
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
// TBD
super.doExecuteColumnar()
throw new IllegalStateException(s"Columnar execution is not supported by $this yet")
}

// Most code is copied from AggregateInPandasExec, except two GPU related calls
Expand Down
@@ -1,4 +1,6 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
Expand All @@ -22,26 +24,27 @@ import java.net.Socket
import java.util.concurrent.atomic.AtomicBoolean

import ai.rapids.cudf._
import com.nvidia.spark.rapids.GpuMetricNames._
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuMetricNames._
import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.api.python._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.toPrettySQL
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.python.PythonUDFRunner
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructField, StructType}
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils
import org.apache.spark.{SparkEnv, TaskContext}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

class RebatchingIterator(
wrapped: Iterator[ColumnarBatch],
Expand Down Expand Up @@ -131,7 +134,7 @@ class BatchQueue extends AutoCloseable {
}
}

/**
/*
* Helper functions for [[GpuPythonUDF]]
*/
object GpuPythonUDF {
Expand All @@ -155,7 +158,7 @@ object GpuPythonUDF {
def isWindowPandasUDF(e: Expression): Boolean = isGroupedAggPandasUDF(e)
}

/**
/*
* A serialized version of a Python lambda function. This is a special expression, which needs a
* dedicated physical operator to execute it, and thus can't be pushed down to data sources.
*/
Expand Down Expand Up @@ -185,7 +188,7 @@ case class GpuPythonUDF(
}
}

/**
/*
* A trait that can be mixed-in with `BasePythonRunner`. It implements the logic from
* Python (Arrow) to GPU/JVM (ColumnarBatch).
*/
Expand Down Expand Up @@ -256,7 +259,7 @@ trait GpuPythonArrowOutput extends Arm { self: BasePythonRunner[_, ColumnarBatch
}


/**
/*
* Similar to `PythonUDFRunner`, but exchange data with Python worker via Arrow stream.
*/
class GpuArrowPythonRunner(
Expand Down Expand Up @@ -366,7 +369,7 @@ class StreamToBufferProvider(inputStream: DataInputStream) extends HostBufferPro
}
}

/**
/*
* A physical plan that evaluates a [[GpuPythonUDF]]. The transformation of the data to arrow
* happens on the GPU (practically a noop), But execution of the UDFs are on the CPU or GPU.
*/
Expand Down
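The `GpuArrowPythonRunner` and `GpuPythonArrowOutput` pieces in this file exchange batches with the Python worker over an Arrow IPC stream. As a rough, self-contained illustration of that wire format only (not the plugin's actual socket protocol), pyarrow can round-trip a batch like this, with an in-memory buffer standing in for the worker socket:

```python
# Rough illustration of the Arrow IPC stream format used between the JVM and
# the Python worker; io.BytesIO stands in for the real socket streams.
import io
import pyarrow as pa

batch = pa.RecordBatch.from_pydict({"id": [1, 2, 3], "value": [0.1, 0.2, 0.3]})

sink = io.BytesIO()
with pa.ipc.new_stream(sink, batch.schema) as writer:    # "JVM" side: serialize
    writer.write_batch(batch)

reader = pa.ipc.open_stream(io.BytesIO(sink.getvalue()))  # worker side: deserialize
for received in reader:
    print(received.num_rows, received.schema.names)
```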
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -58,7 +58,7 @@ class GpuFlatMapCoGroupsInPandasExecMeta(
)
}

/**
/*
*
* This GpuFlatMapCoGroupsInPandasExec aims at supporting running Pandas functional code
* on GPU at Python side.
Expand All @@ -77,8 +77,7 @@ case class GpuFlatMapCoGroupsInPandasExec(

override def supportsColumnar = false
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
// TBD
super.doExecuteColumnar()
throw new IllegalStateException(s"Columnar execution is not supported by $this yet")
}

// Most code is copied from FlatMapCoGroupsInPandasExec, except two GPU related calls
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -57,7 +57,7 @@ class GpuFlatMapGroupsInPandasExecMeta(
)
}

/**
/*
*
* This GpuFlatMapGroupsInPandasExec aims at supporting running Pandas functional code
* on GPU at Python side.
Expand All @@ -74,8 +74,7 @@ case class GpuFlatMapGroupsInPandasExec(

override def supportsColumnar = false
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
// TBD
super.doExecuteColumnar()
throw new IllegalStateException(s"Columnar execution is not supported by $this yet")
}

// Most code is copied from FlatMapGroupsInPandasExec, except two GPU related calls
Expand Down