diff --git a/src/delta_extension.cpp b/src/delta_extension.cpp index 1a316d9..9fc95d0 100644 --- a/src/delta_extension.cpp +++ b/src/delta_extension.cpp @@ -1,8 +1,8 @@ #define DUCKDB_EXTENSION_MAIN #include "delta_extension.hpp" -#include "delta_functions.hpp" +#include "delta_functions.hpp" #include "duckdb.hpp" #include "duckdb/common/exception.hpp" #include "duckdb/main/extension_util.hpp" @@ -10,18 +10,18 @@ namespace duckdb { static void LoadInternal(DatabaseInstance &instance) { - // Load functions - for (const auto &function : DeltaFunctions::GetTableFunctions(instance)) { - ExtensionUtil::RegisterFunction(instance, function); - } + // Load functions + for (const auto &function : DeltaFunctions::GetTableFunctions(instance)) { + ExtensionUtil::RegisterFunction(instance, function); + } } void DeltaExtension::Load(DuckDB &db) { - LoadInternal(*db.instance); + LoadInternal(*db.instance); } std::string DeltaExtension::Name() { - return "delta"; + return "delta"; } } // namespace duckdb @@ -29,12 +29,12 @@ std::string DeltaExtension::Name() { extern "C" { DUCKDB_EXTENSION_API void delta_init(duckdb::DatabaseInstance &db) { - duckdb::DuckDB db_wrapper(db); - db_wrapper.LoadExtension(); + duckdb::DuckDB db_wrapper(db); + db_wrapper.LoadExtension(); } DUCKDB_EXTENSION_API const char *delta_version() { - return duckdb::DuckDB::LibraryVersion(); + return duckdb::DuckDB::LibraryVersion(); } } diff --git a/src/delta_functions.cpp b/src/delta_functions.cpp index f10602a..e79894b 100644 --- a/src/delta_functions.cpp +++ b/src/delta_functions.cpp @@ -2,16 +2,17 @@ #include "duckdb.hpp" #include "duckdb/main/extension_util.hpp" + #include namespace duckdb { vector DeltaFunctions::GetTableFunctions(DatabaseInstance &instance) { - vector functions; + vector functions; - functions.push_back(GetDeltaScanFunction(instance)); + functions.push_back(GetDeltaScanFunction(instance)); - return functions; + return functions; } -}; +}; // namespace duckdb diff --git a/src/delta_utils.cpp b/src/delta_utils.cpp index 32db255..035d300 100644 --- a/src/delta_utils.cpp +++ b/src/delta_utils.cpp @@ -2,312 +2,323 @@ #include "duckdb.hpp" #include "duckdb/main/extension_util.hpp" + #include #include namespace duckdb { -unique_ptr SchemaVisitor::VisitSnapshotSchema(ffi::SharedSnapshot* snapshot) { - SchemaVisitor state; - ffi::EngineSchemaVisitor visitor; - - visitor.data = &state; - visitor.make_field_list = (uintptr_t (*)(void*, uintptr_t)) &MakeFieldList; - visitor.visit_struct = (void (*)(void*, uintptr_t, ffi::KernelStringSlice, uintptr_t)) &VisitStruct; - visitor.visit_array = (void (*)(void*, uintptr_t, ffi::KernelStringSlice, bool, uintptr_t)) &VisitArray; - visitor.visit_map = (void (*)(void*, uintptr_t, ffi::KernelStringSlice, bool, uintptr_t)) &VisitMap; - visitor.visit_decimal = (void (*)(void*, uintptr_t, ffi::KernelStringSlice, uint8_t , uint8_t)) &VisitDecimal; - visitor.visit_string = VisitSimpleType(); - visitor.visit_long = VisitSimpleType(); - visitor.visit_integer = VisitSimpleType(); - visitor.visit_short = VisitSimpleType(); - visitor.visit_byte = VisitSimpleType(); - visitor.visit_float = VisitSimpleType(); - visitor.visit_double = VisitSimpleType(); - visitor.visit_boolean = VisitSimpleType(); - visitor.visit_binary = VisitSimpleType(); - visitor.visit_date = VisitSimpleType(); - visitor.visit_timestamp = VisitSimpleType(); - visitor.visit_timestamp_ntz = VisitSimpleType(); - - uintptr_t result = visit_schema(snapshot, &visitor); - return state.TakeFieldList(result); +unique_ptr SchemaVisitor::VisitSnapshotSchema(ffi::SharedSnapshot *snapshot) { + SchemaVisitor state; + ffi::EngineSchemaVisitor visitor; + + visitor.data = &state; + visitor.make_field_list = (uintptr_t (*)(void *, uintptr_t))&MakeFieldList; + visitor.visit_struct = (void (*)(void *, uintptr_t, ffi::KernelStringSlice, uintptr_t))&VisitStruct; + visitor.visit_array = (void (*)(void *, uintptr_t, ffi::KernelStringSlice, bool, uintptr_t))&VisitArray; + visitor.visit_map = (void (*)(void *, uintptr_t, ffi::KernelStringSlice, bool, uintptr_t))&VisitMap; + visitor.visit_decimal = (void (*)(void *, uintptr_t, ffi::KernelStringSlice, uint8_t, uint8_t))&VisitDecimal; + visitor.visit_string = VisitSimpleType(); + visitor.visit_long = VisitSimpleType(); + visitor.visit_integer = VisitSimpleType(); + visitor.visit_short = VisitSimpleType(); + visitor.visit_byte = VisitSimpleType(); + visitor.visit_float = VisitSimpleType(); + visitor.visit_double = VisitSimpleType(); + visitor.visit_boolean = VisitSimpleType(); + visitor.visit_binary = VisitSimpleType(); + visitor.visit_date = VisitSimpleType(); + visitor.visit_timestamp = VisitSimpleType(); + visitor.visit_timestamp_ntz = VisitSimpleType(); + + uintptr_t result = visit_schema(snapshot, &visitor); + return state.TakeFieldList(result); } -void SchemaVisitor::VisitDecimal(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, uint8_t precision, uint8_t scale) { - state->AppendToList(sibling_list_id, name, LogicalType::DECIMAL(precision, scale)); +void SchemaVisitor::VisitDecimal(SchemaVisitor *state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, + uint8_t precision, uint8_t scale) { + state->AppendToList(sibling_list_id, name, LogicalType::DECIMAL(precision, scale)); } -uintptr_t SchemaVisitor::MakeFieldList(SchemaVisitor* state, uintptr_t capacity_hint) { - return state->MakeFieldListImpl(capacity_hint); +uintptr_t SchemaVisitor::MakeFieldList(SchemaVisitor *state, uintptr_t capacity_hint) { + return state->MakeFieldListImpl(capacity_hint); } -void SchemaVisitor::VisitStruct(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, uintptr_t child_list_id) { - auto children = state->TakeFieldList(child_list_id); - state->AppendToList(sibling_list_id, name, LogicalType::STRUCT(std::move(*children))); +void SchemaVisitor::VisitStruct(SchemaVisitor *state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, + uintptr_t child_list_id) { + auto children = state->TakeFieldList(child_list_id); + state->AppendToList(sibling_list_id, name, LogicalType::STRUCT(std::move(*children))); } -void SchemaVisitor::VisitArray(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, bool contains_null, uintptr_t child_list_id) { - auto children = state->TakeFieldList(child_list_id); +void SchemaVisitor::VisitArray(SchemaVisitor *state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, + bool contains_null, uintptr_t child_list_id) { + auto children = state->TakeFieldList(child_list_id); - D_ASSERT(children->size() == 1); - state->AppendToList(sibling_list_id, name, LogicalType::LIST(children->front().second)); + D_ASSERT(children->size() == 1); + state->AppendToList(sibling_list_id, name, LogicalType::LIST(children->front().second)); } -void SchemaVisitor::VisitMap(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, bool contains_null, uintptr_t child_list_id) { - auto children = state->TakeFieldList(child_list_id); +void SchemaVisitor::VisitMap(SchemaVisitor *state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, + bool contains_null, uintptr_t child_list_id) { + auto children = state->TakeFieldList(child_list_id); - D_ASSERT(children->size() == 2); - state->AppendToList(sibling_list_id, name, LogicalType::MAP(LogicalType::STRUCT(std::move(*children)))); + D_ASSERT(children->size() == 2); + state->AppendToList(sibling_list_id, name, LogicalType::MAP(LogicalType::STRUCT(std::move(*children)))); } uintptr_t SchemaVisitor::MakeFieldListImpl(uintptr_t capacity_hint) { - uintptr_t id = next_id++; - auto list = make_uniq(); - if (capacity_hint > 0) { - list->reserve(capacity_hint); - } - inflight_lists.emplace(id, std::move(list)); - return id; + uintptr_t id = next_id++; + auto list = make_uniq(); + if (capacity_hint > 0) { + list->reserve(capacity_hint); + } + inflight_lists.emplace(id, std::move(list)); + return id; } -void SchemaVisitor::AppendToList(uintptr_t id, ffi::KernelStringSlice name, LogicalType&& child) { - auto it = inflight_lists.find(id); - if (it == inflight_lists.end()) { - // TODO... some error... - throw InternalException("WEIRD SHIT"); - } else { - it->second->emplace_back(std::make_pair(string(name.ptr, name.len), std::move(child))); - } +void SchemaVisitor::AppendToList(uintptr_t id, ffi::KernelStringSlice name, LogicalType &&child) { + auto it = inflight_lists.find(id); + if (it == inflight_lists.end()) { + // TODO... some error... + throw InternalException("WEIRD SHIT"); + } else { + it->second->emplace_back(std::make_pair(string(name.ptr, name.len), std::move(child))); + } } unique_ptr SchemaVisitor::TakeFieldList(uintptr_t id) { - auto it = inflight_lists.find(id); - if (it == inflight_lists.end()) { - // TODO: Raise some kind of error. - throw InternalException("WEIRD SHIT 2"); - } - auto rval = std::move(it->second); - inflight_lists.erase(it); - return rval; + auto it = inflight_lists.find(id); + if (it == inflight_lists.end()) { + // TODO: Raise some kind of error. + throw InternalException("WEIRD SHIT 2"); + } + auto rval = std::move(it->second); + inflight_lists.erase(it); + return rval; } - -ffi::EngineError* DuckDBEngineError::AllocateError(ffi::KernelError etype, ffi::KernelStringSlice msg) { - auto error = new DuckDBEngineError; - error->etype = etype; - error->error_message = string(msg.ptr, msg.len); - return error; +ffi::EngineError *DuckDBEngineError::AllocateError(ffi::KernelError etype, ffi::KernelStringSlice msg) { + auto error = new DuckDBEngineError; + error->etype = etype; + error->error_message = string(msg.ptr, msg.len); + return error; } string DuckDBEngineError::KernelErrorEnumToString(ffi::KernelError err) { - const char* KERNEL_ERROR_ENUM_STRINGS[] = { - "UnknownError", - "FFIError", - "ArrowError", - "EngineDataTypeError", - "ExtractError", - "GenericError", - "IOErrorError", - "ParquetError", - "ObjectStoreError", - "ObjectStorePathError", - "Reqwest", - "FileNotFoundError", - "MissingColumnError", - "UnexpectedColumnTypeError", - "MissingDataError", - "MissingVersionError", - "DeletionVectorError", - "InvalidUrlError", - "MalformedJsonError", - "MissingMetadataError", - "MissingProtocolError", - "MissingMetadataAndProtocolError", - "ParseError", - "JoinFailureError", - "Utf8Error", - "ParseIntError", - "InvalidColumnMappingMode", - "InvalidTableLocation", - "InvalidDecimalError", - }; - - static_assert(sizeof(KERNEL_ERROR_ENUM_STRINGS)/sizeof(char*)-1 == (int)ffi::KernelError::InvalidDecimalError, - "KernelErrorEnumStrings mismatched with kernel"); - - if ((int)err < sizeof(KERNEL_ERROR_ENUM_STRINGS)/sizeof(char*)) { - return KERNEL_ERROR_ENUM_STRINGS[(int)err]; - } - - return StringUtil::Format("EnumOutOfRange (enum val out of range: %d)", (int)err); + const char *KERNEL_ERROR_ENUM_STRINGS[] = { + "UnknownError", + "FFIError", + "ArrowError", + "EngineDataTypeError", + "ExtractError", + "GenericError", + "IOErrorError", + "ParquetError", + "ObjectStoreError", + "ObjectStorePathError", + "Reqwest", + "FileNotFoundError", + "MissingColumnError", + "UnexpectedColumnTypeError", + "MissingDataError", + "MissingVersionError", + "DeletionVectorError", + "InvalidUrlError", + "MalformedJsonError", + "MissingMetadataError", + "MissingProtocolError", + "MissingMetadataAndProtocolError", + "ParseError", + "JoinFailureError", + "Utf8Error", + "ParseIntError", + "InvalidColumnMappingMode", + "InvalidTableLocation", + "InvalidDecimalError", + }; + + static_assert(sizeof(KERNEL_ERROR_ENUM_STRINGS) / sizeof(char *) - 1 == (int)ffi::KernelError::InvalidDecimalError, + "KernelErrorEnumStrings mismatched with kernel"); + + if ((int)err < sizeof(KERNEL_ERROR_ENUM_STRINGS) / sizeof(char *)) { + return KERNEL_ERROR_ENUM_STRINGS[(int)err]; + } + + return StringUtil::Format("EnumOutOfRange (enum val out of range: %d)", (int)err); } void DuckDBEngineError::Throw(string from_where) { - // Make copies before calling delete this - auto etype_copy = etype; - auto message_copy = error_message; - - // Consume error by calling delete this (remember this error is created by kernel using AllocateError) - delete this; - throw IOException("Hit DeltaKernel FFI error (from: %s): Hit error: %u (%s) with message (%s)", - from_where.c_str(), etype_copy, KernelErrorEnumToString(etype_copy), message_copy); + // Make copies before calling delete this + auto etype_copy = etype; + auto message_copy = error_message; + + // Consume error by calling delete this (remember this error is created by + // kernel using AllocateError) + delete this; + throw IOException("Hit DeltaKernel FFI error (from: %s): Hit error: %u (%s) " + "with message (%s)", + from_where.c_str(), etype_copy, KernelErrorEnumToString(etype_copy), message_copy); } - - ffi::KernelStringSlice KernelUtils::ToDeltaString(const string &str) { - return {str.data(), str.size()}; + return {str.data(), str.size()}; } string KernelUtils::FromDeltaString(const struct ffi::KernelStringSlice slice) { - return {slice.ptr, slice.len}; + return {slice.ptr, slice.len}; } vector KernelUtils::FromDeltaBoolSlice(const struct ffi::KernelBoolSlice slice) { - vector result; - result.assign(slice.ptr, slice.ptr + slice.len); - return result; + vector result; + result.assign(slice.ptr, slice.ptr + slice.len); + return result; } PredicateVisitor::PredicateVisitor(const vector &column_names, optional_ptr filters) { - predicate = this; - visitor = (uintptr_t (*)(void*, ffi::KernelExpressionVisitorState*)) &VisitPredicate; - - if (filters) { - for (auto& filter : filters->filters) { - column_filters[column_names[filter.first]] = filter.second.get(); - } - } + predicate = this; + visitor = (uintptr_t (*)(void *, ffi::KernelExpressionVisitorState *))&VisitPredicate; + + if (filters) { + for (auto &filter : filters->filters) { + column_filters[column_names[filter.first]] = filter.second.get(); + } + } } -// Template wrapper function that implements get_next for EngineIteratorFromCallable. +// Template wrapper function that implements get_next for +// EngineIteratorFromCallable. template -static auto GetNextFromCallable(Callable* callable) -> decltype(std::declval()()) { - return callable->operator()(); +static auto GetNextFromCallable(Callable *callable) -> decltype(std::declval()()) { + return callable->operator()(); } // Wraps a callable object (e.g. C++11 lambda) as an EngineIterator. template -ffi::EngineIterator EngineIteratorFromCallable(Callable& callable) { - auto* get_next = &GetNextFromCallable; - return {&callable, (const void *(*)(void*)) get_next}; +ffi::EngineIterator EngineIteratorFromCallable(Callable &callable) { + auto *get_next = &GetNextFromCallable; + return {&callable, (const void *(*)(void *))get_next}; }; -uintptr_t PredicateVisitor::VisitPredicate(PredicateVisitor* predicate, ffi::KernelExpressionVisitorState* state) { - auto &filters = predicate->column_filters; - - auto it = filters.begin(); - auto end = filters.end(); - auto get_next = [predicate, state, &it, &end]() -> uintptr_t { - if (it == end) { - return 0; - } - auto &filter = *it++; - return predicate->VisitFilter(filter.first, *filter.second, state); - }; - auto eit = EngineIteratorFromCallable(get_next); - - return visit_expression_and(state, &eit); +uintptr_t PredicateVisitor::VisitPredicate(PredicateVisitor *predicate, ffi::KernelExpressionVisitorState *state) { + auto &filters = predicate->column_filters; + + auto it = filters.begin(); + auto end = filters.end(); + auto get_next = [predicate, state, &it, &end]() -> uintptr_t { + if (it == end) { + return 0; + } + auto &filter = *it++; + return predicate->VisitFilter(filter.first, *filter.second, state); + }; + auto eit = EngineIteratorFromCallable(get_next); + + return visit_expression_and(state, &eit); } -uintptr_t PredicateVisitor::VisitConstantFilter(const string &col_name, const ConstantFilter &filter, ffi::KernelExpressionVisitorState* state) { - auto maybe_left = ffi::visit_expression_column(state, KernelUtils::ToDeltaString(col_name), DuckDBEngineError::AllocateError); - uintptr_t left = KernelUtils::UnpackResult(maybe_left, "VisitConstantFilter failed to visit_expression_column"); - - uintptr_t right = ~0; - auto &value = filter.constant; - switch (value.type().id()) { - case LogicalType::BIGINT: - right = visit_expression_literal_long(state, BigIntValue::Get(value)); - break; - case LogicalType::INTEGER: - right = visit_expression_literal_int(state, IntegerValue::Get(value)); - break; - case LogicalType::SMALLINT: - right = visit_expression_literal_short(state, SmallIntValue::Get(value)); - break; - case LogicalType::TINYINT: - right = visit_expression_literal_byte(state, TinyIntValue::Get(value)); - break; - case LogicalType::FLOAT: - right = visit_expression_literal_float(state, FloatValue::Get(value)); - break; - case LogicalType::DOUBLE: - right = visit_expression_literal_double(state, DoubleValue::Get(value)); - break; - case LogicalType::BOOLEAN: - right = visit_expression_literal_bool(state, BooleanValue::Get(value)); - break; - case LogicalType::VARCHAR: { - // WARNING: C++ lifetime extension rules don't protect calls of the form foo(std::string(...).c_str()) - auto str = StringValue::Get(value); - auto maybe_right = ffi::visit_expression_literal_string(state, KernelUtils::ToDeltaString(str), DuckDBEngineError::AllocateError); - right = KernelUtils::UnpackResult(maybe_right, "VisitConstantFilter failed to visit_expression_literal_string"); - break; - } - default: - break; // unsupported type - } - - // TODO support other comparison types? - switch (filter.comparison_type) { - case ExpressionType::COMPARE_LESSTHAN: - return visit_expression_lt(state, left, right); - case ExpressionType::COMPARE_LESSTHANOREQUALTO: - return visit_expression_le(state, left, right); - case ExpressionType::COMPARE_GREATERTHAN: - return visit_expression_gt(state, left, right); - case ExpressionType::COMPARE_GREATERTHANOREQUALTO: - return visit_expression_ge(state, left, right); - case ExpressionType::COMPARE_EQUAL: - return visit_expression_eq(state, left, right); - - default: - std::cout << " Unsupported operation: " << (int) filter.comparison_type << std::endl; - return ~0; // Unsupported operation - } +uintptr_t PredicateVisitor::VisitConstantFilter(const string &col_name, const ConstantFilter &filter, + ffi::KernelExpressionVisitorState *state) { + auto maybe_left = + ffi::visit_expression_column(state, KernelUtils::ToDeltaString(col_name), DuckDBEngineError::AllocateError); + uintptr_t left = KernelUtils::UnpackResult(maybe_left, "VisitConstantFilter failed to visit_expression_column"); + + uintptr_t right = ~0; + auto &value = filter.constant; + switch (value.type().id()) { + case LogicalType::BIGINT: + right = visit_expression_literal_long(state, BigIntValue::Get(value)); + break; + case LogicalType::INTEGER: + right = visit_expression_literal_int(state, IntegerValue::Get(value)); + break; + case LogicalType::SMALLINT: + right = visit_expression_literal_short(state, SmallIntValue::Get(value)); + break; + case LogicalType::TINYINT: + right = visit_expression_literal_byte(state, TinyIntValue::Get(value)); + break; + case LogicalType::FLOAT: + right = visit_expression_literal_float(state, FloatValue::Get(value)); + break; + case LogicalType::DOUBLE: + right = visit_expression_literal_double(state, DoubleValue::Get(value)); + break; + case LogicalType::BOOLEAN: + right = visit_expression_literal_bool(state, BooleanValue::Get(value)); + break; + case LogicalType::VARCHAR: { + // WARNING: C++ lifetime extension rules don't protect calls of the form + // foo(std::string(...).c_str()) + auto str = StringValue::Get(value); + auto maybe_right = ffi::visit_expression_literal_string(state, KernelUtils::ToDeltaString(str), + DuckDBEngineError::AllocateError); + right = KernelUtils::UnpackResult(maybe_right, "VisitConstantFilter failed to visit_expression_literal_string"); + break; + } + default: + break; // unsupported type + } + + // TODO support other comparison types? + switch (filter.comparison_type) { + case ExpressionType::COMPARE_LESSTHAN: + return visit_expression_lt(state, left, right); + case ExpressionType::COMPARE_LESSTHANOREQUALTO: + return visit_expression_le(state, left, right); + case ExpressionType::COMPARE_GREATERTHAN: + return visit_expression_gt(state, left, right); + case ExpressionType::COMPARE_GREATERTHANOREQUALTO: + return visit_expression_ge(state, left, right); + case ExpressionType::COMPARE_EQUAL: + return visit_expression_eq(state, left, right); + + default: + std::cout << " Unsupported operation: " << (int)filter.comparison_type << std::endl; + return ~0; // Unsupported operation + } } - -uintptr_t PredicateVisitor::VisitAndFilter(const string &col_name, const ConjunctionAndFilter &filter, ffi::KernelExpressionVisitorState* state) { - auto it = filter.child_filters.begin(); - auto end = filter.child_filters.end(); - auto get_next = [this, col_name, state, &it, &end]() -> uintptr_t { - if (it == end) { - return 0; - } - auto &child_filter = *it++; - - return VisitFilter(col_name, *child_filter, state); - }; - auto eit = EngineIteratorFromCallable(get_next); - return visit_expression_and(state, &eit); +uintptr_t PredicateVisitor::VisitAndFilter(const string &col_name, const ConjunctionAndFilter &filter, + ffi::KernelExpressionVisitorState *state) { + auto it = filter.child_filters.begin(); + auto end = filter.child_filters.end(); + auto get_next = [this, col_name, state, &it, &end]() -> uintptr_t { + if (it == end) { + return 0; + } + auto &child_filter = *it++; + + return VisitFilter(col_name, *child_filter, state); + }; + auto eit = EngineIteratorFromCallable(get_next); + return visit_expression_and(state, &eit); } uintptr_t PredicateVisitor::VisitIsNull(const string &col_name, ffi::KernelExpressionVisitorState *state) { - auto maybe_inner = ffi::visit_expression_column(state, KernelUtils::ToDeltaString(col_name), DuckDBEngineError::AllocateError); - uintptr_t inner = KernelUtils::UnpackResult(maybe_inner, "VisitIsNull failed to visit_expression_column"); - return ffi::visit_expression_is_null(state, inner); + auto maybe_inner = + ffi::visit_expression_column(state, KernelUtils::ToDeltaString(col_name), DuckDBEngineError::AllocateError); + uintptr_t inner = KernelUtils::UnpackResult(maybe_inner, "VisitIsNull failed to visit_expression_column"); + return ffi::visit_expression_is_null(state, inner); } uintptr_t PredicateVisitor::VisitIsNotNull(const string &col_name, ffi::KernelExpressionVisitorState *state) { - return ffi::visit_expression_not(state, VisitIsNull(col_name, state)); + return ffi::visit_expression_not(state, VisitIsNull(col_name, state)); } -uintptr_t PredicateVisitor::VisitFilter(const string &col_name, const TableFilter &filter, ffi::KernelExpressionVisitorState* state) { - switch (filter.filter_type) { - case TableFilterType::CONSTANT_COMPARISON: - return VisitConstantFilter(col_name, static_cast(filter), state); - case TableFilterType::CONJUNCTION_AND: - return VisitAndFilter(col_name, static_cast(filter), state); - case TableFilterType::IS_NULL: - return VisitIsNull(col_name, state); - case TableFilterType::IS_NOT_NULL: - return VisitIsNotNull(col_name, state); - default: - return ~0; - } +uintptr_t PredicateVisitor::VisitFilter(const string &col_name, const TableFilter &filter, + ffi::KernelExpressionVisitorState *state) { + switch (filter.filter_type) { + case TableFilterType::CONSTANT_COMPARISON: + return VisitConstantFilter(col_name, static_cast(filter), state); + case TableFilterType::CONJUNCTION_AND: + return VisitAndFilter(col_name, static_cast(filter), state); + case TableFilterType::IS_NULL: + return VisitIsNull(col_name, state); + case TableFilterType::IS_NOT_NULL: + return VisitIsNotNull(col_name, state); + default: + return ~0; + } } -}; +}; // namespace duckdb diff --git a/src/functions/delta_scan.cpp b/src/functions/delta_scan.cpp index 65eb34f..3bf4105 100644 --- a/src/functions/delta_scan.cpp +++ b/src/functions/delta_scan.cpp @@ -1,745 +1,774 @@ -#include "duckdb/function/table_function.hpp" +#include "functions/delta_scan.hpp" #include "delta_functions.hpp" -#include "functions/delta_scan.hpp" -#include "duckdb/optimizer/filter_combiner.hpp" -#include "duckdb/planner/operator/logical_get.hpp" -#include "duckdb/main/extension_util.hpp" #include "duckdb/catalog/catalog_entry/table_function_catalog_entry.hpp" #include "duckdb/common/local_file_system.hpp" #include "duckdb/common/types/data_chunk.hpp" +#include "duckdb/execution/expression_executor.hpp" +#include "duckdb/function/table_function.hpp" +#include "duckdb/main/extension_util.hpp" +#include "duckdb/main/secret/secret_manager.hpp" +#include "duckdb/optimizer/filter_combiner.hpp" #include "duckdb/parser/expression/constant_expression.hpp" #include "duckdb/parser/expression/function_expression.hpp" #include "duckdb/parser/parsed_expression.hpp" -#include "duckdb/execution/expression_executor.hpp" #include "duckdb/planner/binder.hpp" -#include "duckdb/main/secret/secret_manager.hpp" - +#include "duckdb/planner/operator/logical_get.hpp" -#include +#include #include #include -#include +#include namespace duckdb { -static void* allocate_string(const struct ffi::KernelStringSlice slice) { - return new string(slice.ptr, slice.len); +static void *allocate_string(const struct ffi::KernelStringSlice slice) { + return new string(slice.ptr, slice.len); } string url_decode(string input) { - string result; - result.reserve(input.size()); - char ch; - replace(input.begin(), input.end(), '+', ' '); - for (idx_t i = 0; i < input.length(); i++) { - if (int(input[i]) == 37) { - unsigned int ii; - sscanf(input.substr(i + 1, 2).c_str(), "%x", &ii); - ch = static_cast(ii); - result += ch; - i += 2; - } else { - result += input[i]; - } - } - return result; + string result; + result.reserve(input.size()); + char ch; + replace(input.begin(), input.end(), '+', ' '); + for (idx_t i = 0; i < input.length(); i++) { + if (int(input[i]) == 37) { + unsigned int ii; + sscanf(input.substr(i + 1, 2).c_str(), "%x", &ii); + ch = static_cast(ii); + result += ch; + i += 2; + } else { + result += input[i]; + } + } + return result; } -static void visit_callback(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size, const ffi::Stats * stats, const ffi::DvInfo *dv_info, const struct ffi::CStringMap *partition_values) { - auto context = (DeltaSnapshot *) engine_context; - auto path_string = context->GetPath(); - StringUtil::RTrim(path_string, "/"); - path_string += "/" + KernelUtils::FromDeltaString(path); - - path_string = url_decode(path_string); - - // First we append the file to our resolved files - context->resolved_files.push_back(DeltaSnapshot::ToDuckDBPath(path_string)); - context->metadata.emplace_back(make_uniq()); - - D_ASSERT(context->resolved_files.size() == context->metadata.size()); - - // Initialize the file metadata - context->metadata.back()->delta_snapshot_version = context->version; - context->metadata.back()->file_number = context->resolved_files.size() - 1; - if (stats) { - context->metadata.back()->cardinality = stats->num_records; - } - - // Fetch the deletion vector - auto selection_vector_res = ffi::selection_vector_from_dv(dv_info, context->extern_engine.get(), context->global_state.get()); - auto selection_vector = KernelUtils::UnpackResult(selection_vector_res, "selection_vector_from_dv for path " + context->GetPath()); - if (selection_vector.ptr) { - context->metadata.back()->selection_vector = selection_vector; - } - - // Lookup all columns for potential hits in the constant map - case_insensitive_map_t constant_map; - for (const auto &col: context->names) { - auto key = KernelUtils::ToDeltaString(col); - auto *partition_val = (string *) ffi::get_from_map(partition_values, key, allocate_string); - if (partition_val) { - constant_map[col] = *partition_val; - delete partition_val; - } - } - context->metadata.back()->partition_map = std::move(constant_map); +static void visit_callback(ffi::NullableCvoid engine_context, struct ffi::KernelStringSlice path, int64_t size, + const ffi::Stats *stats, const ffi::DvInfo *dv_info, + const struct ffi::CStringMap *partition_values) { + auto context = (DeltaSnapshot *)engine_context; + auto path_string = context->GetPath(); + StringUtil::RTrim(path_string, "/"); + path_string += "/" + KernelUtils::FromDeltaString(path); + + path_string = url_decode(path_string); + + // First we append the file to our resolved files + context->resolved_files.push_back(DeltaSnapshot::ToDuckDBPath(path_string)); + context->metadata.emplace_back(make_uniq()); + + D_ASSERT(context->resolved_files.size() == context->metadata.size()); + + // Initialize the file metadata + context->metadata.back()->delta_snapshot_version = context->version; + context->metadata.back()->file_number = context->resolved_files.size() - 1; + if (stats) { + context->metadata.back()->cardinality = stats->num_records; + } + + // Fetch the deletion vector + auto selection_vector_res = + ffi::selection_vector_from_dv(dv_info, context->extern_engine.get(), context->global_state.get()); + auto selection_vector = + KernelUtils::UnpackResult(selection_vector_res, "selection_vector_from_dv for path " + context->GetPath()); + if (selection_vector.ptr) { + context->metadata.back()->selection_vector = selection_vector; + } + + // Lookup all columns for potential hits in the constant map + case_insensitive_map_t constant_map; + for (const auto &col : context->names) { + auto key = KernelUtils::ToDeltaString(col); + auto *partition_val = (string *)ffi::get_from_map(partition_values, key, allocate_string); + if (partition_val) { + constant_map[col] = *partition_val; + delete partition_val; + } + } + context->metadata.back()->partition_map = std::move(constant_map); } - static void visit_data(void *engine_context, ffi::ExclusiveEngineData* engine_data, const struct ffi::KernelBoolSlice selection_vec) { - ffi::visit_scan_data(engine_data, selection_vec, engine_context, visit_callback); +static void visit_data(void *engine_context, ffi::ExclusiveEngineData *engine_data, + const struct ffi::KernelBoolSlice selection_vec) { + ffi::visit_scan_data(engine_data, selection_vec, engine_context, visit_callback); } -string ParseAccountNameFromEndpoint(const string& endpoint) { - if (!StringUtil::StartsWith(endpoint, "https://")) { - return ""; - } - auto result = endpoint.find('.', 8); - if (result == endpoint.npos) { - return ""; - } - return endpoint.substr(8,result-8); +string ParseAccountNameFromEndpoint(const string &endpoint) { + if (!StringUtil::StartsWith(endpoint, "https://")) { + return ""; + } + auto result = endpoint.find('.', 8); + if (result == endpoint.npos) { + return ""; + } + return endpoint.substr(8, result - 8); } -string parseFromConnectionString(const string& connectionString, const string& key) { - std::regex pattern(key + "=([^;]+)(?=;|$)"); - std::smatch matches; - if (std::regex_search(connectionString, matches, pattern) && matches.size() > 1) { - // The second match ([1]) contains the access key - return matches[1].str(); - } - return ""; +string parseFromConnectionString(const string &connectionString, const string &key) { + std::regex pattern(key + "=([^;]+)(?=;|$)"); + std::smatch matches; + if (std::regex_search(connectionString, matches, pattern) && matches.size() > 1) { + // The second match ([1]) contains the access key + return matches[1].str(); + } + return ""; } -static ffi::EngineBuilder* CreateBuilder(ClientContext &context, const string &path) { - ffi::EngineBuilder* builder; - - // For "regular" paths we early out with the default builder config - if (!StringUtil::StartsWith(path, "s3://") && - !StringUtil::StartsWith(path, "gcs://") && - !StringUtil::StartsWith(path, "gs://") && - !StringUtil::StartsWith(path, "r2://") && - !StringUtil::StartsWith(path, "azure://") && - !StringUtil::StartsWith(path, "az://") && - !StringUtil::StartsWith(path, "abfs://") && - !StringUtil::StartsWith(path, "abfss://")) { - auto interface_builder_res = ffi::get_engine_builder(KernelUtils::ToDeltaString(path), DuckDBEngineError::AllocateError); - return KernelUtils::UnpackResult(interface_builder_res, "get_engine_interface_builder for path " + path); - } - - string bucket; - string path_in_bucket; - string secret_type; - - if (StringUtil::StartsWith(path, "s3://")) { - auto end_of_container = path.find('/',5); - - if(end_of_container == string::npos) { - throw IOException("Invalid s3 url passed to delta scan: %s", path); - } - bucket = path.substr(5, end_of_container-5); - path_in_bucket = path.substr(end_of_container); - secret_type = "s3"; - } else if (StringUtil::StartsWith(path, "gcs://")) { - auto end_of_container = path.find('/',6); - - if(end_of_container == string::npos) { - throw IOException("Invalid gcs url passed to delta scan: %s", path); - } - bucket = path.substr(6, end_of_container-6); - path_in_bucket = path.substr(end_of_container); - secret_type = "gcs"; - } else if (StringUtil::StartsWith(path, "gs://")) { - auto end_of_container = path.find('/',5); - - if(end_of_container == string::npos) { - throw IOException("Invalid gcs url passed to delta scan: %s", path); - } - bucket = path.substr(5, end_of_container-5); - path_in_bucket = path.substr(end_of_container); - secret_type = "gcs"; - } else if (StringUtil::StartsWith(path, "r2://")) { - auto end_of_container = path.find('/',5); - - if(end_of_container == string::npos) { - throw IOException("Invalid gcs url passed to delta scan: %s", path); - } - bucket = path.substr(5, end_of_container-5); - path_in_bucket = path.substr(end_of_container); - secret_type = "r2"; - } else if ((StringUtil::StartsWith(path, "azure://")) || (StringUtil::StartsWith(path, "abfss://"))) { - auto end_of_container = path.find('/',8); - - if(end_of_container == string::npos) { - throw IOException("Invalid azure url passed to delta scan: %s", path); - } - bucket = path.substr(8, end_of_container-8); - path_in_bucket = path.substr(end_of_container); - secret_type = "azure"; - } else if (StringUtil::StartsWith(path, "az://")) { - auto end_of_container = path.find('/',5); - - if(end_of_container == string::npos) { - throw IOException("Invalid azure url passed to delta scan: %s", path); - } - bucket = path.substr(5, end_of_container-5); - path_in_bucket = path.substr(end_of_container); - secret_type = "azure"; - } else if (StringUtil::StartsWith(path, "abfs://")) { - auto end_of_container = path.find('/',7); - - if(end_of_container == string::npos) { - throw IOException("Invalid azure url passed to delta scan: %s", path); - } - bucket = path.substr(8, end_of_container-8); - path_in_bucket = path.substr(end_of_container); - secret_type = "azure"; - } - - // We need to substitute DuckDB's usage of s3 and r2 paths because delta kernel needs to just interpret them as s3 protocol servers. - string cleaned_path; - if (StringUtil::StartsWith(path, "r2://") || StringUtil::StartsWith(path, "gs://") ) { - cleaned_path = "s3://" + path.substr(5); - } else if (StringUtil::StartsWith(path, "gcs://")) { - cleaned_path = "s3://" + path.substr(6); - } else { - cleaned_path = path; - } - - auto interface_builder_res = ffi::get_engine_builder(KernelUtils::ToDeltaString(cleaned_path), DuckDBEngineError::AllocateError); - builder = KernelUtils::UnpackResult(interface_builder_res, "get_engine_interface_builder for path " + cleaned_path); - - // For S3 or Azure paths we need to trim the url, set the container, and fetch a potential secret - auto &secret_manager = SecretManager::Get(context); - auto transaction = CatalogTransaction::GetSystemCatalogTransaction(context); - - auto secret_match = secret_manager.LookupSecret(transaction, path, secret_type); - - // No secret: nothing left to do here! - if (!secret_match.HasMatch()) { - if (StringUtil::StartsWith(path, "r2://") || StringUtil::StartsWith(path, "gs://") || StringUtil::StartsWith(path, "gcs://")) { - throw NotImplementedException("Can not scan a gcs:// gs:// or r2:// url without a secret providing its endpoint currently. Please create an R2 or GCS secret containing the credentials for this endpoint and try again."); - } - - return builder; - } - const auto &kv_secret = dynamic_cast(*secret_match.secret_entry->secret); - - KeyValueSecretReader secret_reader(kv_secret, *context.client_data->file_opener); - - // Here you would need to add the logic for setting the builder options for Azure - // This is just a placeholder and will need to be replaced with the actual logic - if (secret_type == "s3" || secret_type == "gcs" || secret_type == "r2") { - - string key_id, secret, session_token, region, endpoint, url_style; - bool use_ssl = true; - secret_reader.TryGetSecretKey("key_id", key_id); - secret_reader.TryGetSecretKey("secret", secret); - secret_reader.TryGetSecretKey("session_token", session_token); - secret_reader.TryGetSecretKey("region", region); - secret_reader.TryGetSecretKey("endpoint", endpoint); - secret_reader.TryGetSecretKey("url_style", url_style); - secret_reader.TryGetSecretKey("use_ssl", use_ssl); - - if (key_id.empty() && secret.empty()) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("skip_signature"), KernelUtils::ToDeltaString("true")); - } - - if (!key_id.empty()) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_access_key_id"), KernelUtils::ToDeltaString(key_id)); - } - if (!secret.empty()) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_secret_access_key"), KernelUtils::ToDeltaString(secret)); - } - if (!session_token.empty()) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_session_token"), KernelUtils::ToDeltaString(session_token)); - } - if (!endpoint.empty() && endpoint != "s3.amazonaws.com") { - if(!StringUtil::StartsWith(endpoint, "https://") && !StringUtil::StartsWith(endpoint, "http://")) { - if(use_ssl) { - endpoint = "https://" + endpoint; - } else { - endpoint = "http://" + endpoint; - } - } - - if (StringUtil::StartsWith(endpoint, "http://")) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("allow_http"), KernelUtils::ToDeltaString("true")); - } - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_endpoint"), KernelUtils::ToDeltaString(endpoint)); - } - - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_region"), KernelUtils::ToDeltaString(region)); - - } else if (secret_type == "azure") { - // azure seems to be super complicated as we need to cover duckdb azure plugin and delta RS builder - // and both require different settings - string connection_string, account_name, endpoint, client_id, client_secret, tenant_id, chain; - secret_reader.TryGetSecretKey("connection_string", connection_string); - secret_reader.TryGetSecretKey("account_name", account_name); - secret_reader.TryGetSecretKey("endpoint", endpoint); - secret_reader.TryGetSecretKey("client_id", client_id); - secret_reader.TryGetSecretKey("client_secret", client_secret); - secret_reader.TryGetSecretKey("tenant_id", tenant_id); - secret_reader.TryGetSecretKey("chain", chain); - - if (!account_name.empty() && account_name == "onelake") { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("use_fabric_endpoint"), KernelUtils::ToDeltaString("true")); - } - - auto provider = kv_secret.GetProvider(); - if (provider == "access_token") { - // Authentication option 0: https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variant.Token - string access_token; - secret_reader.TryGetSecretKey("access_token", access_token); - if (access_token.empty()) { - throw InvalidInputException("No access_token value not found in secret provider!"); - } - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("bearer_token"), KernelUtils::ToDeltaString(access_token)); - } else if (provider == "credential_chain") { - // Authentication option 1a: using the cli authentication - if (chain.find("cli") != std::string::npos) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("use_azure_cli"), KernelUtils::ToDeltaString("true")); - } - // Authentication option 1b: non-cli credential chains will just "hope for the best" technically since we are using the default - // credential chain provider duckDB and delta-kernel-rs should find the same auth - } else if (!connection_string.empty() && connection_string != "NULL") { - - // Authentication option 2: a connection string based on account key - auto account_key = parseFromConnectionString(connection_string, "AccountKey"); - account_name = parseFromConnectionString(connection_string, "AccountName"); - // Authentication option 2: a connection string based on account key - if (!account_name.empty() && !account_key.empty()) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("account_key"), - KernelUtils::ToDeltaString(account_key)); - } else { - // Authentication option 2b: a connection string based on SAS token - endpoint = parseFromConnectionString(connection_string, "BlobEndpoint"); - if (account_name.empty()) { - account_name = ParseAccountNameFromEndpoint(endpoint); - } - auto sas_token = parseFromConnectionString(connection_string, "SharedAccessSignature"); - if (!sas_token.empty()) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("sas_token"), - KernelUtils::ToDeltaString(sas_token)); - } - } - } else if (provider == "service_principal") { - if (!client_id.empty()) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_client_id"), KernelUtils::ToDeltaString(client_id)); - } - if (!client_secret.empty()) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_client_secret"), KernelUtils::ToDeltaString(client_secret)); - } - if (!tenant_id.empty()) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_tenant_id"), KernelUtils::ToDeltaString(tenant_id)); - } - } else { - // Authentication option 3: no authentication, just an account name - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_skip_signature"), KernelUtils::ToDeltaString("true")); - } - // Set the use_emulator option for when the azurite test server is used - if (account_name == "devstoreaccount1" || connection_string.find("devstoreaccount1") != string::npos) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("use_emulator"), KernelUtils::ToDeltaString("true")); - } - if (!account_name.empty()) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("account_name"), KernelUtils::ToDeltaString(account_name)); //needed for delta RS builder - } - if (!endpoint.empty()) { - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_endpoint"), KernelUtils::ToDeltaString(endpoint)); - } - ffi::set_builder_option(builder, KernelUtils::ToDeltaString("container_name"), KernelUtils::ToDeltaString(bucket)); - } - return builder; +static ffi::EngineBuilder *CreateBuilder(ClientContext &context, const string &path) { + ffi::EngineBuilder *builder; + + // For "regular" paths we early out with the default builder config + if (!StringUtil::StartsWith(path, "s3://") && !StringUtil::StartsWith(path, "gcs://") && + !StringUtil::StartsWith(path, "gs://") && !StringUtil::StartsWith(path, "r2://") && + !StringUtil::StartsWith(path, "azure://") && !StringUtil::StartsWith(path, "az://") && + !StringUtil::StartsWith(path, "abfs://") && !StringUtil::StartsWith(path, "abfss://")) { + auto interface_builder_res = + ffi::get_engine_builder(KernelUtils::ToDeltaString(path), DuckDBEngineError::AllocateError); + return KernelUtils::UnpackResult(interface_builder_res, "get_engine_interface_builder for path " + path); + } + + string bucket; + string path_in_bucket; + string secret_type; + + if (StringUtil::StartsWith(path, "s3://")) { + auto end_of_container = path.find('/', 5); + + if (end_of_container == string::npos) { + throw IOException("Invalid s3 url passed to delta scan: %s", path); + } + bucket = path.substr(5, end_of_container - 5); + path_in_bucket = path.substr(end_of_container); + secret_type = "s3"; + } else if (StringUtil::StartsWith(path, "gcs://")) { + auto end_of_container = path.find('/', 6); + + if (end_of_container == string::npos) { + throw IOException("Invalid gcs url passed to delta scan: %s", path); + } + bucket = path.substr(6, end_of_container - 6); + path_in_bucket = path.substr(end_of_container); + secret_type = "gcs"; + } else if (StringUtil::StartsWith(path, "gs://")) { + auto end_of_container = path.find('/', 5); + + if (end_of_container == string::npos) { + throw IOException("Invalid gcs url passed to delta scan: %s", path); + } + bucket = path.substr(5, end_of_container - 5); + path_in_bucket = path.substr(end_of_container); + secret_type = "gcs"; + } else if (StringUtil::StartsWith(path, "r2://")) { + auto end_of_container = path.find('/', 5); + + if (end_of_container == string::npos) { + throw IOException("Invalid gcs url passed to delta scan: %s", path); + } + bucket = path.substr(5, end_of_container - 5); + path_in_bucket = path.substr(end_of_container); + secret_type = "r2"; + } else if ((StringUtil::StartsWith(path, "azure://")) || (StringUtil::StartsWith(path, "abfss://"))) { + auto end_of_container = path.find('/', 8); + + if (end_of_container == string::npos) { + throw IOException("Invalid azure url passed to delta scan: %s", path); + } + bucket = path.substr(8, end_of_container - 8); + path_in_bucket = path.substr(end_of_container); + secret_type = "azure"; + } else if (StringUtil::StartsWith(path, "az://")) { + auto end_of_container = path.find('/', 5); + + if (end_of_container == string::npos) { + throw IOException("Invalid azure url passed to delta scan: %s", path); + } + bucket = path.substr(5, end_of_container - 5); + path_in_bucket = path.substr(end_of_container); + secret_type = "azure"; + } else if (StringUtil::StartsWith(path, "abfs://")) { + auto end_of_container = path.find('/', 7); + + if (end_of_container == string::npos) { + throw IOException("Invalid azure url passed to delta scan: %s", path); + } + bucket = path.substr(8, end_of_container - 8); + path_in_bucket = path.substr(end_of_container); + secret_type = "azure"; + } + + // We need to substitute DuckDB's usage of s3 and r2 paths because delta kernel needs to just interpret them as s3 + // protocol servers. + string cleaned_path; + if (StringUtil::StartsWith(path, "r2://") || StringUtil::StartsWith(path, "gs://")) { + cleaned_path = "s3://" + path.substr(5); + } else if (StringUtil::StartsWith(path, "gcs://")) { + cleaned_path = "s3://" + path.substr(6); + } else { + cleaned_path = path; + } + + auto interface_builder_res = + ffi::get_engine_builder(KernelUtils::ToDeltaString(cleaned_path), DuckDBEngineError::AllocateError); + builder = KernelUtils::UnpackResult(interface_builder_res, "get_engine_interface_builder for path " + cleaned_path); + + // For S3 or Azure paths we need to trim the url, set the container, and fetch a potential secret + auto &secret_manager = SecretManager::Get(context); + auto transaction = CatalogTransaction::GetSystemCatalogTransaction(context); + + auto secret_match = secret_manager.LookupSecret(transaction, path, secret_type); + + // No secret: nothing left to do here! + if (!secret_match.HasMatch()) { + if (StringUtil::StartsWith(path, "r2://") || StringUtil::StartsWith(path, "gs://") || + StringUtil::StartsWith(path, "gcs://")) { + throw NotImplementedException( + "Can not scan a gcs:// gs:// or r2:// url without a secret providing its endpoint currently. Please " + "create an R2 or GCS secret containing the credentials for this endpoint and try again."); + } + + return builder; + } + const auto &kv_secret = dynamic_cast(*secret_match.secret_entry->secret); + + KeyValueSecretReader secret_reader(kv_secret, *context.client_data->file_opener); + + // Here you would need to add the logic for setting the builder options for Azure + // This is just a placeholder and will need to be replaced with the actual logic + if (secret_type == "s3" || secret_type == "gcs" || secret_type == "r2") { + + string key_id, secret, session_token, region, endpoint, url_style; + bool use_ssl = true; + secret_reader.TryGetSecretKey("key_id", key_id); + secret_reader.TryGetSecretKey("secret", secret); + secret_reader.TryGetSecretKey("session_token", session_token); + secret_reader.TryGetSecretKey("region", region); + secret_reader.TryGetSecretKey("endpoint", endpoint); + secret_reader.TryGetSecretKey("url_style", url_style); + secret_reader.TryGetSecretKey("use_ssl", use_ssl); + + if (key_id.empty() && secret.empty()) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("skip_signature"), + KernelUtils::ToDeltaString("true")); + } + + if (!key_id.empty()) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_access_key_id"), + KernelUtils::ToDeltaString(key_id)); + } + if (!secret.empty()) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_secret_access_key"), + KernelUtils::ToDeltaString(secret)); + } + if (!session_token.empty()) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_session_token"), + KernelUtils::ToDeltaString(session_token)); + } + if (!endpoint.empty() && endpoint != "s3.amazonaws.com") { + if (!StringUtil::StartsWith(endpoint, "https://") && !StringUtil::StartsWith(endpoint, "http://")) { + if (use_ssl) { + endpoint = "https://" + endpoint; + } else { + endpoint = "http://" + endpoint; + } + } + + if (StringUtil::StartsWith(endpoint, "http://")) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("allow_http"), KernelUtils::ToDeltaString("true")); + } + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_endpoint"), + KernelUtils::ToDeltaString(endpoint)); + } + + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("aws_region"), KernelUtils::ToDeltaString(region)); + + } else if (secret_type == "azure") { + // azure seems to be super complicated as we need to cover duckdb azure plugin and delta RS builder + // and both require different settings + string connection_string, account_name, endpoint, client_id, client_secret, tenant_id, chain; + secret_reader.TryGetSecretKey("connection_string", connection_string); + secret_reader.TryGetSecretKey("account_name", account_name); + secret_reader.TryGetSecretKey("endpoint", endpoint); + secret_reader.TryGetSecretKey("client_id", client_id); + secret_reader.TryGetSecretKey("client_secret", client_secret); + secret_reader.TryGetSecretKey("tenant_id", tenant_id); + secret_reader.TryGetSecretKey("chain", chain); + + if (!account_name.empty() && account_name == "onelake") { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("use_fabric_endpoint"), + KernelUtils::ToDeltaString("true")); + } + + auto provider = kv_secret.GetProvider(); + if (provider == "access_token") { + // Authentication option 0: + // https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variant.Token + string access_token; + secret_reader.TryGetSecretKey("access_token", access_token); + if (access_token.empty()) { + throw InvalidInputException("No access_token value not found in secret provider!"); + } + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("bearer_token"), + KernelUtils::ToDeltaString(access_token)); + } else if (provider == "credential_chain") { + // Authentication option 1a: using the cli authentication + if (chain.find("cli") != std::string::npos) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("use_azure_cli"), + KernelUtils::ToDeltaString("true")); + } + // Authentication option 1b: non-cli credential chains will just "hope for the best" technically since we + // are using the default credential chain provider duckDB and delta-kernel-rs should find the same auth + } else if (!connection_string.empty() && connection_string != "NULL") { + + // Authentication option 2: a connection string based on account key + auto account_key = parseFromConnectionString(connection_string, "AccountKey"); + account_name = parseFromConnectionString(connection_string, "AccountName"); + // Authentication option 2: a connection string based on account key + if (!account_name.empty() && !account_key.empty()) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("account_key"), + KernelUtils::ToDeltaString(account_key)); + } else { + // Authentication option 2b: a connection string based on SAS token + endpoint = parseFromConnectionString(connection_string, "BlobEndpoint"); + if (account_name.empty()) { + account_name = ParseAccountNameFromEndpoint(endpoint); + } + auto sas_token = parseFromConnectionString(connection_string, "SharedAccessSignature"); + if (!sas_token.empty()) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("sas_token"), + KernelUtils::ToDeltaString(sas_token)); + } + } + } else if (provider == "service_principal") { + if (!client_id.empty()) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_client_id"), + KernelUtils::ToDeltaString(client_id)); + } + if (!client_secret.empty()) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_client_secret"), + KernelUtils::ToDeltaString(client_secret)); + } + if (!tenant_id.empty()) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_tenant_id"), + KernelUtils::ToDeltaString(tenant_id)); + } + } else { + // Authentication option 3: no authentication, just an account name + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_skip_signature"), + KernelUtils::ToDeltaString("true")); + } + // Set the use_emulator option for when the azurite test server is used + if (account_name == "devstoreaccount1" || connection_string.find("devstoreaccount1") != string::npos) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("use_emulator"), KernelUtils::ToDeltaString("true")); + } + if (!account_name.empty()) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("account_name"), + KernelUtils::ToDeltaString(account_name)); // needed for delta RS builder + } + if (!endpoint.empty()) { + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("azure_endpoint"), + KernelUtils::ToDeltaString(endpoint)); + } + ffi::set_builder_option(builder, KernelUtils::ToDeltaString("container_name"), KernelUtils::ToDeltaString(bucket)); + } + return builder; } -DeltaSnapshot::DeltaSnapshot(ClientContext &context_p, const string &path) : MultiFileList({ToDeltaPath(path)}, FileGlobOptions::ALLOW_EMPTY), context(context_p) { +DeltaSnapshot::DeltaSnapshot(ClientContext &context_p, const string &path) + : MultiFileList({ToDeltaPath(path)}, FileGlobOptions::ALLOW_EMPTY), context(context_p) { } string DeltaSnapshot::GetPath() { - return GetPaths()[0]; + return GetPaths()[0]; } string DeltaSnapshot::ToDuckDBPath(const string &raw_path) { - if (StringUtil::StartsWith(raw_path, "file://")) { - return raw_path.substr(7); - } - return raw_path; + if (StringUtil::StartsWith(raw_path, "file://")) { + return raw_path.substr(7); + } + return raw_path; } string DeltaSnapshot::ToDeltaPath(const string &raw_path) { - string path; - if (StringUtil::StartsWith(raw_path, "./")) { - LocalFileSystem fs; - path = fs.JoinPath(fs.GetWorkingDirectory(), raw_path.substr(2)); - path = "file://" + path; - } else { - path = raw_path; - } - - // Paths always end in a slash (kernel likes it that way for now) - if (path[path.size()-1] != '/') { - path = path + '/'; - } - - return path; + string path; + if (StringUtil::StartsWith(raw_path, "./")) { + LocalFileSystem fs; + path = fs.JoinPath(fs.GetWorkingDirectory(), raw_path.substr(2)); + path = "file://" + path; + } else { + path = raw_path; + } + + // Paths always end in a slash (kernel likes it that way for now) + if (path[path.size() - 1] != '/') { + path = path + '/'; + } + + return path; } void DeltaSnapshot::Bind(vector &return_types, vector &names) { - if (!initialized) { - InitializeFiles(); - } - auto schema = SchemaVisitor::VisitSnapshotSchema(snapshot.get()); - for (const auto &field: *schema) { - names.push_back(field.first); - return_types.push_back(field.second); - } - // Store the bound names for resolving the complex filter pushdown later - this->names = names; + if (!initialized) { + InitializeFiles(); + } + auto schema = SchemaVisitor::VisitSnapshotSchema(snapshot.get()); + for (const auto &field : *schema) { + names.push_back(field.first); + return_types.push_back(field.second); + } + // Store the bound names for resolving the complex filter pushdown later + this->names = names; } string DeltaSnapshot::GetFile(idx_t i) { - if (!initialized) { - InitializeFiles(); - } - // We already have this file - if (i < resolved_files.size()) { - return resolved_files[i]; - } - - if (files_exhausted) { - return ""; - } - - while(i >= resolved_files.size()) { - auto have_scan_data_res = ffi::kernel_scan_data_next(scan_data_iterator.get(), this, visit_data); - - auto have_scan_data = TryUnpackKernelResult(have_scan_data_res); - - // kernel has indicated that we have no more data to scan - if (!have_scan_data) { - files_exhausted = true; - return ""; - } - } - - // The kernel scan visitor should have resolved a file OR returned - if(i >= resolved_files.size()) { - throw IOException("Delta Kernel seems to have failed to resolve a new file"); - } - - return resolved_files[i]; + if (!initialized) { + InitializeFiles(); + } + // We already have this file + if (i < resolved_files.size()) { + return resolved_files[i]; + } + + if (files_exhausted) { + return ""; + } + + while (i >= resolved_files.size()) { + auto have_scan_data_res = ffi::kernel_scan_data_next(scan_data_iterator.get(), this, visit_data); + + auto have_scan_data = TryUnpackKernelResult(have_scan_data_res); + + // kernel has indicated that we have no more data to scan + if (!have_scan_data) { + files_exhausted = true; + return ""; + } + } + + // The kernel scan visitor should have resolved a file OR returned + if (i >= resolved_files.size()) { + throw IOException("Delta Kernel seems to have failed to resolve a new file"); + } + + return resolved_files[i]; } void DeltaSnapshot::InitializeFiles() { - auto path_slice = KernelUtils::ToDeltaString(paths[0]); + auto path_slice = KernelUtils::ToDeltaString(paths[0]); - // Register engine - auto interface_builder = CreateBuilder(context, paths[0]); - extern_engine = TryUnpackKernelResult( ffi::builder_build(interface_builder)); + // Register engine + auto interface_builder = CreateBuilder(context, paths[0]); + extern_engine = TryUnpackKernelResult(ffi::builder_build(interface_builder)); - // Initialize Snapshot - snapshot = TryUnpackKernelResult(ffi::snapshot(path_slice, extern_engine.get())); + // Initialize Snapshot + snapshot = TryUnpackKernelResult(ffi::snapshot(path_slice, extern_engine.get())); - // Create Scan - PredicateVisitor visitor(names, &table_filters); - scan = TryUnpackKernelResult(ffi::scan(snapshot.get(), extern_engine.get(), &visitor)); + // Create Scan + PredicateVisitor visitor(names, &table_filters); + scan = TryUnpackKernelResult(ffi::scan(snapshot.get(), extern_engine.get(), &visitor)); - // Create GlobalState - global_state = ffi::get_global_scan_state(scan.get()); + // Create GlobalState + global_state = ffi::get_global_scan_state(scan.get()); - // Set version - this->version = ffi::version(snapshot.get()); + // Set version + this->version = ffi::version(snapshot.get()); - // Create scan data iterator - scan_data_iterator = TryUnpackKernelResult(ffi::kernel_scan_data_init(extern_engine.get(), scan.get())); + // Create scan data iterator + scan_data_iterator = TryUnpackKernelResult(ffi::kernel_scan_data_init(extern_engine.get(), scan.get())); - initialized = true; + initialized = true; } -unique_ptr DeltaSnapshot::ComplexFilterPushdown(ClientContext &context, const MultiFileReaderOptions &options, MultiFilePushdownInfo &info, - vector> &filters) { - FilterCombiner combiner(context); - for (const auto &filter : filters) { - combiner.AddFilter(filter->Copy()); - } - auto filterstmp = combiner.GenerateTableScanFilters(info.column_ids); +unique_ptr DeltaSnapshot::ComplexFilterPushdown(ClientContext &context, + const MultiFileReaderOptions &options, + MultiFilePushdownInfo &info, + vector> &filters) { + FilterCombiner combiner(context); + for (const auto &filter : filters) { + combiner.AddFilter(filter->Copy()); + } + auto filterstmp = combiner.GenerateTableScanFilters(info.column_ids); - // TODO: can/should we figure out if this filtered anything? - auto filtered_list = make_uniq(context, paths[0]); - filtered_list->table_filters = std::move(filterstmp); - filtered_list->names = names; + // TODO: can/should we figure out if this filtered anything? + auto filtered_list = make_uniq(context, paths[0]); + filtered_list->table_filters = std::move(filterstmp); + filtered_list->names = names; - return std::move(filtered_list); + return std::move(filtered_list); } vector DeltaSnapshot::GetAllFiles() { - idx_t i = resolved_files.size(); - // TODO: this can probably be improved - while(!GetFile(i).empty()) { - i++; - } - return resolved_files; + idx_t i = resolved_files.size(); + // TODO: this can probably be improved + while (!GetFile(i).empty()) { + i++; + } + return resolved_files; } FileExpandResult DeltaSnapshot::GetExpandResult() { - // GetFile(1) will ensure at least the first 2 files are expanded if they are available - GetFile(1); + // GetFile(1) will ensure at least the first 2 files are expanded if they are available + GetFile(1); - if (resolved_files.size() > 1) { - return FileExpandResult::MULTIPLE_FILES; - } else if (resolved_files.size() == 1) { - return FileExpandResult::SINGLE_FILE; - } + if (resolved_files.size() > 1) { + return FileExpandResult::MULTIPLE_FILES; + } else if (resolved_files.size() == 1) { + return FileExpandResult::SINGLE_FILE; + } - return FileExpandResult::NO_FILES; + return FileExpandResult::NO_FILES; } idx_t DeltaSnapshot::GetTotalFileCount() { - // TODO: this can probably be improved - idx_t i = resolved_files.size(); - while(!GetFile(i).empty()) { - i++; - } - return resolved_files.size(); + // TODO: this can probably be improved + idx_t i = resolved_files.size(); + while (!GetFile(i).empty()) { + i++; + } + return resolved_files.size(); } unique_ptr DeltaSnapshot::GetCardinality(ClientContext &context) { - // This also ensures all files are expanded - auto total_file_count = DeltaSnapshot::GetTotalFileCount(); - - if (total_file_count == 0) { - return make_uniq(0,0); - } - - idx_t total_tuple_count = 0; - bool have_any_stats = false; - for (auto &metadatum : metadata) { - if (metadatum->cardinality != DConstants::INVALID_INDEX) { - have_any_stats = true; - total_tuple_count += metadatum->cardinality; - } - } - - if (have_any_stats) { - return make_uniq(total_tuple_count,total_tuple_count); - } - - return nullptr; + // This also ensures all files are expanded + auto total_file_count = DeltaSnapshot::GetTotalFileCount(); + + if (total_file_count == 0) { + return make_uniq(0, 0); + } + + idx_t total_tuple_count = 0; + bool have_any_stats = false; + for (auto &metadatum : metadata) { + if (metadatum->cardinality != DConstants::INVALID_INDEX) { + have_any_stats = true; + total_tuple_count += metadatum->cardinality; + } + } + + if (have_any_stats) { + return make_uniq(total_tuple_count, total_tuple_count); + } + + return nullptr; } unique_ptr DeltaMultiFileReader::CreateInstance() { - return std::move(make_uniq()); + return std::move(make_uniq()); } bool DeltaMultiFileReader::Bind(MultiFileReaderOptions &options, MultiFileList &files, - vector &return_types, vector &names, MultiFileReaderBindData &bind_data) { - auto &delta_snapshot = dynamic_cast(files); - - delta_snapshot.Bind(return_types, names); - - // We need to parse this option - bool file_row_number_enabled = options.custom_options.find("file_row_number") != options.custom_options.end(); - if (file_row_number_enabled) { - bind_data.file_row_number_idx = names.size(); - return_types.emplace_back(LogicalType::BIGINT); - names.emplace_back("file_row_number"); - } else { - // TODO: this is a bogus ID? Change for flag indicating it should be enabled? - bind_data.file_row_number_idx = names.size(); - } - - return true; + vector &return_types, vector &names, + MultiFileReaderBindData &bind_data) { + auto &delta_snapshot = dynamic_cast(files); + + delta_snapshot.Bind(return_types, names); + + // We need to parse this option + bool file_row_number_enabled = options.custom_options.find("file_row_number") != options.custom_options.end(); + if (file_row_number_enabled) { + bind_data.file_row_number_idx = names.size(); + return_types.emplace_back(LogicalType::BIGINT); + names.emplace_back("file_row_number"); + } else { + // TODO: this is a bogus ID? Change for flag indicating it should be enabled? + bind_data.file_row_number_idx = names.size(); + } + + return true; }; void DeltaMultiFileReader::BindOptions(MultiFileReaderOptions &options, MultiFileList &files, - vector &return_types, vector &names, MultiFileReaderBindData& bind_data) { - - // Disable all other multifilereader options - options.auto_detect_hive_partitioning = false; - options.hive_partitioning = false; - options.union_by_name = false; - - MultiFileReader::BindOptions(options, files, return_types, names, bind_data); - - auto demo_gen_col_opt = options.custom_options.find("delta_file_number"); - if (demo_gen_col_opt != options.custom_options.end()) { - if (demo_gen_col_opt->second.GetValue()) { - names.push_back("delta_file_number"); - return_types.push_back(LogicalType::UBIGINT); - } - } + vector &return_types, vector &names, + MultiFileReaderBindData &bind_data) { + + // Disable all other multifilereader options + options.auto_detect_hive_partitioning = false; + options.hive_partitioning = false; + options.union_by_name = false; + + MultiFileReader::BindOptions(options, files, return_types, names, bind_data); + + auto demo_gen_col_opt = options.custom_options.find("delta_file_number"); + if (demo_gen_col_opt != options.custom_options.end()) { + if (demo_gen_col_opt->second.GetValue()) { + names.push_back("delta_file_number"); + return_types.push_back(LogicalType::UBIGINT); + } + } } -void DeltaMultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_options, const MultiFileReaderBindData &options, - const string &filename, const vector &local_names, - const vector &global_types, const vector &global_names, - const vector &global_column_ids, MultiFileReaderData &reader_data, - ClientContext &context, optional_ptr global_state) { - MultiFileReader::FinalizeBind(file_options, options, filename, local_names, global_types, global_names, global_column_ids, reader_data, context, global_state); - - // Handle custom delta option set in MultiFileReaderOptions::custom_options - auto file_number_opt = file_options.custom_options.find("delta_file_number"); - if (file_number_opt != file_options.custom_options.end()) { - if (file_number_opt->second.GetValue()) { - D_ASSERT(global_state); - auto &delta_global_state = global_state->Cast(); - D_ASSERT(delta_global_state.delta_file_number_idx != DConstants::INVALID_INDEX); - - // We add the constant column for the delta_file_number option - // NOTE: we add a placeholder here, to demonstrate how we can also populate extra columns in the FinalizeChunk - reader_data.constant_map.emplace_back(delta_global_state.delta_file_number_idx, Value::UBIGINT(0)); - } - } - - // Get the metadata for this file - D_ASSERT(global_state->file_list); - const auto &snapshot = dynamic_cast(*global_state->file_list); - auto &file_metadata = snapshot.metadata[reader_data.file_list_idx.GetIndex()]; - - if (!file_metadata->partition_map.empty()) { - for (idx_t i = 0; i < global_column_ids.size(); i++) { - column_t col_id = global_column_ids[i]; - if (IsRowIdColumnId(col_id)) { - continue; - } - auto col_partition_entry = file_metadata->partition_map.find(global_names[col_id]); - if (col_partition_entry != file_metadata->partition_map.end()) { - auto ¤t_type = global_types[col_id]; - if (current_type == LogicalType::BLOB) { - reader_data.constant_map.emplace_back(i, Value::BLOB_RAW(col_partition_entry->second)); - } else { - auto maybe_value = Value(col_partition_entry->second).DefaultCastAs(current_type); - reader_data.constant_map.emplace_back(i, maybe_value); - } - } - } - } +void DeltaMultiFileReader::FinalizeBind(const MultiFileReaderOptions &file_options, + const MultiFileReaderBindData &options, const string &filename, + const vector &local_names, const vector &global_types, + const vector &global_names, const vector &global_column_ids, + MultiFileReaderData &reader_data, ClientContext &context, + optional_ptr global_state) { + MultiFileReader::FinalizeBind(file_options, options, filename, local_names, global_types, global_names, + global_column_ids, reader_data, context, global_state); + + // Handle custom delta option set in MultiFileReaderOptions::custom_options + auto file_number_opt = file_options.custom_options.find("delta_file_number"); + if (file_number_opt != file_options.custom_options.end()) { + if (file_number_opt->second.GetValue()) { + D_ASSERT(global_state); + auto &delta_global_state = global_state->Cast(); + D_ASSERT(delta_global_state.delta_file_number_idx != DConstants::INVALID_INDEX); + + // We add the constant column for the delta_file_number option + // NOTE: we add a placeholder here, to demonstrate how we can also populate extra columns in the + // FinalizeChunk + reader_data.constant_map.emplace_back(delta_global_state.delta_file_number_idx, Value::UBIGINT(0)); + } + } + + // Get the metadata for this file + D_ASSERT(global_state->file_list); + const auto &snapshot = dynamic_cast(*global_state->file_list); + auto &file_metadata = snapshot.metadata[reader_data.file_list_idx.GetIndex()]; + + if (!file_metadata->partition_map.empty()) { + for (idx_t i = 0; i < global_column_ids.size(); i++) { + column_t col_id = global_column_ids[i]; + if (IsRowIdColumnId(col_id)) { + continue; + } + auto col_partition_entry = file_metadata->partition_map.find(global_names[col_id]); + if (col_partition_entry != file_metadata->partition_map.end()) { + auto ¤t_type = global_types[col_id]; + if (current_type == LogicalType::BLOB) { + reader_data.constant_map.emplace_back(i, Value::BLOB_RAW(col_partition_entry->second)); + } else { + auto maybe_value = Value(col_partition_entry->second).DefaultCastAs(current_type); + reader_data.constant_map.emplace_back(i, maybe_value); + } + } + } + } } -unique_ptr DeltaMultiFileReader::CreateFileList(ClientContext &context, const vector& paths, FileGlobOptions options) { - if (paths.size() != 1) { - throw BinderException("'delta_scan' only supports single path as input"); - } +unique_ptr DeltaMultiFileReader::CreateFileList(ClientContext &context, const vector &paths, + FileGlobOptions options) { + if (paths.size() != 1) { + throw BinderException("'delta_scan' only supports single path as input"); + } - return make_uniq(context, paths[0]); + return make_uniq(context, paths[0]); } // Generate the correct Selection Vector Based on the Raw delta KernelBoolSlice dv and the row_id_column // TODO: this probably is slower than needed (we can do with less branches in the for loop for most cases) -static SelectionVector DuckSVFromDeltaSV(const ffi::KernelBoolSlice &dv, Vector row_id_column, idx_t count, idx_t &select_count) { - D_ASSERT(row_id_column.GetType() == LogicalType::BIGINT); - - UnifiedVectorFormat data; - row_id_column.ToUnifiedFormat(count, data); - auto row_ids = UnifiedVectorFormat::GetData(data); - - SelectionVector result {count}; - idx_t current_select = 0; - for (idx_t i = 0; i < count; i++) { - auto row_id = row_ids[data.sel->get_index(i)]; - - // TODO: why are deletion vectors not spanning whole data? - if (row_id >= dv.len || dv.ptr[row_id]) { - result.data()[current_select] = i; - current_select++; - } - } +static SelectionVector DuckSVFromDeltaSV(const ffi::KernelBoolSlice &dv, Vector row_id_column, idx_t count, + idx_t &select_count) { + D_ASSERT(row_id_column.GetType() == LogicalType::BIGINT); + + UnifiedVectorFormat data; + row_id_column.ToUnifiedFormat(count, data); + auto row_ids = UnifiedVectorFormat::GetData(data); + + SelectionVector result {count}; + idx_t current_select = 0; + for (idx_t i = 0; i < count; i++) { + auto row_id = row_ids[data.sel->get_index(i)]; + + // TODO: why are deletion vectors not spanning whole data? + if (row_id >= dv.len || dv.ptr[row_id]) { + result.data()[current_select] = i; + current_select++; + } + } - select_count = current_select; + select_count = current_select; - return result; + return result; } // Parses the columns that are used by the delta extension into void DeltaMultiFileReaderGlobalState::SetColumnIdx(const string &column, idx_t idx) { - if (column == "file_row_number") { - file_row_number_idx = idx; - return; - } else if (column == "delta_file_number") { - delta_file_number_idx = idx; - return; - } - throw IOException("Unknown column '%s' found as required by the DeltaMultiFileReader"); + if (column == "file_row_number") { + file_row_number_idx = idx; + return; + } else if (column == "delta_file_number") { + delta_file_number_idx = idx; + return; + } + throw IOException("Unknown column '%s' found as required by the DeltaMultiFileReader"); } -unique_ptr DeltaMultiFileReader::InitializeGlobalState(duckdb::ClientContext &context, - const duckdb::MultiFileReaderOptions &file_options, - const duckdb::MultiFileReaderBindData &bind_data, - const duckdb::MultiFileList &file_list, - const vector &global_types, - const vector &global_names, - const vector &global_column_ids) { - vector extra_columns; - vector> mapped_columns; - - // Create a map of the columns that are in the projection - case_insensitive_map_t selected_columns; - for (idx_t i = 0; i < global_column_ids.size(); i++) { - auto global_id = global_column_ids[i]; - if (IsRowIdColumnId(global_id)) { - continue; - } - - auto &global_name = global_names[global_id]; - selected_columns.insert({global_name, i}); - } - - // TODO: only add file_row_number column if there are deletes - case_insensitive_map_t columns_to_map = { - {"file_row_number", LogicalType::BIGINT}, - }; - - // Add the delta_file_number column to the columns to map - auto demo_gen_col_opt = file_options.custom_options.find("delta_file_number"); - if (demo_gen_col_opt != file_options.custom_options.end()) { - if (demo_gen_col_opt->second.GetValue()) { - columns_to_map.insert({"delta_file_number", LogicalType::UBIGINT}); - } - } - - // Map every column to either a column in the projection, or add it to the extra columns if it doesn't exist - idx_t col_offset = 0; - for (const auto &required_column : columns_to_map) { - // First check if the column is in the projection - auto res = selected_columns.find(required_column.first); - if (res != selected_columns.end()) { - // The column is in the projection, no special handling is required; we simply store the index - mapped_columns.push_back({required_column.first, res->second}); - continue; - } - - // The column is NOT in the projection: it needs to be added as an extra_column - - // Calculate the index of the added column (extra columns are added after all other columns) - idx_t current_col_idx = global_column_ids.size() + col_offset++; - - // Add column to the map, to ensure the MultiFileReader can find it when processing the Chunk - mapped_columns.push_back({required_column.first, current_col_idx}); - - // Ensure the result DataChunk has a vector of the correct type to store this column - extra_columns.push_back(required_column.second); - } - - auto res = make_uniq(extra_columns, &file_list); - - // Parse all the mapped columns into the DeltaMultiFileReaderGlobalState for easy use; - for (const auto& mapped_column : mapped_columns) { - res->SetColumnIdx(mapped_column.first, mapped_column.second); - } - - return std::move(res); +unique_ptr DeltaMultiFileReader::InitializeGlobalState( + duckdb::ClientContext &context, const duckdb::MultiFileReaderOptions &file_options, + const duckdb::MultiFileReaderBindData &bind_data, const duckdb::MultiFileList &file_list, + const vector &global_types, const vector &global_names, + const vector &global_column_ids) { + vector extra_columns; + vector> mapped_columns; + + // Create a map of the columns that are in the projection + case_insensitive_map_t selected_columns; + for (idx_t i = 0; i < global_column_ids.size(); i++) { + auto global_id = global_column_ids[i]; + if (IsRowIdColumnId(global_id)) { + continue; + } + + auto &global_name = global_names[global_id]; + selected_columns.insert({global_name, i}); + } + + // TODO: only add file_row_number column if there are deletes + case_insensitive_map_t columns_to_map = { + {"file_row_number", LogicalType::BIGINT}, + }; + + // Add the delta_file_number column to the columns to map + auto demo_gen_col_opt = file_options.custom_options.find("delta_file_number"); + if (demo_gen_col_opt != file_options.custom_options.end()) { + if (demo_gen_col_opt->second.GetValue()) { + columns_to_map.insert({"delta_file_number", LogicalType::UBIGINT}); + } + } + + // Map every column to either a column in the projection, or add it to the extra columns if it doesn't exist + idx_t col_offset = 0; + for (const auto &required_column : columns_to_map) { + // First check if the column is in the projection + auto res = selected_columns.find(required_column.first); + if (res != selected_columns.end()) { + // The column is in the projection, no special handling is required; we simply store the index + mapped_columns.push_back({required_column.first, res->second}); + continue; + } + + // The column is NOT in the projection: it needs to be added as an extra_column + + // Calculate the index of the added column (extra columns are added after all other columns) + idx_t current_col_idx = global_column_ids.size() + col_offset++; + + // Add column to the map, to ensure the MultiFileReader can find it when processing the Chunk + mapped_columns.push_back({required_column.first, current_col_idx}); + + // Ensure the result DataChunk has a vector of the correct type to store this column + extra_columns.push_back(required_column.second); + } + + auto res = make_uniq(extra_columns, &file_list); + + // Parse all the mapped columns into the DeltaMultiFileReaderGlobalState for easy use; + for (const auto &mapped_column : mapped_columns) { + res->SetColumnIdx(mapped_column.first, mapped_column.second); + } + + return std::move(res); } // This code is duplicated from MultiFileReader::CreateNameMapping the difference is that for columns that are not found // in the parquet files, we just add null constant columns static void CustomMulfiFileNameMapping(const string &file_name, const vector &local_types, - const vector &local_names, const vector &global_types, - const vector &global_names, const vector &global_column_ids, - MultiFileReaderData &reader_data, const string &initial_file, - optional_ptr global_state) { - D_ASSERT(global_types.size() == global_names.size()); + const vector &local_names, const vector &global_types, + const vector &global_names, const vector &global_column_ids, + MultiFileReaderData &reader_data, const string &initial_file, + optional_ptr global_state) { + D_ASSERT(global_types.size() == global_names.size()); D_ASSERT(local_types.size() == local_names.size()); // we have expected types: create a map of name -> column index case_insensitive_map_t name_map; @@ -776,10 +805,10 @@ static void CustomMulfiFileNameMapping(const string &file_name, const vectorsecond; @@ -799,138 +828,144 @@ static void CustomMulfiFileNameMapping(const string &file_name, const vector &local_types, - const vector &local_names, const vector &global_types, - const vector &global_names, const vector &global_column_ids, - MultiFileReaderData &reader_data, const string &initial_file, - optional_ptr global_state) { - // First call the base implementation to do most mapping - CustomMulfiFileNameMapping(file_name, local_types, local_names, global_types, global_names, global_column_ids, reader_data, initial_file, global_state); - - // Then we handle delta specific mapping - D_ASSERT(global_state); - auto &delta_global_state = global_state->Cast(); - - // Check if the file_row_number column is an "extra_column" which is not part of the projection - if (delta_global_state.file_row_number_idx >= global_column_ids.size()) { - D_ASSERT(delta_global_state.file_row_number_idx != DConstants::INVALID_INDEX); - - // Build the name map - case_insensitive_map_t name_map; - for (idx_t col_idx = 0; col_idx < local_names.size(); col_idx++) { - name_map[local_names[col_idx]] = col_idx; - } - - // Lookup the required column in the local map - auto entry = name_map.find("file_row_number"); - if (entry == name_map.end()) { - throw IOException("Failed to find the file_row_number column"); - } - - // Register the column to be scanned from this file - reader_data.column_ids.push_back(entry->second); - reader_data.column_mapping.push_back(delta_global_state.file_row_number_idx); - } - - // This may have changed: update it - reader_data.empty_columns = reader_data.column_ids.empty(); + const vector &local_names, const vector &global_types, + const vector &global_names, + const vector &global_column_ids, + MultiFileReaderData &reader_data, const string &initial_file, + optional_ptr global_state) { + // First call the base implementation to do most mapping + CustomMulfiFileNameMapping(file_name, local_types, local_names, global_types, global_names, global_column_ids, + reader_data, initial_file, global_state); + + // Then we handle delta specific mapping + D_ASSERT(global_state); + auto &delta_global_state = global_state->Cast(); + + // Check if the file_row_number column is an "extra_column" which is not part of the projection + if (delta_global_state.file_row_number_idx >= global_column_ids.size()) { + D_ASSERT(delta_global_state.file_row_number_idx != DConstants::INVALID_INDEX); + + // Build the name map + case_insensitive_map_t name_map; + for (idx_t col_idx = 0; col_idx < local_names.size(); col_idx++) { + name_map[local_names[col_idx]] = col_idx; + } + + // Lookup the required column in the local map + auto entry = name_map.find("file_row_number"); + if (entry == name_map.end()) { + throw IOException("Failed to find the file_row_number column"); + } + + // Register the column to be scanned from this file + reader_data.column_ids.push_back(entry->second); + reader_data.column_mapping.push_back(delta_global_state.file_row_number_idx); + } + + // This may have changed: update it + reader_data.empty_columns = reader_data.column_ids.empty(); } void DeltaMultiFileReader::FinalizeChunk(ClientContext &context, const MultiFileReaderBindData &bind_data, - const MultiFileReaderData &reader_data, DataChunk &chunk, optional_ptr global_state) { - // Base class finalization first - MultiFileReader::FinalizeChunk(context, bind_data, reader_data, chunk, global_state); - - D_ASSERT(global_state); - auto &delta_global_state = global_state->Cast(); - D_ASSERT(delta_global_state.file_list); - - // Get the metadata for this file - const auto &snapshot = dynamic_cast(*global_state->file_list); - auto &metadata = snapshot.metadata[reader_data.file_list_idx.GetIndex()]; - - if (metadata->selection_vector.ptr && chunk.size() != 0) { - D_ASSERT(delta_global_state.file_row_number_idx != DConstants::INVALID_INDEX); - auto &file_row_number_column = chunk.data[delta_global_state.file_row_number_idx]; - - // Construct the selection vector using the file_row_number column and the raw selection vector from delta - idx_t select_count; - auto sv = DuckSVFromDeltaSV(metadata->selection_vector, file_row_number_column, chunk.size(), select_count); - chunk.Slice(sv, select_count); - } - - // Note: this demo function shows how we can use DuckDB's Binder create expression-based generated columns - if (delta_global_state.delta_file_number_idx != DConstants::INVALID_INDEX) { - //! Create Dummy expression (0 + file_number) - vector> child_expr; - child_expr.push_back(make_uniq(Value::UBIGINT(0))); - child_expr.push_back(make_uniq(Value::UBIGINT(7))); - unique_ptr expr = make_uniq("+", std::move(child_expr), nullptr, nullptr, false, true); - - //! s dummy expression - auto binder = Binder::CreateBinder(context); - ExpressionBinder expr_binder(*binder, context); - auto bound_expr = expr_binder.Bind(expr, nullptr); - - //! Execute dummy expression into result column - ExpressionExecutor expr_executor(context); - expr_executor.AddExpression(*bound_expr); - - //! Execute the expression directly into the output Chunk - expr_executor.ExecuteExpression(chunk.data[delta_global_state.delta_file_number_idx]); - } + const MultiFileReaderData &reader_data, DataChunk &chunk, + optional_ptr global_state) { + // Base class finalization first + MultiFileReader::FinalizeChunk(context, bind_data, reader_data, chunk, global_state); + + D_ASSERT(global_state); + auto &delta_global_state = global_state->Cast(); + D_ASSERT(delta_global_state.file_list); + + // Get the metadata for this file + const auto &snapshot = dynamic_cast(*global_state->file_list); + auto &metadata = snapshot.metadata[reader_data.file_list_idx.GetIndex()]; + + if (metadata->selection_vector.ptr && chunk.size() != 0) { + D_ASSERT(delta_global_state.file_row_number_idx != DConstants::INVALID_INDEX); + auto &file_row_number_column = chunk.data[delta_global_state.file_row_number_idx]; + + // Construct the selection vector using the file_row_number column and the raw selection vector from delta + idx_t select_count; + auto sv = DuckSVFromDeltaSV(metadata->selection_vector, file_row_number_column, chunk.size(), select_count); + chunk.Slice(sv, select_count); + } + + // Note: this demo function shows how we can use DuckDB's Binder create expression-based generated columns + if (delta_global_state.delta_file_number_idx != DConstants::INVALID_INDEX) { + //! Create Dummy expression (0 + file_number) + vector> child_expr; + child_expr.push_back(make_uniq(Value::UBIGINT(0))); + child_expr.push_back(make_uniq(Value::UBIGINT(7))); + unique_ptr expr = + make_uniq("+", std::move(child_expr), nullptr, nullptr, false, true); + + //! s dummy expression + auto binder = Binder::CreateBinder(context); + ExpressionBinder expr_binder(*binder, context); + auto bound_expr = expr_binder.Bind(expr, nullptr); + + //! Execute dummy expression into result column + ExpressionExecutor expr_executor(context); + expr_executor.AddExpression(*bound_expr); + + //! Execute the expression directly into the output Chunk + expr_executor.ExecuteExpression(chunk.data[delta_global_state.delta_file_number_idx]); + } }; -bool DeltaMultiFileReader::ParseOption(const string &key, const Value &val, MultiFileReaderOptions &options, ClientContext &context) { - auto loption = StringUtil::Lower(key); +bool DeltaMultiFileReader::ParseOption(const string &key, const Value &val, MultiFileReaderOptions &options, + ClientContext &context) { + auto loption = StringUtil::Lower(key); - if (loption == "delta_file_number") { - options.custom_options[loption] = val; - return true; - } + if (loption == "delta_file_number") { + options.custom_options[loption] = val; + return true; + } - // We need to capture this one to know whether to emit - if (loption == "file_row_number") { - options.custom_options[loption] = val; - return true; - } + // We need to capture this one to know whether to emit + if (loption == "file_row_number") { + options.custom_options[loption] = val; + return true; + } - return MultiFileReader::ParseOption(key, val, options, context); + return MultiFileReader::ParseOption(key, val, options, context); } // -//DeltaMultiFileReaderBindData::DeltaMultiFileReaderBindData(DeltaSnapshot & delta_snapshot): current_snapshot(delta_snapshot){ +// DeltaMultiFileReaderBindData::DeltaMultiFileReaderBindData(DeltaSnapshot & delta_snapshot): +// current_snapshot(delta_snapshot){ // //} TableFunctionSet DeltaFunctions::GetDeltaScanFunction(DatabaseInstance &instance) { - // The delta_scan function is constructed by grabbing the parquet scan from the Catalog, then injecting the - // DeltaMultiFileReader into it to create a Delta-based multi file read + // The delta_scan function is constructed by grabbing the parquet scan from the Catalog, then injecting the + // DeltaMultiFileReader into it to create a Delta-based multi file read - auto &parquet_scan = ExtensionUtil::GetTableFunction(instance, "parquet_scan"); - auto parquet_scan_copy = parquet_scan.functions; + auto &parquet_scan = ExtensionUtil::GetTableFunction(instance, "parquet_scan"); + auto parquet_scan_copy = parquet_scan.functions; - for (auto &function : parquet_scan_copy.functions) { - // Register the MultiFileReader as the driver for reads - function.get_multi_file_reader = DeltaMultiFileReader::CreateInstance; + for (auto &function : parquet_scan_copy.functions) { + // Register the MultiFileReader as the driver for reads + function.get_multi_file_reader = DeltaMultiFileReader::CreateInstance; - // Unset all of these: they are either broken, very inefficient. - // TODO: implement/fix these - function.serialize = nullptr; - function.deserialize = nullptr; - function.statistics = nullptr; - function.table_scan_progress = nullptr; - function.get_bind_info = nullptr; + // Unset all of these: they are either broken, very inefficient. + // TODO: implement/fix these + function.serialize = nullptr; + function.deserialize = nullptr; + function.statistics = nullptr; + function.table_scan_progress = nullptr; + function.get_bind_info = nullptr; - // Schema param is just confusing here - function.named_parameters.erase("schema"); + // Schema param is just confusing here + function.named_parameters.erase("schema"); - // Demonstration of a generated column based on information from DeltaSnapshot - function.named_parameters["delta_file_number"] = LogicalType::BOOLEAN; + // Demonstration of a generated column based on information from DeltaSnapshot + function.named_parameters["delta_file_number"] = LogicalType::BOOLEAN; - function.name = "delta_scan"; - } + function.name = "delta_scan"; + } - parquet_scan_copy.name = "delta_scan"; - return parquet_scan_copy; + parquet_scan_copy.name = "delta_scan"; + return parquet_scan_copy; } } // namespace duckdb diff --git a/src/include/delta_functions.hpp b/src/include/delta_functions.hpp index 390c593..4f819cb 100644 --- a/src/include/delta_functions.hpp +++ b/src/include/delta_functions.hpp @@ -14,9 +14,9 @@ namespace duckdb { class DeltaFunctions { public: - static vector GetTableFunctions(DatabaseInstance &instance); + static vector GetTableFunctions(DatabaseInstance &instance); private: - static TableFunctionSet GetDeltaScanFunction(DatabaseInstance &instance); + static TableFunctionSet GetDeltaScanFunction(DatabaseInstance &instance); }; } // namespace duckdb diff --git a/src/include/delta_kernel_ffi.hpp b/src/include/delta_kernel_ffi.hpp index 3c46bde..6f1401e 100644 --- a/src/include/delta_kernel_ffi.hpp +++ b/src/include/delta_kernel_ffi.hpp @@ -3,55 +3,55 @@ #include #include #include -#include #include +#include namespace ffi { enum class KernelError { - UnknownError, - FFIError, + UnknownError, + FFIError, #if (defined(DEFINE_DEFAULT_ENGINE) || defined(DEFINE_SYNC_ENGINE)) - ArrowError, + ArrowError, #endif - EngineDataTypeError, - ExtractError, - GenericError, - IOErrorError, + EngineDataTypeError, + ExtractError, + GenericError, + IOErrorError, #if (defined(DEFINE_DEFAULT_ENGINE) || defined(DEFINE_SYNC_ENGINE)) - ParquetError, + ParquetError, #endif #if defined(DEFINE_DEFAULT_ENGINE) - ObjectStoreError, + ObjectStoreError, #endif #if defined(DEFINE_DEFAULT_ENGINE) - ObjectStorePathError, + ObjectStorePathError, #endif #if defined(DEFINE_DEFAULT_ENGINE) - ReqwestError, + ReqwestError, #endif - FileNotFoundError, - MissingColumnError, - UnexpectedColumnTypeError, - MissingDataError, - MissingVersionError, - DeletionVectorError, - InvalidUrlError, - MalformedJsonError, - MissingMetadataError, - MissingProtocolError, - MissingMetadataAndProtocolError, - ParseError, - JoinFailureError, - Utf8Error, - ParseIntError, - InvalidColumnMappingModeError, - InvalidTableLocationError, - InvalidDecimalError, - InvalidStructDataError, - InternalError, - InvalidExpression, - InvalidLogPath, + FileNotFoundError, + MissingColumnError, + UnexpectedColumnTypeError, + MissingDataError, + MissingVersionError, + DeletionVectorError, + InvalidUrlError, + MalformedJsonError, + MissingMetadataError, + MissingProtocolError, + MissingMetadataAndProtocolError, + ParseError, + JoinFailureError, + Utf8Error, + ParseIntError, + InvalidColumnMappingModeError, + InvalidTableLocationError, + InvalidDecimalError, + InvalidStructDataError, + InternalError, + InvalidExpression, + InvalidLogPath, }; struct CStringMap; @@ -91,15 +91,15 @@ struct StringSliceIterator; /// receives a `KernelBoolSlice` as a return value from a kernel method, engine is responsible /// to free that slice, by calling [super::free_bool_slice] exactly once. struct KernelBoolSlice { - bool *ptr; - uintptr_t len; + bool *ptr; + uintptr_t len; }; /// An owned slice of u64 row indexes allocated by the kernel. The engine is responsible for /// freeing this slice by calling [super::free_row_indexes] once. struct KernelRowIndexArray { - uint64_t *ptr; - uintptr_t len; + uint64_t *ptr; + uintptr_t len; }; /// Represents an object that crosses the FFI boundary and which outlives the scope that created @@ -134,8 +134,8 @@ struct KernelRowIndexArray { /// NOTE: Because the underlying type is always [`Sync`], multi-threaded external code can /// freely access shared (non-mutable) handles. /// -template -using Handle = H*; +template +using Handle = H *; /// An error that can be returned to the engine. Engines that wish to associate additional /// information can define and use any type that is [pointer @@ -144,31 +144,31 @@ using Handle = H*; /// of a [standard layout](https://en.cppreference.com/w/cpp/language/data_members#Standard-layout) /// class. struct EngineError { - KernelError etype; + KernelError etype; }; /// Semantics: Kernel will always immediately return the leaked engine error to the engine (if it /// allocated one at all), and engine is responsible for freeing it. -template +template struct ExternResult { - enum class Tag { - Ok, - Err, - }; - - struct Ok_Body { - T _0; - }; - - struct Err_Body { - EngineError *_0; - }; - - Tag tag; - union { - Ok_Body ok; - Err_Body err; - }; + enum class Tag { + Ok, + Err, + }; + + struct Ok_Body { + T _0; + }; + + struct Err_Body { + EngineError *_0; + }; + + Tag tag; + union { + Ok_Body ok; + Err_Body err; + }; }; /// A non-owned slice of a UTF8 string, intended for arg-passing between kernel and engine. The @@ -193,32 +193,32 @@ struct ExternResult { /// wants_slice(msg.into()); /// ``` struct KernelStringSlice { - const char *ptr; - uintptr_t len; + const char *ptr; + uintptr_t len; }; -using AllocateErrorFn = EngineError*(*)(KernelError etype, KernelStringSlice msg); +using AllocateErrorFn = EngineError *(*)(KernelError etype, KernelStringSlice msg); -using NullableCvoid = void*; +using NullableCvoid = void *; /// Allow engines to allocate strings of their own type. the contract of calling a passed allocate /// function is that `kernel_str` is _only_ valid until the return from this function -using AllocateStringFn = NullableCvoid(*)(KernelStringSlice kernel_str); +using AllocateStringFn = NullableCvoid (*)(KernelStringSlice kernel_str); struct FileMeta { - KernelStringSlice path; - int64_t last_modified; - uintptr_t size; + KernelStringSlice path; + int64_t last_modified; + uintptr_t size; }; /// Model iterators. This allows an engine to specify iteration however it likes, and we simply wrap /// the engine functions. The engine retains ownership of the iterator. struct EngineIterator { - void *data; - /// A function that should advance the iterator and return the next time from the data - /// If the iterator is complete, it should return null. It should be safe to - /// call `get_next()` multiple times if it returns null. - const void *(*get_next)(void *data); + void *data; + /// A function that should advance the iterator and return the next time from the data + /// If the iterator is complete, it should return null. It should be safe to + /// call `get_next()` multiple times if it returns null. + const void *(*get_next)(void *data); }; /// ABI-compatible struct for ArrowArray from C Data Interface @@ -232,16 +232,16 @@ struct EngineIterator { /// } /// ``` struct FFI_ArrowArray { - int64_t length; - int64_t null_count; - int64_t offset; - int64_t n_buffers; - int64_t n_children; - const void **buffers; - FFI_ArrowArray **children; - FFI_ArrowArray *dictionary; - void (*release)(FFI_ArrowArray *arg1); - void *private_data; + int64_t length; + int64_t null_count; + int64_t offset; + int64_t n_buffers; + int64_t n_children; + const void **buffers; + FFI_ArrowArray **children; + FFI_ArrowArray *dictionary; + void (*release)(FFI_ArrowArray *arg1); + void *private_data; }; /// ABI-compatible struct for `ArrowSchema` from C Data Interface @@ -256,16 +256,16 @@ struct FFI_ArrowArray { /// ``` /// struct FFI_ArrowSchema { - const char *format; - const char *name; - const char *metadata; - /// Refer to [Arrow Flags](https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.flags) - int64_t flags; - int64_t n_children; - FFI_ArrowSchema **children; - FFI_ArrowSchema *dictionary; - void (*release)(FFI_ArrowSchema *arg1); - void *private_data; + const char *format; + const char *name; + const char *metadata; + /// Refer to [Arrow Flags](https://arrow.apache.org/docs/format/CDataInterface.html#c.ArrowSchema.flags) + int64_t flags; + int64_t n_children; + FFI_ArrowSchema **children; + FFI_ArrowSchema *dictionary; + void (*release)(FFI_ArrowSchema *arg1); + void *private_data; }; #if defined(DEFINE_DEFAULT_ENGINE) @@ -273,8 +273,8 @@ struct FFI_ArrowSchema { /// Interface](https://arrow.apache.org/docs/format/CDataInterface.html). This includes the data and /// the schema. struct ArrowFFIData { - FFI_ArrowArray array; - FFI_ArrowSchema schema; + FFI_ArrowArray array; + FFI_ArrowSchema schema; }; #endif @@ -289,39 +289,35 @@ struct ArrowFFIData { /// kernel each retain ownership of their respective objects, with no need to coordinate memory /// lifetimes with the other. struct EnginePredicate { - void *predicate; - uintptr_t (*visitor)(void *predicate, KernelExpressionVisitorState *state); + void *predicate; + uintptr_t (*visitor)(void *predicate, KernelExpressionVisitorState *state); }; /// Give engines an easy way to consume stats struct Stats { - /// For any file where the deletion vector is not present (see [`DvInfo::has_vector`]), the - /// `num_records` statistic must be present and accurate, and must equal the number of records - /// in the data file. In the presence of Deletion Vectors the statistics may be somewhat - /// outdated, i.e. not reflecting deleted rows yet. - uint64_t num_records; + /// For any file where the deletion vector is not present (see [`DvInfo::has_vector`]), the + /// `num_records` statistic must be present and accurate, and must equal the number of records + /// in the data file. In the presence of Deletion Vectors the statistics may be somewhat + /// outdated, i.e. not reflecting deleted rows yet. + uint64_t num_records; }; -using CScanCallback = void(*)(NullableCvoid engine_context, - KernelStringSlice path, - int64_t size, - const Stats *stats, - const DvInfo *dv_info, - const CStringMap *partition_map); +using CScanCallback = void (*)(NullableCvoid engine_context, KernelStringSlice path, int64_t size, const Stats *stats, + const DvInfo *dv_info, const CStringMap *partition_map); // This trickery is from https://github.com/mozilla/cbindgen/issues/402#issuecomment-578680163 struct im_an_unused_struct_that_tricks_msvc_into_compilation { - ExternResult field; - ExternResult field2; - ExternResult field3; - ExternResult> field4; - ExternResult> field5; - ExternResult field6; - ExternResult field7; - ExternResult> field8; - ExternResult> field9; - ExternResult> field10; - ExternResult field11; + ExternResult field; + ExternResult field2; + ExternResult field3; + ExternResult> field4; + ExternResult> field5; + ExternResult field6; + ExternResult field7; + ExternResult> field8; + ExternResult> field9; + ExternResult> field10; + ExternResult field11; }; /// The `EngineSchemaVisitor` defines a visitor system to allow engines to build their own @@ -350,61 +346,49 @@ struct im_an_unused_struct_that_tricks_msvc_into_compilation { /// that element's (already-visited) children. /// 4. The [`visit_schema`] method returns the id of the list of top-level columns struct EngineSchemaVisitor { - /// opaque state pointer - void *data; - /// Creates a new field list, optionally reserving capacity up front - uintptr_t (*make_field_list)(void *data, uintptr_t reserve); - /// Indicate that the schema contains a `Struct` type. The top level of a Schema is always a - /// `Struct`. The fields of the `Struct` are in the list identified by `child_list_id`. - void (*visit_struct)(void *data, - uintptr_t sibling_list_id, - KernelStringSlice name, - uintptr_t child_list_id); - /// Indicate that the schema contains an Array type. `child_list_id` will be a _one_ item list - /// with the array's element type - void (*visit_array)(void *data, - uintptr_t sibling_list_id, - KernelStringSlice name, - bool contains_null, - uintptr_t child_list_id); - /// Indicate that the schema contains an Map type. `child_list_id` will be a _two_ item list - /// where the first element is the map's key type and the second element is the - /// map's value type - void (*visit_map)(void *data, - uintptr_t sibling_list_id, - KernelStringSlice name, - bool value_contains_null, - uintptr_t child_list_id); - /// visit a `decimal` with the specified `precision` and `scale` - void (*visit_decimal)(void *data, - uintptr_t sibling_list_id, - KernelStringSlice name, - uint8_t precision, - uint8_t scale); - /// Visit a `string` belonging to the list identified by `sibling_list_id`. - void (*visit_string)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `long` belonging to the list identified by `sibling_list_id`. - void (*visit_long)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit an `integer` belonging to the list identified by `sibling_list_id`. - void (*visit_integer)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `short` belonging to the list identified by `sibling_list_id`. - void (*visit_short)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `byte` belonging to the list identified by `sibling_list_id`. - void (*visit_byte)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `float` belonging to the list identified by `sibling_list_id`. - void (*visit_float)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `double` belonging to the list identified by `sibling_list_id`. - void (*visit_double)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `boolean` belonging to the list identified by `sibling_list_id`. - void (*visit_boolean)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit `binary` belonging to the list identified by `sibling_list_id`. - void (*visit_binary)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `date` belonging to the list identified by `sibling_list_id`. - void (*visit_date)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `timestamp` belonging to the list identified by `sibling_list_id`. - void (*visit_timestamp)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); - /// Visit a `timestamp` with no timezone belonging to the list identified by `sibling_list_id`. - void (*visit_timestamp_ntz)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// opaque state pointer + void *data; + /// Creates a new field list, optionally reserving capacity up front + uintptr_t (*make_field_list)(void *data, uintptr_t reserve); + /// Indicate that the schema contains a `Struct` type. The top level of a Schema is always a + /// `Struct`. The fields of the `Struct` are in the list identified by `child_list_id`. + void (*visit_struct)(void *data, uintptr_t sibling_list_id, KernelStringSlice name, uintptr_t child_list_id); + /// Indicate that the schema contains an Array type. `child_list_id` will be a _one_ item list + /// with the array's element type + void (*visit_array)(void *data, uintptr_t sibling_list_id, KernelStringSlice name, bool contains_null, + uintptr_t child_list_id); + /// Indicate that the schema contains an Map type. `child_list_id` will be a _two_ item list + /// where the first element is the map's key type and the second element is the + /// map's value type + void (*visit_map)(void *data, uintptr_t sibling_list_id, KernelStringSlice name, bool value_contains_null, + uintptr_t child_list_id); + /// visit a `decimal` with the specified `precision` and `scale` + void (*visit_decimal)(void *data, uintptr_t sibling_list_id, KernelStringSlice name, uint8_t precision, + uint8_t scale); + /// Visit a `string` belonging to the list identified by `sibling_list_id`. + void (*visit_string)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `long` belonging to the list identified by `sibling_list_id`. + void (*visit_long)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit an `integer` belonging to the list identified by `sibling_list_id`. + void (*visit_integer)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `short` belonging to the list identified by `sibling_list_id`. + void (*visit_short)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `byte` belonging to the list identified by `sibling_list_id`. + void (*visit_byte)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `float` belonging to the list identified by `sibling_list_id`. + void (*visit_float)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `double` belonging to the list identified by `sibling_list_id`. + void (*visit_double)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `boolean` belonging to the list identified by `sibling_list_id`. + void (*visit_boolean)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit `binary` belonging to the list identified by `sibling_list_id`. + void (*visit_binary)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `date` belonging to the list identified by `sibling_list_id`. + void (*visit_date)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `timestamp` belonging to the list identified by `sibling_list_id`. + void (*visit_timestamp)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); + /// Visit a `timestamp` with no timezone belonging to the list identified by `sibling_list_id`. + void (*visit_timestamp_ntz)(void *data, uintptr_t sibling_list_id, KernelStringSlice name); }; extern "C" { @@ -433,8 +417,7 @@ void free_engine_data(Handle engine_data); /// /// # Safety /// Caller is responsible for passing a valid path pointer. -ExternResult get_engine_builder(KernelStringSlice path, - AllocateErrorFn allocate_error); +ExternResult get_engine_builder(KernelStringSlice path, AllocateErrorFn allocate_error); #endif #if defined(DEFINE_DEFAULT_ENGINE) @@ -461,8 +444,7 @@ ExternResult> builder_build(EngineBuilder *builder); /// # Safety /// /// Caller is responsible for passing a valid path pointer. -ExternResult> get_default_engine(KernelStringSlice path, - AllocateErrorFn allocate_error); +ExternResult> get_default_engine(KernelStringSlice path, AllocateErrorFn allocate_error); #endif #if defined(DEFINE_SYNC_ENGINE) @@ -482,8 +464,7 @@ void free_engine(Handle engine); /// # Safety /// /// Caller is responsible for passing valid handles and path pointer. -ExternResult> snapshot(KernelStringSlice path, - Handle engine); +ExternResult> snapshot(KernelStringSlice path, Handle engine); /// # Safety /// @@ -509,8 +490,7 @@ NullableCvoid snapshot_table_root(Handle snapshot, AllocateStrin /// /// The iterator must be valid (returned by [kernel_scan_data_init]) and not yet freed by /// [kernel_scan_data_free]. The visitor function pointer must be non-null. -bool string_slice_next(Handle data, - NullableCvoid engine_context, +bool string_slice_next(Handle data, NullableCvoid engine_context, void (*engine_visitor)(NullableCvoid engine_context, KernelStringSlice slice)); /// # Safety @@ -527,8 +507,7 @@ void free_string_slice_data(Handle data); /// /// The iterator must be valid (returned by [`read_parquet_file`]) and not yet freed by /// [`free_read_result_iter`]. The visitor function pointer must be non-null. -ExternResult read_result_next(Handle data, - NullableCvoid engine_context, +ExternResult read_result_next(Handle data, NullableCvoid engine_context, void (*engine_visitor)(NullableCvoid engine_context, Handle engine_data)); @@ -543,9 +522,8 @@ void free_read_result_iter(Handle data); /// /// # Safety /// Caller is responsible for calling with a valid `ExternEngineHandle` and `FileMeta` -ExternResult> read_parquet_file(Handle engine, - const FileMeta *file, - Handle physical_schema); +ExternResult> +read_parquet_file(Handle engine, const FileMeta *file, Handle physical_schema); uintptr_t visit_expression_and(KernelExpressionVisitorState *state, EngineIterator *children); @@ -561,8 +539,7 @@ uintptr_t visit_expression_eq(KernelExpressionVisitorState *state, uintptr_t a, /// # Safety /// The string slice must be valid -ExternResult visit_expression_column(KernelExpressionVisitorState *state, - KernelStringSlice name, +ExternResult visit_expression_column(KernelExpressionVisitorState *state, KernelStringSlice name, AllocateErrorFn allocate_error); uintptr_t visit_expression_not(KernelExpressionVisitorState *state, uintptr_t inner_expr); @@ -571,8 +548,7 @@ uintptr_t visit_expression_is_null(KernelExpressionVisitorState *state, uintptr_ /// # Safety /// The string slice must be valid -ExternResult visit_expression_literal_string(KernelExpressionVisitorState *state, - KernelStringSlice value, +ExternResult visit_expression_literal_string(KernelExpressionVisitorState *state, KernelStringSlice value, AllocateErrorFn allocate_error); uintptr_t visit_expression_literal_int(KernelExpressionVisitorState *state, int32_t value); @@ -612,8 +588,7 @@ void *get_raw_engine_data(Handle data); /// # Safety /// data_handle must be a valid ExclusiveEngineData as read by the /// [`delta_kernel::engine::default::DefaultEngine`] obtained from `get_default_engine`. -ExternResult get_raw_arrow_data(Handle data, - Handle engine); +ExternResult get_raw_arrow_data(Handle data, Handle engine); #endif /// Drops a scan. @@ -625,8 +600,7 @@ void free_scan(Handle scan); /// # Safety /// /// Caller is responsible for passing a valid snapshot pointer, and engine pointer -ExternResult> scan(Handle snapshot, - Handle engine, +ExternResult> scan(Handle snapshot, Handle engine, EnginePredicate *predicate); /// Get the global state for a scan. See the docs for [`delta_kernel::scan::state::GlobalScanState`] @@ -680,8 +654,7 @@ ExternResult> kernel_scan_data_init(Handle kernel_scan_data_next(Handle data, - NullableCvoid engine_context, +ExternResult kernel_scan_data_next(Handle data, NullableCvoid engine_context, void (*engine_visitor)(NullableCvoid engine_context, Handle engine_data, KernelBoolSlice selection_vector)); @@ -699,24 +672,20 @@ void free_kernel_scan_data(Handle data); /// # Safety /// /// The engine is responsible for providing a valid [`CStringMap`] pointer and [`KernelStringSlice`] -NullableCvoid get_from_map(const CStringMap *map, - KernelStringSlice key, - AllocateStringFn allocate_fn); +NullableCvoid get_from_map(const CStringMap *map, KernelStringSlice key, AllocateStringFn allocate_fn); /// Get a selection vector out of a [`DvInfo`] struct /// /// # Safety /// Engine is responsible for providing valid pointers for each argument -ExternResult selection_vector_from_dv(const DvInfo *dv_info, - Handle engine, +ExternResult selection_vector_from_dv(const DvInfo *dv_info, Handle engine, Handle state); /// Get a vector of row indexes out of a [`DvInfo`] struct /// /// # Safety /// Engine is responsible for providing valid pointers for each argument -ExternResult row_indexes_from_dv(const DvInfo *dv_info, - Handle engine, +ExternResult row_indexes_from_dv(const DvInfo *dv_info, Handle engine, Handle state); /// Shim for ffi to call visit_scan_data. This will generally be called when iterating through scan @@ -724,9 +693,7 @@ ExternResult row_indexes_from_dv(const DvInfo *dv_info, /// /// # Safety /// engine is responsbile for passing a valid [`ExclusiveEngineData`] and selection vector. -void visit_scan_data(Handle data, - KernelBoolSlice selection_vec, - NullableCvoid engine_context, +void visit_scan_data(Handle data, KernelBoolSlice selection_vec, NullableCvoid engine_context, CScanCallback callback); /// Visit the schema of the passed `SnapshotHandle`, using the provided `visitor`. See the @@ -739,6 +706,6 @@ void visit_scan_data(Handle data, /// Caller is responsible for passing a valid snapshot handle and schema visitor. uintptr_t visit_schema(Handle snapshot, EngineSchemaVisitor *visitor); -} // extern "C" +} // extern "C" -} // namespace ffi +} // namespace ffi diff --git a/src/include/delta_utils.hpp b/src/include/delta_utils.hpp index 9b33c5c..23b87f4 100644 --- a/src/include/delta_utils.hpp +++ b/src/include/delta_utils.hpp @@ -1,11 +1,12 @@ #pragma once #include "delta_kernel_ffi.hpp" -#include "duckdb/planner/filter/constant_filter.hpp" -#include "duckdb/planner/filter/conjunction_filter.hpp" #include "duckdb/common/enum_util.hpp" -#include +#include "duckdb/planner/filter/conjunction_filter.hpp" +#include "duckdb/planner/filter/constant_filter.hpp" + #include +#include // TODO: clean up this file as we go @@ -14,48 +15,52 @@ namespace duckdb { // SchemaVisitor is used to parse the schema of a Delta table from the Kernel class SchemaVisitor { public: - using FieldList = child_list_t; + using FieldList = child_list_t; - static unique_ptr VisitSnapshotSchema(ffi::SharedSnapshot* snapshot); + static unique_ptr VisitSnapshotSchema(ffi::SharedSnapshot *snapshot); private: - unordered_map> inflight_lists; - uintptr_t next_id = 1; - - typedef void (SimpleTypeVisitorFunction)(void*, uintptr_t, ffi::KernelStringSlice); - - template - static SimpleTypeVisitorFunction* VisitSimpleType() { - return (SimpleTypeVisitorFunction*) &VisitSimpleTypeImpl; - } - template - static void VisitSimpleTypeImpl(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name) { - state->AppendToList(sibling_list_id, name, TypeId); - } - - static void VisitDecimal(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, uint8_t precision, uint8_t scale); - static uintptr_t MakeFieldList(SchemaVisitor* state, uintptr_t capacity_hint); - static void VisitStruct(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, uintptr_t child_list_id); - static void VisitArray(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, bool contains_null, uintptr_t child_list_id); - static void VisitMap(SchemaVisitor* state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, bool contains_null, uintptr_t child_list_id); - - uintptr_t MakeFieldListImpl(uintptr_t capacity_hint); - void AppendToList(uintptr_t id, ffi::KernelStringSlice name, LogicalType&& child); - unique_ptr TakeFieldList(uintptr_t id); + unordered_map> inflight_lists; + uintptr_t next_id = 1; + + typedef void(SimpleTypeVisitorFunction)(void *, uintptr_t, ffi::KernelStringSlice); + + template + static SimpleTypeVisitorFunction *VisitSimpleType() { + return (SimpleTypeVisitorFunction *)&VisitSimpleTypeImpl; + } + template + static void VisitSimpleTypeImpl(SchemaVisitor *state, uintptr_t sibling_list_id, ffi::KernelStringSlice name) { + state->AppendToList(sibling_list_id, name, TypeId); + } + + static void VisitDecimal(SchemaVisitor *state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, + uint8_t precision, uint8_t scale); + static uintptr_t MakeFieldList(SchemaVisitor *state, uintptr_t capacity_hint); + static void VisitStruct(SchemaVisitor *state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, + uintptr_t child_list_id); + static void VisitArray(SchemaVisitor *state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, + bool contains_null, uintptr_t child_list_id); + static void VisitMap(SchemaVisitor *state, uintptr_t sibling_list_id, ffi::KernelStringSlice name, + bool contains_null, uintptr_t child_list_id); + + uintptr_t MakeFieldListImpl(uintptr_t capacity_hint); + void AppendToList(uintptr_t id, ffi::KernelStringSlice name, LogicalType &&child); + unique_ptr TakeFieldList(uintptr_t id); }; // Allocator for errors that the kernel might throw struct DuckDBEngineError : ffi::EngineError { - // Allocate a DuckDBEngineError, function ptr passed to kernel for error allocation - static ffi::EngineError* AllocateError(ffi::KernelError etype, ffi::KernelStringSlice msg); - // Convert a kernel error enum to a string - static string KernelErrorEnumToString(ffi::KernelError err); + // Allocate a DuckDBEngineError, function ptr passed to kernel for error allocation + static ffi::EngineError *AllocateError(ffi::KernelError etype, ffi::KernelStringSlice msg); + // Convert a kernel error enum to a string + static string KernelErrorEnumToString(ffi::KernelError err); - // Throw the error as an IOException - [[noreturn]] void Throw(string from_info); + // Throw the error as an IOException + [[noreturn]] void Throw(string from_info); - // The error message from Kernel - string error_message; + // The error message from Kernel + string error_message; }; // RAII wrapper that returns ownership of a kernel pointer to kernel when it goes out of @@ -63,43 +68,45 @@ struct DuckDBEngineError : ffi::EngineError { // kernel type to be complete. template struct UniqueKernelPointer { - UniqueKernelPointer() : ptr(nullptr), free(nullptr) {} - - // Takes ownership of a pointer with associated deleter. - UniqueKernelPointer(KernelType* ptr, void (*free)(KernelType*)) : ptr(ptr), free(free) {} - - // movable but not copyable - UniqueKernelPointer(UniqueKernelPointer&& other) : ptr(other.ptr) { - other.ptr = nullptr; - } - UniqueKernelPointer& operator=(UniqueKernelPointer&& other) { - std::swap(ptr, other.ptr); - std::swap(free, other.free); - return *this; - } - UniqueKernelPointer(const UniqueKernelPointer&) = delete; - UniqueKernelPointer& operator=(const UniqueKernelPointer&) = delete; - - ~UniqueKernelPointer() { - if (ptr && free) { - free(ptr); - } - } - - KernelType* get() const { return ptr; } + UniqueKernelPointer() : ptr(nullptr), free(nullptr) { + } + + // Takes ownership of a pointer with associated deleter. + UniqueKernelPointer(KernelType *ptr, void (*free)(KernelType *)) : ptr(ptr), free(free) { + } + + // movable but not copyable + UniqueKernelPointer(UniqueKernelPointer &&other) : ptr(other.ptr) { + other.ptr = nullptr; + } + UniqueKernelPointer &operator=(UniqueKernelPointer &&other) { + std::swap(ptr, other.ptr); + std::swap(free, other.free); + return *this; + } + UniqueKernelPointer(const UniqueKernelPointer &) = delete; + UniqueKernelPointer &operator=(const UniqueKernelPointer &) = delete; + + ~UniqueKernelPointer() { + if (ptr && free) { + free(ptr); + } + } + + KernelType *get() const { + return ptr; + } private: - KernelType* ptr; - void (*free)(KernelType*) = nullptr; + KernelType *ptr; + void (*free)(KernelType *) = nullptr; }; // Syntactic sugar around the different kernel types -template +template struct TemplatedUniqueKernelPointer : public UniqueKernelPointer { - TemplatedUniqueKernelPointer() : UniqueKernelPointer() { - }; - TemplatedUniqueKernelPointer(KernelType* ptr) : UniqueKernelPointer(ptr, DeleteFunction) { - }; + TemplatedUniqueKernelPointer() : UniqueKernelPointer() {}; + TemplatedUniqueKernelPointer(KernelType *ptr) : UniqueKernelPointer(ptr, DeleteFunction) {}; }; typedef TemplatedUniqueKernelPointer KernelSnapshot; @@ -109,43 +116,46 @@ typedef TemplatedUniqueKernelPointer KernelScanDataIterator; struct KernelUtils { - static ffi::KernelStringSlice ToDeltaString(const string &str); - static string FromDeltaString(const struct ffi::KernelStringSlice slice); - static vector FromDeltaBoolSlice(const struct ffi::KernelBoolSlice slice); - - // TODO: all kernel results need to be unpacked, not doing so will result in an error. This should be cleaned up - template - static T UnpackResult(ffi::ExternResult result, const string &from_where) { - if (result.tag == ffi::ExternResult::Tag::Err) { - if (result.err._0){ - auto error_cast = static_cast(result.err._0); - error_cast->Throw(from_where); - } else { - throw IOException("Hit DeltaKernel FFI error (from: %s): Hit error, but error was nullptr", from_where.c_str()); - } - } else if (result.tag == ffi::ExternResult::Tag::Ok) { - return result.ok._0; - } - throw IOException("Invalid error ExternResult tag found!"); - } + static ffi::KernelStringSlice ToDeltaString(const string &str); + static string FromDeltaString(const struct ffi::KernelStringSlice slice); + static vector FromDeltaBoolSlice(const struct ffi::KernelBoolSlice slice); + + // TODO: all kernel results need to be unpacked, not doing so will result in an error. This should be cleaned up + template + static T UnpackResult(ffi::ExternResult result, const string &from_where) { + if (result.tag == ffi::ExternResult::Tag::Err) { + if (result.err._0) { + auto error_cast = static_cast(result.err._0); + error_cast->Throw(from_where); + } else { + throw IOException("Hit DeltaKernel FFI error (from: %s): Hit error, but error was nullptr", + from_where.c_str()); + } + } else if (result.tag == ffi::ExternResult::Tag::Ok) { + return result.ok._0; + } + throw IOException("Invalid error ExternResult tag found!"); + } }; class PredicateVisitor : public ffi::EnginePredicate { public: - PredicateVisitor(const vector &column_names, optional_ptr filters); + PredicateVisitor(const vector &column_names, optional_ptr filters); private: - unordered_map column_filters; + unordered_map column_filters; - static uintptr_t VisitPredicate(PredicateVisitor* predicate, ffi::KernelExpressionVisitorState* state); + static uintptr_t VisitPredicate(PredicateVisitor *predicate, ffi::KernelExpressionVisitorState *state); - uintptr_t VisitConstantFilter(const string &col_name, const ConstantFilter &filter, ffi::KernelExpressionVisitorState* state); - uintptr_t VisitAndFilter(const string &col_name, const ConjunctionAndFilter &filter, ffi::KernelExpressionVisitorState* state); + uintptr_t VisitConstantFilter(const string &col_name, const ConstantFilter &filter, + ffi::KernelExpressionVisitorState *state); + uintptr_t VisitAndFilter(const string &col_name, const ConjunctionAndFilter &filter, + ffi::KernelExpressionVisitorState *state); - uintptr_t VisitIsNull(const string &col_name, ffi::KernelExpressionVisitorState* state); - uintptr_t VisitIsNotNull(const string &col_name, ffi::KernelExpressionVisitorState* state); + uintptr_t VisitIsNull(const string &col_name, ffi::KernelExpressionVisitorState *state); + uintptr_t VisitIsNotNull(const string &col_name, ffi::KernelExpressionVisitorState *state); - uintptr_t VisitFilter(const string &col_name, const TableFilter &filter, ffi::KernelExpressionVisitorState* state); + uintptr_t VisitFilter(const string &col_name, const TableFilter &filter, ffi::KernelExpressionVisitorState *state); }; } // namespace duckdb diff --git a/src/include/functions/delta_scan.hpp b/src/include/functions/delta_scan.hpp index aac35cc..936de5a 100644 --- a/src/include/functions/delta_scan.hpp +++ b/src/include/functions/delta_scan.hpp @@ -14,133 +14,137 @@ namespace duckdb { struct DeltaFileMetaData { - DeltaFileMetaData() {}; - - // No copying pls - DeltaFileMetaData (const DeltaFileMetaData&) = delete; - DeltaFileMetaData& operator= (const DeltaFileMetaData&) = delete; - - ~DeltaFileMetaData() { - if (selection_vector.ptr) { - ffi::free_bool_slice(selection_vector); - } - } - - idx_t delta_snapshot_version = DConstants::INVALID_INDEX; - idx_t file_number = DConstants::INVALID_INDEX; - idx_t cardinality = DConstants::INVALID_INDEX; - ffi::KernelBoolSlice selection_vector = {nullptr, 0}; - case_insensitive_map_t partition_map; + DeltaFileMetaData() {}; + + // No copying pls + DeltaFileMetaData(const DeltaFileMetaData &) = delete; + DeltaFileMetaData &operator=(const DeltaFileMetaData &) = delete; + + ~DeltaFileMetaData() { + if (selection_vector.ptr) { + ffi::free_bool_slice(selection_vector); + } + } + + idx_t delta_snapshot_version = DConstants::INVALID_INDEX; + idx_t file_number = DConstants::INVALID_INDEX; + idx_t cardinality = DConstants::INVALID_INDEX; + ffi::KernelBoolSlice selection_vector = {nullptr, 0}; + case_insensitive_map_t partition_map; }; //! The DeltaSnapshot implements the MultiFileList API to allow injecting it into the regular DuckDB parquet scan struct DeltaSnapshot : public MultiFileList { - DeltaSnapshot(ClientContext &context, const string &path); - string GetPath(); - static string ToDuckDBPath(const string &raw_path); - static string ToDeltaPath(const string &raw_path); + DeltaSnapshot(ClientContext &context, const string &path); + string GetPath(); + static string ToDuckDBPath(const string &raw_path); + static string ToDeltaPath(const string &raw_path); - //! MultiFileList API + //! MultiFileList API public: - void Bind(vector &return_types, vector &names); - unique_ptr ComplexFilterPushdown(ClientContext &context, - const MultiFileReaderOptions &options, MultiFilePushdownInfo &info, - vector> &filters) override; - vector GetAllFiles() override; - FileExpandResult GetExpandResult() override; - idx_t GetTotalFileCount() override; + void Bind(vector &return_types, vector &names); + unique_ptr ComplexFilterPushdown(ClientContext &context, const MultiFileReaderOptions &options, + MultiFilePushdownInfo &info, + vector> &filters) override; + vector GetAllFiles() override; + FileExpandResult GetExpandResult() override; + idx_t GetTotalFileCount() override; - unique_ptr GetCardinality(ClientContext &context) override; + unique_ptr GetCardinality(ClientContext &context) override; protected: - //! Get the i-th expanded file - string GetFile(idx_t i) override; + //! Get the i-th expanded file + string GetFile(idx_t i) override; protected: - // TODO: How to guarantee we only call this after the filter pushdown? - void InitializeFiles(); + // TODO: How to guarantee we only call this after the filter pushdown? + void InitializeFiles(); - template - T TryUnpackKernelResult(ffi::ExternResult result) { - return KernelUtils::UnpackResult(result, StringUtil::Format("While trying to read from delta table: '%s'", paths[0])); - } + template + T TryUnpackKernelResult(ffi::ExternResult result) { + return KernelUtils::UnpackResult( + result, StringUtil::Format("While trying to read from delta table: '%s'", paths[0])); + } -// TODO: change back to protected + // TODO: change back to protected public: - idx_t version; + idx_t version; - //! Delta Kernel Structures - KernelSnapshot snapshot; - KernelExternEngine extern_engine; - KernelScan scan; - KernelGlobalScanState global_state; - KernelScanDataIterator scan_data_iterator; + //! Delta Kernel Structures + KernelSnapshot snapshot; + KernelExternEngine extern_engine; + KernelScan scan; + KernelGlobalScanState global_state; + KernelScanDataIterator scan_data_iterator; - //! Names - vector names; + //! Names + vector names; - //! Metadata map for files - vector> metadata; + //! Metadata map for files + vector> metadata; - //! Current file list resolution state - bool initialized = false; - bool files_exhausted = false; - vector resolved_files; - TableFilterSet table_filters; + //! Current file list resolution state + bool initialized = false; + bool files_exhausted = false; + vector resolved_files; + TableFilterSet table_filters; - ClientContext &context; + ClientContext &context; }; struct DeltaMultiFileReaderGlobalState : public MultiFileReaderGlobalState { - DeltaMultiFileReaderGlobalState(vector extra_columns_p, optional_ptr file_list_p) : MultiFileReaderGlobalState(extra_columns_p, file_list_p) { - } - //! The idx of the file number column in the result chunk - idx_t delta_file_number_idx = DConstants::INVALID_INDEX; - //! The idx of the file_row_number column in the result chunk - idx_t file_row_number_idx = DConstants::INVALID_INDEX; - - void SetColumnIdx(const string &column, idx_t idx); + DeltaMultiFileReaderGlobalState(vector extra_columns_p, optional_ptr file_list_p) + : MultiFileReaderGlobalState(extra_columns_p, file_list_p) { + } + //! The idx of the file number column in the result chunk + idx_t delta_file_number_idx = DConstants::INVALID_INDEX; + //! The idx of the file_row_number column in the result chunk + idx_t file_row_number_idx = DConstants::INVALID_INDEX; + + void SetColumnIdx(const string &column, idx_t idx); }; struct DeltaMultiFileReader : public MultiFileReader { - static unique_ptr CreateInstance(); - //! Return a DeltaSnapshot - unique_ptr CreateFileList(ClientContext &context, const vector &paths, - FileGlobOptions options) override; - - //! Override the regular parquet bind using the MultiFileReader Bind. The bind from these are what DuckDB's file - //! readers will try read - bool Bind(MultiFileReaderOptions &options, MultiFileList &files, - vector &return_types, vector &names, MultiFileReaderBindData &bind_data) override; - - //! Override the Options bind - void BindOptions(MultiFileReaderOptions &options, MultiFileList &files, - vector &return_types, vector &names, MultiFileReaderBindData& bind_data) override; - - void CreateNameMapping(const string &file_name, const vector &local_types, - const vector &local_names, const vector &global_types, - const vector &global_names, const vector &global_column_ids, - MultiFileReaderData &reader_data, const string &initial_file, - optional_ptr global_state) override; - - unique_ptr InitializeGlobalState(ClientContext &context, const MultiFileReaderOptions &file_options, - const MultiFileReaderBindData &bind_data, const MultiFileList &file_list, - const vector &global_types, const vector &global_names, - const vector &global_column_ids) override; - - void FinalizeBind(const MultiFileReaderOptions &file_options, const MultiFileReaderBindData &options, - const string &filename, const vector &local_names, - const vector &global_types, const vector &global_names, - const vector &global_column_ids, MultiFileReaderData &reader_data, - ClientContext &context, optional_ptr global_state) override; - - //! Override the FinalizeChunk method - void FinalizeChunk(ClientContext &context, const MultiFileReaderBindData &bind_data, - const MultiFileReaderData &reader_data, DataChunk &chunk, optional_ptr global_state) override; - - //! Override the ParseOption call to parse delta_scan specific options - bool ParseOption(const string &key, const Value &val, MultiFileReaderOptions &options, - ClientContext &context) override; + static unique_ptr CreateInstance(); + //! Return a DeltaSnapshot + unique_ptr CreateFileList(ClientContext &context, const vector &paths, + FileGlobOptions options) override; + + //! Override the regular parquet bind using the MultiFileReader Bind. The bind from these are what DuckDB's file + //! readers will try read + bool Bind(MultiFileReaderOptions &options, MultiFileList &files, vector &return_types, + vector &names, MultiFileReaderBindData &bind_data) override; + + //! Override the Options bind + void BindOptions(MultiFileReaderOptions &options, MultiFileList &files, vector &return_types, + vector &names, MultiFileReaderBindData &bind_data) override; + + void CreateNameMapping(const string &file_name, const vector &local_types, + const vector &local_names, const vector &global_types, + const vector &global_names, const vector &global_column_ids, + MultiFileReaderData &reader_data, const string &initial_file, + optional_ptr global_state) override; + + unique_ptr + InitializeGlobalState(ClientContext &context, const MultiFileReaderOptions &file_options, + const MultiFileReaderBindData &bind_data, const MultiFileList &file_list, + const vector &global_types, const vector &global_names, + const vector &global_column_ids) override; + + void FinalizeBind(const MultiFileReaderOptions &file_options, const MultiFileReaderBindData &options, + const string &filename, const vector &local_names, + const vector &global_types, const vector &global_names, + const vector &global_column_ids, MultiFileReaderData &reader_data, + ClientContext &context, optional_ptr global_state) override; + + //! Override the FinalizeChunk method + void FinalizeChunk(ClientContext &context, const MultiFileReaderBindData &bind_data, + const MultiFileReaderData &reader_data, DataChunk &chunk, + optional_ptr global_state) override; + + //! Override the ParseOption call to parse delta_scan specific options + bool ParseOption(const string &key, const Value &val, MultiFileReaderOptions &options, + ClientContext &context) override; }; } // namespace duckdb