Skip to content

Commit

Permalink
[GLUTEN-1632][CH]Daily Update Clickhouse Version (20241129) (#8087)
Browse files Browse the repository at this point in the history
* [GLUTEN-1632][CH]Daily Update Clickhouse Version (20241129)

* Fix Build AND UT Due to [Added cache for primary index](ClickHouse/ClickHouse#72102)

* Fix Build and UT due to [no auto write buffer finalization in destructors](ClickHouse/ClickHouse#68800)

- Make LocalPartitionWriter::evictPartitions called, e.g. set(GlutenConfig.COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD.key, (1024*1024).toString)

* Fix Build due to [Save several minutes of build time](ClickHouse/ClickHouse#72046)

* Fix Benchmark Build due to [Scatter blocks in hash join without copying](ClickHouse/ClickHouse#67782)

(cherry picked from commit 8d566d6a8b8785e4072ffd6f774eb83b07ac3d8d)

* Fix Benchmark Build

* Fix endless loop due to ClickHouse/ClickHouse#70598

* [Refactor #8100] using CHConf.setCHConfig()

* fix style

---------

Co-authored-by: kyligence-git <gluten@kyligence.io>
Co-authored-by: Chang Chen <baibaichen@gmail.com>
  • Loading branch information
3 people authored Nov 30, 2024
1 parent 4cafdf1 commit b2706cb
Show file tree
Hide file tree
Showing 44 changed files with 151 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,15 @@ class GlutenClickHouseTPCHBucketSuite
override protected val queriesResults: String = rootPath + "bucket-queries-output"

override protected def sparkConf: SparkConf = {
import org.apache.gluten.backendsapi.clickhouse.CHConf._
super.sparkConf
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.io.compression.codec", "LZ4")
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "-1") // for test bucket join
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.enable_grace_aggregate_spill_test",
"true")
.setCHConfig("enable_grace_aggregate_spill_test", "true")
}

override protected val createNullableTables = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui

/** Run Gluten + ClickHouse Backend with SortShuffleManager */
override protected def sparkConf: SparkConf = {
import org.apache.gluten.backendsapi.clickhouse.CHConf._
super.sparkConf
.set("spark.shuffle.manager", "sort")
.set("spark.io.compression.codec", "snappy")
Expand All @@ -38,9 +39,7 @@ class GlutenClickHouseTPCDSParquetSuite extends GlutenClickHouseTPCDSAbstractSui
.set("spark.memory.offHeap.size", "4g")
.set("spark.gluten.sql.validation.logLevel", "ERROR")
.set("spark.gluten.sql.validation.printStackOnFailure", "true")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.enable_grace_aggregate_spill_test",
"true")
.setCHConfig("enable_grace_aggregate_spill_test", "true")
}

executeTPCDSTest(false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32")
.setCHConfig("enable_streaming_aggregating", true)
.set(GlutenConfig.COLUMNAR_CH_SHUFFLE_SPILL_THRESHOLD.key, (1024 * 1024).toString)
}

override protected def createTPCHNotNullTables(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,15 @@ class GlutenClickHouseTPCHParquetBucketSuite
protected val bucketTableDataPath: String = basePath + "/tpch-parquet-bucket"

override protected def sparkConf: SparkConf = {
import org.apache.gluten.backendsapi.clickhouse.CHConf._
super.sparkConf
.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
.set("spark.io.compression.codec", "LZ4")
.set("spark.sql.shuffle.partitions", "5")
.set("spark.sql.autoBroadcastJoinThreshold", "-1") // for test bucket join
.set("spark.sql.adaptive.enabled", "true")
.set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32")
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_config.enable_grace_aggregate_spill_test",
"true")
.setCHConfig("enable_grace_aggregate_spill_test", "true")
}

override protected val createNullableTables = true
Expand Down
4 changes: 2 additions & 2 deletions cpp-ch/clickhouse.version
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20241118
CH_COMMIT=a5944dfb7b3
CH_BRANCH=rebase_ch/20241129
CH_COMMIT=101ba3f944d1
26 changes: 22 additions & 4 deletions cpp-ch/local-engine/Common/CHUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@

namespace DB
{
namespace ServerSetting
{
extern const ServerSettingsString primary_index_cache_policy;
extern const ServerSettingsUInt64 primary_index_cache_size;
extern const ServerSettingsDouble primary_index_cache_size_ratio;
}
namespace Setting
{
extern const SettingsUInt64 prefer_external_sort_block_bytes;
Expand Down Expand Up @@ -712,11 +718,13 @@ void BackendInitializerUtil::initSettings(const SparkConfigs::ConfigMap & spark_
settings.set("input_format_parquet_enable_row_group_prefetch", false);
settings.set("output_format_parquet_use_custom_encoder", false);

/// update per https://github.com/ClickHouse/ClickHouse/pull/71539
/// Set false after https://github.com/ClickHouse/ClickHouse/pull/71539
/// if true, we can't get correct metrics for the query
settings[Setting::query_plan_merge_filters] = false;

/// We now set BuildQueryPipelineSettings according to config.
settings[Setting::compile_expressions] = true;
// TODO: FIXME. Set false after https://github.com/ClickHouse/ClickHouse/pull/70598.
settings[Setting::compile_expressions] = false;
settings[Setting::short_circuit_function_evaluation] = ShortCircuitFunctionEvaluation::DISABLE;
///

Expand Down Expand Up @@ -820,6 +828,10 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
/// Make sure global_context and shared_context are constructed only once.
if (auto global_context = QueryContext::globalMutableContext(); !global_context)
{
ServerSettings server_settings;
server_settings.loadSettingsFromConfig(*config);

auto log = getLogger("CHUtil");
global_context = QueryContext::createGlobal();
global_context->makeGlobalContext();
global_context->setConfig(config);
Expand All @@ -844,10 +856,16 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
size_t mark_cache_size = config->getUInt64("mark_cache_size", DEFAULT_MARK_CACHE_MAX_SIZE);
double mark_cache_size_ratio = config->getDouble("mark_cache_size_ratio", DEFAULT_MARK_CACHE_SIZE_RATIO);
if (!mark_cache_size)
LOG_ERROR(&Poco::Logger::get("CHUtil"), "Too low mark cache size will lead to severe performance degradation.");

LOG_ERROR(log, "Mark cache is disabled, it will lead to severe performance degradation.");
LOG_INFO(log, "mark cache size to {}.", formatReadableSizeWithBinarySuffix(mark_cache_size));
global_context->setMarkCache(mark_cache_policy, mark_cache_size, mark_cache_size_ratio);

String primary_index_cache_policy = server_settings[ServerSetting::primary_index_cache_policy];
size_t primary_index_cache_size = server_settings[ServerSetting::primary_index_cache_size];
double primary_index_cache_size_ratio = server_settings[ServerSetting::primary_index_cache_size_ratio];
LOG_INFO(log, "Primary index cache size to {}.", formatReadableSizeWithBinarySuffix(primary_index_cache_size));
global_context->setPrimaryIndexCache(primary_index_cache_policy, primary_index_cache_size, primary_index_cache_size_ratio);

String index_uncompressed_cache_policy
= config->getString("index_uncompressed_cache_policy", DEFAULT_INDEX_UNCOMPRESSED_CACHE_POLICY);
size_t index_uncompressed_cache_size
Expand Down
4 changes: 1 addition & 3 deletions cpp-ch/local-engine/Common/GlutenSignalHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ static void writeSignalIDtoSignalPipe(int sig)
char buf[signal_pipe_buf_size];
WriteBufferFromFileDescriptor out(writeFD(), signal_pipe_buf_size, buf);
writeBinary(sig, out);
out.next();
out.finalize();
errno = saved_errno;
}

Expand Down Expand Up @@ -251,9 +251,7 @@ class SignalListener : public Poco::Runnable
query = thread_ptr->getQueryForLog();

if (auto logs_queue = thread_ptr->getInternalTextLogsQueue())
{
CurrentThread::attachInternalTextLogsQueue(logs_queue, LogsLevel::trace);
}
}
std::string signal_description = "Unknown signal";

Expand Down
9 changes: 6 additions & 3 deletions cpp-ch/local-engine/Functions/FunctionGreatestLeast.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <Functions/LeastGreatestGeneric.h>
#include <DataTypes/getLeastSupertype.h>
#pragma once
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/getLeastSupertype.h>
#include <Functions/FunctionBinaryArithmetic.h>
#include <Functions/FunctionFactory.h>
#include <Functions/LeastGreatestGeneric.h>

namespace DB
{
Expand Down Expand Up @@ -64,7 +67,7 @@ class FunctionGreatestestLeast : public DB::FunctionLeastGreatestGeneric<kind>
else
{
auto cmp_result = converted_columns[arg]->compareAt(row_num, row_num, *converted_columns[best_arg], 1);
if (cmp_result < 0)
if (cmp_result < 0)
best_arg = arg;
}
}
Expand Down
13 changes: 8 additions & 5 deletions cpp-ch/local-engine/Functions/SparkFunctionArrayJoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <base/StringRef.h>
#include <Interpreters/Context_fwd.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnNullable.h>
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context_fwd.h>
#include <base/StringRef.h>

using namespace DB;

Expand Down
11 changes: 6 additions & 5 deletions cpp-ch/local-engine/Functions/SparkFunctionArraysOverlap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnNullable.h>
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>

using namespace DB;

Expand Down Expand Up @@ -92,7 +93,7 @@ class SparkFunctionArraysOverlap : public IFunction
{
res_data[i] = 1;
null_map_data[i] = 0;
break;
break;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsNumber.h>
#include <Core/DecimalFunctions.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Functions/FunctionFactory.h>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsNumber.h>
#include <Core/DecimalFunctions.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Functions/castTypeToEither.h>
#include <Common/CurrentThread.h>
#include <Common/GlutenDecimalUtils.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>

namespace DB
Expand Down
4 changes: 2 additions & 2 deletions cpp-ch/local-engine/Functions/SparkFunctionMakeDecimal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
* limitations under the License.
*/
#include <Columns/ColumnNullable.h>
#include <Core/DecimalFunctions.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include "SparkFunctionCheckDecimalOverflow.h"

#include <Functions/SparkFunctionCheckDecimalOverflow.h>

namespace DB
{
Expand Down
7 changes: 4 additions & 3 deletions cpp-ch/local-engine/Functions/SparkFunctionMapToString.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once
#include <memory>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnStringHelpers.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Formats/FormatFactory.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <IO/WriteHelpers.h>

namespace DB
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/FunctionTokens.h>
#include <Functions/IFunctionAdaptors.h>
#include <Functions/Regexps.h>
#include <Common/StringUtils.h>
#include <base/map.h>
#include <Common/assert_cast.h>

Expand Down
11 changes: 7 additions & 4 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <string>
#include <string_view>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Columns/ColumnConst.h>
#include <Core/Block.h>
#include <Core/ColumnWithTypeAndName.h>
#include <Core/Names.h>
Expand Down Expand Up @@ -160,15 +161,17 @@ void SerializedPlanParser::adjustOutput(const DB::QueryPlanPtr & query_plan, con
else
{
need_final_project = true;
bool need_const = origin_column.column && isColumnConst(*origin_column.column);
if (need_const)
if (origin_column.column && isColumnConst(*origin_column.column))
{
/// For const column, we need to cast it individually. Otherwise, the const column will be converted to full column in
/// ActionsDAG::makeConvertingActions.
/// Note: creating fianl_column with Field of origin_column will cause Exception in some case.
const DB::ContextPtr context = DB::CurrentThread::get().getQueryContext();
const FunctionOverloadResolverPtr & cast_resolver = FunctionFactory::instance().get("CAST", context);
const DataTypePtr string_type = std::make_shared<DataTypeString>();
ColumnWithTypeAndName to_type_column = {string_type->createColumnConst(1, final_type->getName()), string_type, "__cast_const__"};
FunctionBasePtr cast_function = cast_resolver->build({origin_column, to_type_column});
ColumnPtr const_col = ColumnConst::create(cast_function->execute({origin_column, to_type_column}, final_type, 1), 1);
ColumnPtr const_col = ColumnConst::create(cast_function->execute({origin_column, to_type_column}, final_type, 1, false), 1);
ColumnWithTypeAndName final_column(const_col, final_type, origin_column.name);
final_columns.emplace_back(std::move(final_column));
}
Expand Down Expand Up @@ -310,7 +313,7 @@ DB::QueryPipelineBuilderPtr SerializedPlanParser::buildQueryPipeline(DB::QueryPl
BuildQueryPipelineSettings build_settings = BuildQueryPipelineSettings::fromContext(context);
build_settings.process_list_element = query_status;
build_settings.progress_callback = nullptr;
return query_plan.buildQueryPipeline(optimization_settings,build_settings);
return query_plan.buildQueryPipeline(optimization_settings, build_settings);
}

std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(const std::string_view plan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNullable.h>
#include <Functions/FunctionsMiscellaneous.h>
#include <Parser/FunctionParser.h>
#include <Common/Exception.h>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
* limitations under the License.
*/
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNullable.h>
#include <Functions/FunctionsMiscellaneous.h>
#include <Parser/FunctionParser.h>
#include <Common/Exception.h>
#include <Common/assert_cast.h>

namespace DB
{
namespace ErrorCodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* limitations under the License.
*/
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNullable.h>
#include <Functions/FunctionsMiscellaneous.h>
#include <Parser/FunctionParser.h>
#include <Common/Exception.h>
Expand Down Expand Up @@ -56,7 +57,7 @@ class FunctionParserArrayRepeat : public FunctionParser
const auto * const_zero_node = addColumnToActionsDAG(actions_dag, n_not_null_arg->result_type, {0});
const auto * greatest_node = toFunctionNode(actions_dag, "greatest", {n_not_null_arg, const_zero_node});
const auto * range_node = toFunctionNode(actions_dag, "range", {greatest_node});
const auto & range_type = assert_cast<const DataTypeArray & >(*removeNullable(range_node->result_type));
const auto & range_type = assert_cast<const DataTypeArray &>(*removeNullable(range_node->result_type));

// Create lambda function x -> elem
ActionsDAG lambda_actions_dag;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
* limitations under the License.
*/

#include <Parser/FunctionParser.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/IDataType.h>
#include <Parser/FunctionParser.h>

namespace DB
{
Expand Down Expand Up @@ -57,7 +58,7 @@ class FunctionParserBitLength : public FunctionParser
const auto * const_eight_node = addColumnToActionsDAG(actions_dag, std::make_shared<DataTypeInt32>(), 8);
const auto * result_node = toFunctionNode(actions_dag, "multiply", {octet_length_node, const_eight_node});

return convertNodeTypeIfNeeded(substrait_func, result_node, actions_dag);;
return convertNodeTypeIfNeeded(substrait_func, result_node, actions_dag);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/

#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/IDataType.h>
#include <Functions/FunctionHelpers.h>
#include <Parser/FunctionParser.h>

namespace DB
Expand Down
Loading

0 comments on commit b2706cb

Please sign in to comment.