From 7ececffbe81b5e244bce4ced6579c30c3aac670a Mon Sep 17 00:00:00 2001 From: sunby Date: Wed, 18 Sep 2024 11:07:31 +0800 Subject: [PATCH] feat: support json index This PR adds json index support for json and dynamic fields. Now you can only do unary query like 'a["b"] > 1' using this index. We will support more filter type later. basic usage: ``` collection.create_index("json_field", {"index_type": "INVERTED", "params": {"json_cast_type": DataType.STRING, "json_path": 'json_field["a"]["b"]'}}) ``` There are some limits to use this index: 1. If a record does not have the json path you specify, it will be ignored and there will not be an error. 2. If a value of the json path fails to be cast to the type you specify, it will be ignored and there will not be an error. 3. A specific json path can have only one json index. 4. If you try to create more than one json indexes for one json field, sdk(pymilvus<=2.4.7) may return immediately because of internal implementation. This will be fixed soon. Signed-off-by: sunby --- internal/core/src/common/FieldDataInterface.h | 14 ++ .../expression/BinaryArithOpEvalRangeExpr.h | 1 + .../src/exec/expression/BinaryRangeExpr.h | 1 + .../core/src/exec/expression/ExistsExpr.h | 1 + internal/core/src/exec/expression/Expr.h | 45 +++- .../src/exec/expression/JsonContainsExpr.h | 1 + internal/core/src/exec/expression/TermExpr.h | 1 + .../core/src/exec/expression/UnaryExpr.cpp | 74 +++++-- internal/core/src/exec/expression/UnaryExpr.h | 4 + internal/core/src/index/IndexFactory.cpp | 55 ++++- internal/core/src/index/IndexFactory.h | 8 + internal/core/src/index/IndexInfo.h | 2 + .../core/src/index/InvertedIndexTantivy.cpp | 36 +-- .../core/src/index/InvertedIndexTantivy.h | 39 +++- internal/core/src/indexbuilder/IndexFactory.h | 3 + .../indexbuilder/JsonInvertedIndexCreator.cpp | 56 +++++ .../indexbuilder/JsonInvertedIndexCreator.h | 53 +++++ .../src/indexbuilder/ScalarIndexCreator.cpp | 7 + internal/core/src/segcore/SegmentInterface.h | 20 ++ .../core/src/segcore/SegmentSealedImpl.cpp | 28 ++- internal/core/src/segcore/SegmentSealedImpl.h | 8 + internal/core/src/segcore/load_index_c.cpp | 5 + .../core/thirdparty/tantivy/tantivy-wrapper.h | 2 + internal/core/unittest/test_expr.cpp | 60 +++++ internal/datacoord/index_meta.go | 4 + internal/datacoord/index_service.go | 207 ++++++++++++++++-- internal/metastore/model/index.go | 4 + internal/metastore/model/segment_index.go | 4 + internal/proto/index_coord.proto | 3 + internal/proto/query_coord.proto | 2 + .../querycoordv2/checkers/index_checker.go | 11 +- internal/querycoordv2/dist/dist_handler.go | 1 + .../querycoordv2/meta/coordinator_broker.go | 1 + .../querycoordv2/meta/segment_dist_manager.go | 1 + internal/querynodev2/segments/segment.go | 47 +++- .../querynodev2/segments/segment_interface.go | 1 + internal/querynodev2/segments/segment_l0.go | 4 + .../querynodev2/segments/segment_loader.go | 65 +++++- internal/querynodev2/services.go | 3 + pkg/util/indexparamcheck/inverted_checker.go | 2 +- 40 files changed, 779 insertions(+), 105 deletions(-) create mode 100644 internal/core/src/indexbuilder/JsonInvertedIndexCreator.cpp create mode 100644 internal/core/src/indexbuilder/JsonInvertedIndexCreator.h diff --git a/internal/core/src/common/FieldDataInterface.h b/internal/core/src/common/FieldDataInterface.h index 926a1bb16e3d9..1abb6d03a98fa 100644 --- a/internal/core/src/common/FieldDataInterface.h +++ b/internal/core/src/common/FieldDataInterface.h @@ -652,6 +652,20 @@ class FieldDataJsonImpl : public FieldDataImpl { } length_ += n; } + + // only for test + void + add_json_data(const std::vector& json) { + std::lock_guard lck(tell_mutex_); + if (length_ + json.size() > get_num_rows()) { + resize_field_data(length_ + json.size()); + } + + for (size_t i = 0; i < json.size(); ++i) { + data_[length_ + i] = json[i]; + } + length_ += json.size(); + } }; class FieldDataSparseVectorImpl diff --git a/internal/core/src/exec/expression/BinaryArithOpEvalRangeExpr.h b/internal/core/src/exec/expression/BinaryArithOpEvalRangeExpr.h index 3c84819dc2b83..0301a8582ddc9 100644 --- a/internal/core/src/exec/expression/BinaryArithOpEvalRangeExpr.h +++ b/internal/core/src/exec/expression/BinaryArithOpEvalRangeExpr.h @@ -440,6 +440,7 @@ class PhyBinaryArithOpEvalRangeExpr : public SegmentExpr { name, segment, expr->column_.field_id_, + expr->column_.nested_path_, active_count, batch_size), expr_(expr) { diff --git a/internal/core/src/exec/expression/BinaryRangeExpr.h b/internal/core/src/exec/expression/BinaryRangeExpr.h index 6484a40e5ef1e..d766f72902da3 100644 --- a/internal/core/src/exec/expression/BinaryRangeExpr.h +++ b/internal/core/src/exec/expression/BinaryRangeExpr.h @@ -177,6 +177,7 @@ class PhyBinaryRangeFilterExpr : public SegmentExpr { name, segment, expr->column_.field_id_, + expr->column_.nested_path_, active_count, batch_size), expr_(expr) { diff --git a/internal/core/src/exec/expression/ExistsExpr.h b/internal/core/src/exec/expression/ExistsExpr.h index 2b24108531575..6e8c413f91aec 100644 --- a/internal/core/src/exec/expression/ExistsExpr.h +++ b/internal/core/src/exec/expression/ExistsExpr.h @@ -47,6 +47,7 @@ class PhyExistsFilterExpr : public SegmentExpr { name, segment, expr->column_.field_id_, + expr->column_.nested_path_, active_count, batch_size), expr_(expr) { diff --git a/internal/core/src/exec/expression/Expr.h b/internal/core/src/exec/expression/Expr.h index b80d376c78ede..26c9b455dfaf2 100644 --- a/internal/core/src/exec/expression/Expr.h +++ b/internal/core/src/exec/expression/Expr.h @@ -19,12 +19,15 @@ #include #include +#include "common/FieldDataInterface.h" +#include "common/Json.h" #include "common/Types.h" #include "exec/expression/EvalCtx.h" #include "exec/expression/VectorFunction.h" #include "exec/expression/Utils.h" #include "exec/QueryContext.h" #include "expr/ITypeExpr.h" +#include "log/Log.h" #include "query/PlanProto.h" namespace milvus { @@ -88,12 +91,15 @@ class SegmentExpr : public Expr { SegmentExpr(const std::vector&& input, const std::string& name, const segcore::SegmentInternalInterface* segment, - const FieldId& field_id, + const FieldId field_id, + const std::vector nested_path, int64_t active_count, int64_t batch_size) : Expr(DataType::BOOL, std::move(input), name), segment_(segment), field_id_(field_id), + nested_path_(nested_path), + active_count_(active_count), batch_size_(batch_size) { size_per_chunk_ = segment_->size_per_chunk(); @@ -108,6 +114,7 @@ class SegmentExpr : public Expr { InitSegmentExpr() { auto& schema = segment_->get_schema(); auto& field_meta = schema[field_id_]; + field_type_ = field_meta.get_data_type(); if (schema.get_primary_field_id().has_value() && schema.get_primary_field_id().value() == field_id_ && @@ -116,9 +123,18 @@ class SegmentExpr : public Expr { pk_type_ = field_meta.get_data_type(); } - is_index_mode_ = segment_->HasIndex(field_id_); - if (is_index_mode_) { - num_index_chunk_ = segment_->num_chunk_index(field_id_); + if (field_meta.get_data_type() == DataType::JSON) { + auto pointer = milvus::Json::pointer(nested_path_); + if (segment_->HasIndex(pointer)) { + // FIXME: sunby + is_index_mode_ = true; + num_index_chunk_ = 1; + } + } else { + is_index_mode_ = segment_->HasIndex(field_id_); + if (is_index_mode_) { + num_index_chunk_ = segment_->num_chunk_index(field_id_); + } } // if index not include raw data, also need load data if (segment_->HasFieldData(field_id_)) { @@ -300,9 +316,21 @@ class SegmentExpr : public Expr { // It avoids indexing execute for evevy batch because indexing // executing costs quite much time. if (cached_index_chunk_id_ != i) { - const Index& index = - segment_->chunk_scalar_index(field_id_, i); - auto* index_ptr = const_cast(&index); + Index* index_ptr = nullptr; + + if (field_type_ == DataType::JSON) { + auto pointer = milvus::Json::pointer(nested_path_); + + const Index& index = + segment_->chunk_scalar_index(pointer, + i); + index_ptr = const_cast(&index); + } else { + const Index& index = + segment_->chunk_scalar_index(field_id_, + i); + index_ptr = const_cast(&index); + } cached_index_chunk_res_ = std::move(func(index_ptr, values...)); cached_index_chunk_id_ = i; } @@ -427,6 +455,9 @@ class SegmentExpr : public Expr { DataType pk_type_; int64_t batch_size_; + std::vector nested_path_; + DataType field_type_; + bool is_index_mode_{false}; bool is_data_mode_{false}; // sometimes need to skip index and using raw data diff --git a/internal/core/src/exec/expression/JsonContainsExpr.h b/internal/core/src/exec/expression/JsonContainsExpr.h index a0cfdfdea0841..cd07cbb5049f9 100644 --- a/internal/core/src/exec/expression/JsonContainsExpr.h +++ b/internal/core/src/exec/expression/JsonContainsExpr.h @@ -40,6 +40,7 @@ class PhyJsonContainsFilterExpr : public SegmentExpr { name, segment, expr->column_.field_id_, + expr->column_.nested_path_, active_count, batch_size), expr_(expr) { diff --git a/internal/core/src/exec/expression/TermExpr.h b/internal/core/src/exec/expression/TermExpr.h index a816c6c9c6153..3b83b7b022cf3 100644 --- a/internal/core/src/exec/expression/TermExpr.h +++ b/internal/core/src/exec/expression/TermExpr.h @@ -61,6 +61,7 @@ class PhyTermFilterExpr : public SegmentExpr { name, segment, expr->column_.field_id_, + expr->column_.nested_path_, active_count, batch_size), expr_(expr), diff --git a/internal/core/src/exec/expression/UnaryExpr.cpp b/internal/core/src/exec/expression/UnaryExpr.cpp index f53475e14e192..2861fcc235a86 100644 --- a/internal/core/src/exec/expression/UnaryExpr.cpp +++ b/internal/core/src/exec/expression/UnaryExpr.cpp @@ -16,6 +16,9 @@ #include "UnaryExpr.h" #include "common/Json.h" +#include "common/Types.h" +#include "common/type_c.h" +#include "log/Log.h" namespace milvus { namespace exec { @@ -187,25 +190,49 @@ PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { } case DataType::JSON: { auto val_type = expr_->val_.val_case(); - switch (val_type) { - case proto::plan::GenericValue::ValCase::kBoolVal: - result = ExecRangeVisitorImplJson(); - break; - case proto::plan::GenericValue::ValCase::kInt64Val: - result = ExecRangeVisitorImplJson(); - break; - case proto::plan::GenericValue::ValCase::kFloatVal: - result = ExecRangeVisitorImplJson(); - break; - case proto::plan::GenericValue::ValCase::kStringVal: - result = ExecRangeVisitorImplJson(); - break; - case proto::plan::GenericValue::ValCase::kArrayVal: - result = ExecRangeVisitorImplJson(); - break; - default: - PanicInfo( - DataTypeInvalid, "unknown data type: {}", val_type); + if (CanUseIndexForJson()) { + switch (val_type) { + case proto::plan::GenericValue::ValCase::kBoolVal: + result = ExecRangeVisitorImplForIndex(); + break; + case proto::plan::GenericValue::ValCase::kInt64Val: + result = ExecRangeVisitorImplForIndex(); + break; + case proto::plan::GenericValue::ValCase::kFloatVal: + result = ExecRangeVisitorImplForIndex(); + break; + case proto::plan::GenericValue::ValCase::kStringVal: + result = ExecRangeVisitorImplForIndex(); + break; + case proto::plan::GenericValue::ValCase::kArrayVal: + result = + ExecRangeVisitorImplForIndex(); + break; + default: + PanicInfo( + DataTypeInvalid, "unknown data type: {}", val_type); + } + } else { + switch (val_type) { + case proto::plan::GenericValue::ValCase::kBoolVal: + result = ExecRangeVisitorImplJson(); + break; + case proto::plan::GenericValue::ValCase::kInt64Val: + result = ExecRangeVisitorImplJson(); + break; + case proto::plan::GenericValue::ValCase::kFloatVal: + result = ExecRangeVisitorImplJson(); + break; + case proto::plan::GenericValue::ValCase::kStringVal: + result = ExecRangeVisitorImplJson(); + break; + case proto::plan::GenericValue::ValCase::kArrayVal: + result = ExecRangeVisitorImplJson(); + break; + default: + PanicInfo( + DataTypeInvalid, "unknown data type: {}", val_type); + } } break; } @@ -364,7 +391,7 @@ PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(bool reverse) { // filtering by index, get candidates. auto size_per_chunk = segment_->size_per_chunk(); - auto retrieve = [ size_per_chunk, this ](int64_t offset) -> auto { + auto retrieve = [size_per_chunk, this](int64_t offset) -> auto { auto chunk_idx = offset / size_per_chunk; auto chunk_offset = offset % size_per_chunk; const auto& chunk = @@ -861,6 +888,13 @@ PhyUnaryRangeFilterExpr::CanUseIndex() { return res; } +bool +PhyUnaryRangeFilterExpr::CanUseIndexForJson() { + use_index_ = + segment_->HasIndex(milvus::Json::pointer(expr_->column_.nested_path_)); + return use_index_; +} + VectorPtr PhyUnaryRangeFilterExpr::ExecTextMatch() { using Index = index::TextMatchIndex; diff --git a/internal/core/src/exec/expression/UnaryExpr.h b/internal/core/src/exec/expression/UnaryExpr.h index 83711f6d70dab..d96907d3825dd 100644 --- a/internal/core/src/exec/expression/UnaryExpr.h +++ b/internal/core/src/exec/expression/UnaryExpr.h @@ -281,6 +281,7 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr { name, segment, expr->column_.field_id_, + expr->column_.nested_path_, active_count, batch_size), expr_(expr) { @@ -331,6 +332,9 @@ class PhyUnaryRangeFilterExpr : public SegmentExpr { bool CanUseIndexForArray(); + bool + CanUseIndexForJson(); + VectorPtr ExecTextMatch(); diff --git a/internal/core/src/index/IndexFactory.cpp b/internal/core/src/index/IndexFactory.cpp index b7754c4503824..a1a10755144f5 100644 --- a/internal/core/src/index/IndexFactory.cpp +++ b/internal/core/src/index/IndexFactory.cpp @@ -15,11 +15,15 @@ // limitations under the License. #include "index/IndexFactory.h" +#include +#include #include "common/EasyAssert.h" +#include "common/FieldDataInterface.h" #include "common/Types.h" #include "index/VectorMemIndex.h" #include "index/Utils.h" #include "index/Meta.h" +#include "indexbuilder/JsonInvertedIndexCreator.h" #include "knowhere/utils.h" #include "index/VectorDiskIndex.h" @@ -29,6 +33,8 @@ #include "index/InvertedIndexTantivy.h" #include "index/HybridScalarIndex.h" #include "knowhere/comp/knowhere_check.h" +#include "log/Log.h" +#include "pb/schema.pb.h" namespace milvus::index { @@ -359,6 +365,49 @@ IndexFactory::CreateComplexScalarIndex( PanicInfo(Unsupported, "Complex index not supported now"); } +IndexBasePtr +IndexFactory::CreateJsonIndex( + IndexType index_type, + DataType cast_dtype, + const std::string& nested_path, + const storage::FileManagerContext& file_manager_context) { + AssertInfo(index_type == INVERTED_INDEX_TYPE, + "Invalid index type for json index"); + switch (cast_dtype) { + case DataType::BOOL: + return std::make_unique< + indexbuilder::JsonInvertedIndexCreator>( + proto::schema::DataType::Bool, + nested_path, + file_manager_context); + case milvus::DataType::INT8: + case milvus::DataType::INT16: + case milvus::DataType::INT32: + case DataType::INT64: + return std::make_unique< + indexbuilder::JsonInvertedIndexCreator>( + proto::schema::DataType::Int64, + nested_path, + file_manager_context); + case DataType::FLOAT: + case DataType::DOUBLE: + return std::make_unique< + indexbuilder::JsonInvertedIndexCreator>( + proto::schema::DataType::Double, + nested_path, + file_manager_context); + case DataType::STRING: + case DataType::VARCHAR: + return std::make_unique< + indexbuilder::JsonInvertedIndexCreator>( + proto::schema::DataType::String, + nested_path, + file_manager_context); + default: + PanicInfo(DataTypeInvalid, "Invalid data type:{}", cast_dtype); + } +} + IndexBasePtr IndexFactory::CreateScalarIndex( const CreateIndexInfo& create_index_info, @@ -381,8 +430,10 @@ IndexFactory::CreateScalarIndex( file_manager_context); } case DataType::JSON: { - return CreateComplexScalarIndex(create_index_info.index_type, - file_manager_context); + return CreateJsonIndex(create_index_info.index_type, + create_index_info.json_cast_type, + create_index_info.json_path, + file_manager_context); } default: PanicInfo(DataTypeInvalid, "Invalid data type:{}", data_type); diff --git a/internal/core/src/index/IndexFactory.h b/internal/core/src/index/IndexFactory.h index b5a6d408ba1d8..f56e70fdfbd0c 100644 --- a/internal/core/src/index/IndexFactory.h +++ b/internal/core/src/index/IndexFactory.h @@ -21,6 +21,7 @@ #include #include +#include "common/Types.h" #include "common/type_c.h" #include "index/Index.h" #include "index/ScalarIndex.h" @@ -103,6 +104,13 @@ class IndexFactory { const storage::FileManagerContext& file_manager_context = storage::FileManagerContext()); + IndexBasePtr + CreateJsonIndex(IndexType index_type, + DataType cast_dtype, + const std::string& nested_path, + const storage::FileManagerContext& file_manager_context = + storage::FileManagerContext()); + IndexBasePtr CreateScalarIndex(const CreateIndexInfo& create_index_info, const storage::FileManagerContext& file_manager_context = diff --git a/internal/core/src/index/IndexInfo.h b/internal/core/src/index/IndexInfo.h index f925de1e4ae99..7768e4c9a5641 100644 --- a/internal/core/src/index/IndexInfo.h +++ b/internal/core/src/index/IndexInfo.h @@ -26,6 +26,8 @@ struct CreateIndexInfo { IndexVersion index_engine_version; std::string field_name; int64_t dim; + DataType json_cast_type; + std::string json_path; }; } // namespace milvus::index diff --git a/internal/core/src/index/InvertedIndexTantivy.cpp b/internal/core/src/index/InvertedIndexTantivy.cpp index 1db1fa9d01bff..5876c5efd06a4 100644 --- a/internal/core/src/index/InvertedIndexTantivy.cpp +++ b/internal/core/src/index/InvertedIndexTantivy.cpp @@ -29,36 +29,6 @@ namespace milvus::index { constexpr const char* TMP_INVERTED_INDEX_PREFIX = "/tmp/milvus/inverted-index/"; -inline TantivyDataType -get_tantivy_data_type(proto::schema::DataType data_type) { - switch (data_type) { - case proto::schema::DataType::Bool: { - return TantivyDataType::Bool; - } - - case proto::schema::DataType::Int8: - case proto::schema::DataType::Int16: - case proto::schema::DataType::Int32: - case proto::schema::DataType::Int64: { - return TantivyDataType::I64; - } - - case proto::schema::DataType::Float: - case proto::schema::DataType::Double: { - return TantivyDataType::F64; - } - - case proto::schema::DataType::String: - case proto::schema::DataType::VarChar: { - return TantivyDataType::Keyword; - } - - default: - PanicInfo(ErrorCode::NotImplemented, - fmt::format("not implemented data type: {}", data_type)); - } -} - inline TantivyDataType get_tantivy_data_type(const proto::schema::FieldSchema& schema) { switch (schema.data_type()) { @@ -289,7 +259,6 @@ template const TargetBitmap InvertedIndexTantivy::Range(T value, OpType op) { TargetBitmap bitset(Count()); - switch (op) { case OpType::LessThan: { auto array = wrapper_->upper_bound_range_query(value, false); @@ -467,6 +436,11 @@ InvertedIndexTantivy::BuildWithFieldData( break; } + case proto::schema::DataType::JSON: { + build_index_for_json(field_datas); + break; + } + default: PanicInfo(ErrorCode::NotImplemented, fmt::format("Inverted index not supported on {}", diff --git a/internal/core/src/index/InvertedIndexTantivy.h b/internal/core/src/index/InvertedIndexTantivy.h index 9d7febfd90942..91fe05029fcff 100644 --- a/internal/core/src/index/InvertedIndexTantivy.h +++ b/internal/core/src/index/InvertedIndexTantivy.h @@ -24,6 +24,36 @@ namespace milvus::index { +inline TantivyDataType +get_tantivy_data_type(proto::schema::DataType data_type) { + switch (data_type) { + case proto::schema::DataType::Bool: { + return TantivyDataType::Bool; + } + + case proto::schema::DataType::Int8: + case proto::schema::DataType::Int16: + case proto::schema::DataType::Int32: + case proto::schema::DataType::Int64: { + return TantivyDataType::I64; + } + + case proto::schema::DataType::Float: + case proto::schema::DataType::Double: { + return TantivyDataType::F64; + } + + case proto::schema::DataType::String: + case proto::schema::DataType::VarChar: { + return TantivyDataType::Keyword; + } + + default: + PanicInfo(ErrorCode::NotImplemented, + fmt::format("not implemented data type: {}", data_type)); + } +} + using TantivyIndexWrapper = milvus::tantivy::TantivyIndexWrapper; using RustArrayWrapper = milvus::tantivy::RustArrayWrapper; @@ -173,10 +203,10 @@ class InvertedIndexTantivy : public ScalarIndex { const TargetBitmap RegexQuery(const std::string& regex_pattern) override; - protected: void BuildWithFieldData(const std::vector& datas) override; + protected: void finish(); @@ -184,6 +214,13 @@ class InvertedIndexTantivy : public ScalarIndex { build_index_for_array( const std::vector>& field_datas); + virtual void + build_index_for_json( + const std::vector>& field_datas) { + PanicInfo(ErrorCode::NotImplemented, + "build_index_for_json not implemented"); + }; + protected: std::shared_ptr wrapper_; TantivyDataType d_type_; diff --git a/internal/core/src/indexbuilder/IndexFactory.h b/internal/core/src/indexbuilder/IndexFactory.h index 6aa0b48302410..a043aafacbfbe 100644 --- a/internal/core/src/indexbuilder/IndexFactory.h +++ b/internal/core/src/indexbuilder/IndexFactory.h @@ -18,6 +18,7 @@ #include "common/EasyAssert.h" #include "indexbuilder/IndexCreatorBase.h" +#include "indexbuilder/JsonInvertedIndexCreator.h" #include "indexbuilder/ScalarIndexCreator.h" #include "indexbuilder/VecIndexCreator.h" #include "indexbuilder/type_c.h" @@ -60,6 +61,7 @@ class IndexFactory { case DataType::VARCHAR: case DataType::STRING: case DataType::ARRAY: + case DataType::JSON: return CreateScalarIndex(type, config, context); case DataType::VECTOR_FLOAT: @@ -68,6 +70,7 @@ class IndexFactory { case DataType::VECTOR_BINARY: case DataType::VECTOR_SPARSE_FLOAT: return std::make_unique(type, config, context); + default: PanicInfo(DataTypeInvalid, fmt::format("invalid type is {}", invalid_dtype_msg)); diff --git a/internal/core/src/indexbuilder/JsonInvertedIndexCreator.cpp b/internal/core/src/indexbuilder/JsonInvertedIndexCreator.cpp new file mode 100644 index 0000000000000..4dc143d6c62e1 --- /dev/null +++ b/internal/core/src/indexbuilder/JsonInvertedIndexCreator.cpp @@ -0,0 +1,56 @@ +#include "indexbuilder/JsonInvertedIndexCreator.h" +#include +#include +#include +#include "common/EasyAssert.h" +#include "common/FieldDataInterface.h" +#include "common/Json.h" +#include "common/Types.h" +#include "log/Log.h" +#include "simdjson/error.h" + +namespace milvus::indexbuilder { + +template +void +JsonInvertedIndexCreator::build_index_for_json( + const std::vector>& field_datas) { + using GetType = + std::conditional_t, std::string_view, T>; + int64_t offset = 0; + LOG_INFO("Start to build json inverted index for field: {}", nested_path_); + for (const auto& data : field_datas) { + auto n = data->get_num_rows(); + for (int64_t i = 0; i < n; i++) { + auto json_column = static_cast(data->RawValue(i)); + if (this->schema_.nullable() && !data->is_valid(i)) { + this->null_offset.push_back(i); + continue; + } + value_result res = json_column->at(nested_path_); + auto err = res.error(); + if (err != simdjson::SUCCESS) { + AssertInfo(err == simdjson::INCORRECT_TYPE || + err == simdjson::NO_SUCH_FIELD, + "Failed to parse json, err: {}", + err); + offset++; + continue; + } + if constexpr (std::is_same_v) { + auto value = std::string(res.value()); + this->wrapper_->template add_data(&value, 1, offset++); + } else { + auto value = res.value(); + this->wrapper_->template add_data(&value, 1, offset++); + } + } + } +} + +template class JsonInvertedIndexCreator; +template class JsonInvertedIndexCreator; +template class JsonInvertedIndexCreator; +template class JsonInvertedIndexCreator; + +} // namespace milvus::indexbuilder \ No newline at end of file diff --git a/internal/core/src/indexbuilder/JsonInvertedIndexCreator.h b/internal/core/src/indexbuilder/JsonInvertedIndexCreator.h new file mode 100644 index 0000000000000..bc4024ba67cd7 --- /dev/null +++ b/internal/core/src/indexbuilder/JsonInvertedIndexCreator.h @@ -0,0 +1,53 @@ +#pragma once +#include "common/FieldDataInterface.h" +#include "index/InvertedIndexTantivy.h" +#include "storage/FileManager.h" +#include "boost/filesystem.hpp" +#include "tantivy-binding.h" + +namespace milvus::indexbuilder { + +template +class JsonInvertedIndexCreator : public index::InvertedIndexTantivy { + public: + JsonInvertedIndexCreator(const proto::schema::DataType cast_type, + const std::string& nested_path, + const storage::FileManagerContext& ctx) + : nested_path_(nested_path) { + this->schema_ = ctx.fieldDataMeta.field_schema; + this->mem_file_manager_ = + std::make_shared(ctx); + this->disk_file_manager_ = + std::make_shared(ctx); + + auto prefix = this->disk_file_manager_->GetTextIndexIdentifier(); + constexpr const char* TMP_INVERTED_INDEX_PREFIX = + "/tmp/milvus/inverted-index/"; + this->path_ = std::string(TMP_INVERTED_INDEX_PREFIX) + prefix; + + this->d_type_ = index::get_tantivy_data_type(cast_type); + boost::filesystem::create_directories(this->path_); + std::string field_name = std::to_string( + this->disk_file_manager_->GetFieldDataMeta().field_id); + this->wrapper_ = std::make_shared( + field_name.c_str(), this->d_type_, this->path_.c_str()); + } + + void + build_index_for_json(const std::vector>& + field_datas) override; + + void + finish() { + this->wrapper_->finish(); + } + + void + create_reader() { + this->wrapper_->create_reader(); + } + + private: + std::string nested_path_; +}; +} // namespace milvus::indexbuilder \ No newline at end of file diff --git a/internal/core/src/indexbuilder/ScalarIndexCreator.cpp b/internal/core/src/indexbuilder/ScalarIndexCreator.cpp index 855be1476017f..ccfa886b10c6f 100644 --- a/internal/core/src/indexbuilder/ScalarIndexCreator.cpp +++ b/internal/core/src/indexbuilder/ScalarIndexCreator.cpp @@ -10,6 +10,8 @@ // or implied. See the License for the specific language governing permissions and limitations under the License #include "indexbuilder/ScalarIndexCreator.h" +#include "common/FieldDataInterface.h" +#include "common/Types.h" #include "index/IndexFactory.h" #include "index/IndexInfo.h" #include "index/Meta.h" @@ -32,6 +34,11 @@ ScalarIndexCreator::ScalarIndexCreator( } index_info.field_type = dtype_; index_info.index_type = index_type(); + if (dtype == DataType::JSON) { + index_info.json_cast_type = static_cast( + std::stoi(config.at("json_cast_type").get())); + index_info.json_path = config.at("json_path").get(); + } index_ = index::IndexFactory::GetInstance().CreateIndex( index_info, file_manager_context); } diff --git a/internal/core/src/segcore/SegmentInterface.h b/internal/core/src/segcore/SegmentInterface.h index 2d4e02d2f5fd4..91c2d539ee3d6 100644 --- a/internal/core/src/segcore/SegmentInterface.h +++ b/internal/core/src/segcore/SegmentInterface.h @@ -21,6 +21,7 @@ #include "DeletedRecord.h" #include "FieldIndexing.h" +#include "common/EasyAssert.h" #include "common/Schema.h" #include "common/Span.h" #include "common/SystemProperty.h" @@ -201,6 +202,16 @@ class SegmentInternalInterface : public SegmentInterface { return *ptr; } + template + const index::ScalarIndex& + chunk_scalar_index(std::string path, int64_t chunk_id) const { + using IndexType = index::ScalarIndex; + auto base_ptr = chunk_index_impl(path, chunk_id); + auto ptr = dynamic_cast(base_ptr); + AssertInfo(ptr, "entry mismatch"); + return *ptr; + } + std::unique_ptr Search(const query::Plan* Plan, const query::PlaceholderGroup* placeholder_group, @@ -230,6 +241,10 @@ class SegmentInternalInterface : public SegmentInterface { virtual bool HasIndex(FieldId field_id) const = 0; + virtual bool + HasIndex(const std::string& nested_path) const { + PanicInfo(ErrorCode::NotImplemented, "not implemented"); + }; virtual bool HasFieldData(FieldId field_id) const = 0; @@ -385,6 +400,11 @@ class SegmentInternalInterface : public SegmentInterface { virtual const index::IndexBase* chunk_index_impl(FieldId field_id, int64_t chunk_id) const = 0; + virtual const index::IndexBase* + chunk_index_impl(std::string path, int64_t chunk_id) const { + PanicInfo(ErrorCode::NotImplemented, "not implemented"); + }; + // calculate output[i] = Vec[seg_offsets[i]}, where Vec binds to system_type virtual void bulk_subscript(SystemFieldType system_type, diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index de7643751f73b..92232f00e151a 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -170,13 +170,20 @@ SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) { auto field_id = FieldId(info.field_id); auto& field_meta = schema_->operator[](field_id); - auto row_count = info.index->Count(); - AssertInfo(row_count > 0, "Index count is 0"); - std::unique_lock lck(mutex_); AssertInfo( !get_bit(index_ready_bitset_, field_id), "scalar index has been exist at " + std::to_string(field_id.get())); + + if (field_meta.get_data_type() == DataType::JSON) { + auto path = info.index_params.at("json_path"); + json_indexings_[path] = + std::move(const_cast(info).index); + return; + } + auto row_count = info.index->Count(); + AssertInfo(row_count > 0, "Index count is 0"); + if (num_rows_.has_value()) { AssertInfo(num_rows_.value() == row_count, "field (" + std::to_string(field_id.get()) + @@ -185,7 +192,6 @@ SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) { ") than other column's row count (" + std::to_string(num_rows_.value()) + ")"); } - scalar_indexings_[field_id] = std::move(const_cast(info).index); // reverse pk from scalar index and set pks to offset @@ -744,6 +750,13 @@ SegmentSealedImpl::chunk_index_impl(FieldId field_id, int64_t chunk_id) const { return ptr; } +const index::IndexBase* +SegmentSealedImpl::chunk_index_impl(std::string path, int64_t chunk_id) const { + AssertInfo(json_indexings_.find(path) != json_indexings_.end(), + "Cannot find json index with path: " + path); + return json_indexings_.at(path).get(); +} + int64_t SegmentSealedImpl::get_row_count() const { std::shared_lock lck(mutex_); @@ -1685,6 +1698,11 @@ SegmentSealedImpl::HasIndex(FieldId field_id) const { get_bit(binlog_index_bitset_, field_id); } +bool +SegmentSealedImpl::HasIndex(const std::string& path) const { + return json_indexings_.find(path) != json_indexings_.end(); +} + bool SegmentSealedImpl::HasFieldData(FieldId field_id) const { std::shared_lock lck(mutex_); @@ -1710,6 +1728,8 @@ SegmentSealedImpl::HasRawData(int64_t field_id) const { field_indexing->indexing_.get()); return vec_index->HasRawData(); } + } else if (IsJsonDataType(field_meta.get_data_type())) { + return false; } else { auto scalar_index = scalar_indexings_.find(fieldID); if (scalar_index != scalar_indexings_.end()) { diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index 4e44a57e472f5..d58e3e409e7ad 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -62,6 +62,9 @@ class SegmentSealedImpl : public SegmentSealed { DropFieldData(const FieldId field_id) override; bool HasIndex(FieldId field_id) const override; + + bool + HasIndex(const std::string& nested_index) const override; bool HasFieldData(FieldId field_id) const override; @@ -202,6 +205,9 @@ class SegmentSealedImpl : public SegmentSealed { const index::IndexBase* chunk_index_impl(FieldId field_id, int64_t chunk_id) const override; + const index::IndexBase* + chunk_index_impl(std::string path, int64_t chunk_id) const override; + // Calculate: output[i] = Vec[seg_offset[i]], // where Vec is determined from field_offset void @@ -365,6 +371,8 @@ class SegmentSealedImpl : public SegmentSealed { // whether the segment is sorted by the pk bool is_sorted_by_pk_ = false; + + std::unordered_map json_indexings_; }; inline SegmentSealedUPtr diff --git a/internal/core/src/segcore/load_index_c.cpp b/internal/core/src/segcore/load_index_c.cpp index b63771392f5f7..c3b35c7344ebc 100644 --- a/internal/core/src/segcore/load_index_c.cpp +++ b/internal/core/src/segcore/load_index_c.cpp @@ -303,6 +303,11 @@ AppendIndexV2(CTraceContext c_trace, CLoadIndexInfo c_load_index_info) { load_index_info->index_params); config[milvus::index::INDEX_FILES] = load_index_info->index_files; + if (load_index_info->field_type == milvus::DataType::JSON) { + index_info.json_cast_type = static_cast( + std::stoi(config.at("json_cast_type").get())); + index_info.json_path = config.at("json_path").get(); + } milvus::storage::FileManagerContext fileManagerContext( field_meta, index_meta, remote_chunk_manager); load_index_info->index = diff --git a/internal/core/thirdparty/tantivy/tantivy-wrapper.h b/internal/core/thirdparty/tantivy/tantivy-wrapper.h index 17822d1bbdfb3..28915121c6d06 100644 --- a/internal/core/thirdparty/tantivy/tantivy-wrapper.h +++ b/internal/core/thirdparty/tantivy/tantivy-wrapper.h @@ -4,7 +4,9 @@ #include #include #include +#include +#include "log/Log.h" #include "tantivy-binding.h" #include "rust-binding.h" #include "rust-array.h" diff --git a/internal/core/unittest/test_expr.cpp b/internal/core/unittest/test_expr.cpp index 2bfc4646d10af..33f0f7d96dea4 100644 --- a/internal/core/unittest/test_expr.cpp +++ b/internal/core/unittest/test_expr.cpp @@ -19,9 +19,15 @@ #include #include +#include "common/FieldDataInterface.h" #include "common/Json.h" +#include "common/LoadInfo.h" #include "common/Types.h" +#include "indexbuilder/JsonInvertedIndexCreator.h" +#include "knowhere/comp/index_param.h" +#include "mmap/Types.h" #include "pb/plan.pb.h" +#include "pb/schema.pb.h" #include "query/Plan.h" #include "query/PlanNode.h" #include "query/PlanProto.h" @@ -29,6 +35,7 @@ #include "segcore/SegmentGrowingImpl.h" #include "simdjson/padded_string.h" #include "segcore/segment_c.h" +#include "storage/FileManager.h" #include "test_utils/DataGen.h" #include "index/IndexFactory.h" #include "exec/expression/Expr.h" @@ -6621,3 +6628,56 @@ TEST_P(ExprTest, TestJsonContainsDiffType) { } } } + +TEST(JsonInvertedIndex, UnaryExpr) { + auto schema = std::make_shared(); + auto vec_fid = schema->AddDebugField( + "fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2); + auto i32_fid = schema->AddDebugField("age32", DataType::INT32); + auto i64_fid = schema->AddDebugField("age64", DataType::INT64); + auto json_fid = schema->AddDebugField("json", DataType::JSON); + schema->set_primary_field_id(i64_fid); + + auto seg = CreateSealedSegment(schema); + int N = 1000; + auto raw_data = DataGen(schema, N); + segcore::LoadIndexInfo load_index_info; + + auto file_manager_ctx = storage::FileManagerContext(); + file_manager_ctx.fieldDataMeta.field_schema.set_data_type( + proto::schema::Int64); + file_manager_ctx.fieldDataMeta.field_schema.set_fieldid(json_fid.get()); + auto json_index = std::make_unique< + milvus::indexbuilder::JsonInvertedIndexCreator>( + proto::schema::Int64, "/json/int", file_manager_ctx); + auto json_col = raw_data.get_col(json_fid); + auto json_field = + std::make_shared>(DataType::JSON, false); + std::vector jsons; + + for (auto& json : json_col) { + jsons.push_back(milvus::Json(simdjson::padded_string(json))); + } + json_field->add_json_data(jsons); + + json_index->BuildWithFieldData({json_field}); + json_index->finish(); + json_index->create_reader(); + + load_index_info.field_id = json_fid.get(); + load_index_info.field_type = DataType::JSON; + load_index_info.index = std::move(json_index); + load_index_info.index_params = {{"json_path", "/json/int"}}; + seg->LoadIndex(load_index_info); + + proto::plan::GenericValue value; + value.set_int64_val(1); + auto unary_expr = std::make_shared( + expr::ColumnInfo(json_fid, DataType::JSON, {"json", "int"}), + proto::plan::OpType::LessThan, + value); + auto plan = + std::make_shared(DEFAULT_PLANNODE_ID, unary_expr); + auto final = ExecuteQueryExpr(plan, seg.get(), N, MAX_TIMESTAMP); + EXPECT_EQ(final.size(), N); +} \ No newline at end of file diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index 4e5f3d194efe7..f36c87060b3b6 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -281,6 +281,9 @@ func (m *indexMeta) CanCreateIndex(req *indexpb.CreateIndexRequest) (UniqueID, e return 0, fmt.Errorf("CreateIndex failed: %s", errMsg) } if req.FieldID == index.FieldID { + if index.IsJson { + continue + } // creating multiple indexes on same field is not supported errMsg := "CreateIndex failed: creating multiple indexes on same field is not supported" log.Warn(errMsg) @@ -979,6 +982,7 @@ func (m *indexMeta) AreAllDiskIndex(collectionID int64, schema *schemapb.Collect return t.FieldID, GetIndexType(t.IndexParams) }) vectorFieldsWithDiskIndex := lo.Filter(vectorFields, func(field *schemapb.FieldSchema, _ int) bool { + if indexType, ok := fieldIndexTypes[field.FieldID]; ok { return indexparamcheck.IsDiskIndex(indexType) } diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index 9c420bca91495..9c9f4887f3d7d 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -19,12 +19,15 @@ package datacoord import ( "context" "fmt" + "strconv" + "strings" "time" "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" @@ -52,12 +55,12 @@ func (s *Server) startIndexService(ctx context.Context) { go s.createIndexForSegmentLoop(ctx) } -func (s *Server) createIndexForSegment(segment *SegmentInfo, indexID UniqueID) error { +func (s *Server) createIndexForSegment(segment *SegmentInfo, index *model.Index) error { if !segment.GetIsSorted() && Params.DataCoordCfg.EnableStatsTask.GetAsBool() && !segment.GetIsImporting() && segment.Level != datapb.SegmentLevel_L0 { log.Info("segment not sorted, skip create index", zap.Int64("segmentID", segment.GetID())) return nil } - log.Info("create index for segment", zap.Int64("segmentID", segment.ID), zap.Int64("indexID", indexID)) + log.Info("create index for segment", zap.Int64("segmentID", segment.ID), zap.Int64("indexID", index.IndexID)) buildID, err := s.allocator.AllocID(context.Background()) if err != nil { return err @@ -67,10 +70,11 @@ func (s *Server) createIndexForSegment(segment *SegmentInfo, indexID UniqueID) e CollectionID: segment.CollectionID, PartitionID: segment.PartitionID, NumRows: segment.NumOfRows, - IndexID: indexID, + IndexID: index.IndexID, BuildID: buildID, CreateTime: uint64(segment.ID), WriteHandoff: false, + IsJson: index.IsJson, } if err = s.meta.indexMeta.AddSegmentIndex(segIndex); err != nil { return err @@ -93,7 +97,7 @@ func (s *Server) createIndexesForSegment(segment *SegmentInfo) error { indexIDToSegIndexes := s.meta.indexMeta.GetSegmentIndexes(segment.CollectionID, segment.ID) for _, index := range indexes { if _, ok := indexIDToSegIndexes[index.IndexID]; !ok { - if err := s.createIndexForSegment(segment, index.IndexID); err != nil { + if err := s.createIndexForSegment(segment, index); err != nil { log.Warn("create index for segment fail", zap.Int64("segmentID", segment.ID), zap.Int64("indexID", index.IndexID)) return err @@ -162,18 +166,157 @@ func (s *Server) createIndexForSegmentLoop(ctx context.Context) { } } -func (s *Server) getFieldNameByID(ctx context.Context, collID, fieldID int64) (string, error) { +func (s *Server) getFieldNameByID(schema *schemapb.CollectionSchema, collID, fieldID int64) (string, error) { + for _, field := range schema.GetFields() { + if field.FieldID == fieldID { + return field.Name, nil + } + } + return "", nil +} + +func (s *Server) getSchema(ctx context.Context, collID int64) (*schemapb.CollectionSchema, error) { resp, err := s.broker.DescribeCollectionInternal(ctx, collID) if err != nil { - return "", err + return nil, err } + return resp.GetSchema(), nil +} - for _, field := range resp.GetSchema().GetFields() { - if field.FieldID == fieldID { - return field.Name, nil +func (s *Server) isJsonField(schema *schemapb.CollectionSchema, collID, fieldID int64) (bool, error) { + for _, f := range schema.Fields { + if f.FieldID == fieldID { + return typeutil.IsJSONType(f.DataType), nil } } - return "", nil + return false, nil +} + +const JSON_PATH_KEY = "json_path" +const JSON_CAST_KEY = "json_cast_type" + +func getIndexParam(indexParams []*commonpb.KeyValuePair, key string) (string, error) { + for _, p := range indexParams { + if p.Key == JSON_PATH_KEY { + return p.Value, nil + } + } + return "", merr.WrapErrParameterInvalidMsg("%s not found", key) +} + +func setIndexParam(indexParams []*commonpb.KeyValuePair, key, value string) { + for _, p := range indexParams { + if p.Key == key { + p.Value = value + } + } +} + +func (s *Server) checkJsonParams(req *indexpb.CreateIndexRequest) error { + var jsonPath = "" + var jsonCastType = schemapb.DataType_None + for _, p := range req.IndexParams { + if p.Key == JSON_PATH_KEY { + jsonPath = p.Value + } else if p.Key == JSON_CAST_KEY { + v, err := strconv.ParseInt(p.Value, 10, 64) + if err != nil { + return err + } + jsonCastType = schemapb.DataType(v) + } + } + + if jsonPath == "" { + return merr.WrapErrParameterInvalidMsg("%s is empty", JSON_PATH_KEY) + } + if jsonCastType == schemapb.DataType_None { + return merr.WrapErrParameterInvalidMsg("%s is empty", JSON_CAST_KEY) + } + + // TODO: sunby, check json path + // check json cast type + switch jsonCastType { + case schemapb.DataType_Bool, schemapb.DataType_Int8, schemapb.DataType_Int16, schemapb.DataType_Int32, schemapb.DataType_Int64, schemapb.DataType_Float, schemapb.DataType_Double, + schemapb.DataType_String, schemapb.DataType_VarChar: + default: + return merr.WrapErrParameterInvalidMsg("invalid json cast type %s", jsonCastType.String()) + } + + // check duplicate json path + exists := s.meta.indexMeta.GetFieldIndexes(req.GetCollectionID(), req.GetFieldID(), req.GetIndexName()) + if len(exists) > 0 { + if len(exists) > 1 { + log.Warn("there're more than one index with the same name", zap.String("indexName", req.GetIndexName())) + } + existsJsonPath, err := getIndexParam(exists[0].IndexParams, JSON_PATH_KEY) + if err != nil { + return err + } + if existsJsonPath != jsonPath { + return merr.WrapErrParameterInvalidMsg("can not create duplicated indexes with different json path, prev: %v", existsJsonPath) + } + existsJsonCastType, err := getIndexParam(exists[0].IndexParams, JSON_CAST_KEY) + if err != nil { + return err + } + existsType, err := strconv.ParseInt(existsJsonCastType, 10, 64) + if err != nil { + return err + } + if schemapb.DataType(existsType) != jsonCastType { + return merr.WrapErrParameterInvalidMsg("can not create duplicated indexes with different json cast type, prev: %v", schemapb.DataType(existsType).String()) + } + } + return nil +} + +func (s *Server) parseNestedPath(identifier string, schema *schemapb.CollectionSchema) (string, error) { + fieldName := strings.Split(identifier, "[")[0] + nestedPath := make([]string, 0) + helper, err := typeutil.CreateSchemaHelper(schema) + if err != nil { + return "", err + } + field, err := helper.GetFieldFromNameDefaultJSON(fieldName) + if err != nil { + return "", err + } + if field.GetDataType() != schemapb.DataType_JSON && + field.GetDataType() != schemapb.DataType_Array { + errMsg := fmt.Sprintf("%s data type not supported accessed with []", field.GetDataType()) + return "", fmt.Errorf(errMsg) + } + if fieldName != field.Name { + r := strings.ReplaceAll(fieldName, "~", "~0") + r = strings.ReplaceAll(r, "/", "~1") + nestedPath = append(nestedPath, r) + } + jsonKeyStr := identifier[len(fieldName):] + ss := strings.Split(jsonKeyStr, "][") + for i := 0; i < len(ss); i++ { + path := strings.Trim(ss[i], "[]") + if path == "" { + return "", fmt.Errorf("invalid identifier: %s", identifier) + } + if (strings.HasPrefix(path, "\"") && strings.HasSuffix(path, "\"")) || + (strings.HasPrefix(path, "'") && strings.HasSuffix(path, "'")) { + path = path[1 : len(path)-1] + if path == "" { + return "", fmt.Errorf("invalid identifier: %s", identifier) + } + if typeutil.IsArrayType(field.DataType) { + return "", fmt.Errorf("can only access array field with integer index") + } + } else if _, err := strconv.ParseInt(path, 10, 64); err != nil { + return "", fmt.Errorf("json key must be enclosed in double quotes or single quotes: \"%s\"", path) + } + r := strings.ReplaceAll(path, "~", "~0") + r = strings.ReplaceAll(r, "/", "~1") + nestedPath = append(nestedPath, r) + } + + return "/" + strings.Join(nestedPath, "/"), nil } // CreateIndex create an index on collection. @@ -197,17 +340,45 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques } metrics.IndexRequestCounter.WithLabelValues(metrics.TotalLabel).Inc() + schema, err := s.getSchema(ctx, req.GetCollectionID()) + if err != nil { + return merr.Status(err), nil + } + isJson, err := s.isJsonField(schema, req.GetCollectionID(), req.GetFieldID()) + if err != nil { + return merr.Status(err), nil + } + if req.GetIndexName() == "" { - indexes := s.meta.indexMeta.GetFieldIndexes(req.GetCollectionID(), req.GetFieldID(), req.GetIndexName()) - if len(indexes) == 0 { - fieldName, err := s.getFieldNameByID(ctx, req.GetCollectionID(), req.GetFieldID()) + if isJson { + jsonPath, err := getIndexParam(req.GetIndexParams(), JSON_PATH_KEY) if err != nil { - log.Warn("get field name from schema failed", zap.Int64("fieldID", req.GetFieldID())) return merr.Status(err), nil } - req.IndexName = fieldName - } else if len(indexes) == 1 { - req.IndexName = indexes[0].IndexName + nestedPath, err := s.parseNestedPath(jsonPath, schema) + if err != nil { + return merr.Status(err), nil + } + setIndexParam(req.GetIndexParams(), JSON_PATH_KEY, nestedPath) + req.IndexName = jsonPath + } else { + indexes := s.meta.indexMeta.GetFieldIndexes(req.GetCollectionID(), req.GetFieldID(), req.GetIndexName()) + if len(indexes) == 0 { + fieldName, err := s.getFieldNameByID(schema, req.GetCollectionID(), req.GetFieldID()) + if err != nil { + log.Warn("get field name from schema failed", zap.Int64("fieldID", req.GetFieldID())) + return merr.Status(err), nil + } + req.IndexName = fieldName + } else if len(indexes) == 1 { + req.IndexName = indexes[0].IndexName + } + } + } + + if isJson { + if err := s.checkJsonParams(req); err != nil { + return merr.Status(err), nil } } @@ -250,6 +421,7 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques CreateTime: req.GetTimestamp(), IsAutoIndex: req.GetIsAutoIndex(), UserIndexParams: req.GetUserIndexParams(), + IsJson: isJson, } if err := ValidateIndexParams(index); err != nil { @@ -890,6 +1062,7 @@ func (s *Server) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoReq IndexVersion: segIdx.IndexVersion, NumRows: segIdx.NumRows, CurrentIndexVersion: segIdx.CurrentIndexVersion, + IsJson: segIdx.IsJson, }) } } diff --git a/internal/metastore/model/index.go b/internal/metastore/model/index.go index 1e44ebeb0db86..1dde7137ca3ac 100644 --- a/internal/metastore/model/index.go +++ b/internal/metastore/model/index.go @@ -19,6 +19,7 @@ type Index struct { IndexParams []*commonpb.KeyValuePair IsAutoIndex bool UserIndexParams []*commonpb.KeyValuePair + IsJson bool } func UnmarshalIndexModel(indexInfo *indexpb.FieldIndex) *Index { @@ -37,6 +38,7 @@ func UnmarshalIndexModel(indexInfo *indexpb.FieldIndex) *Index { IndexParams: indexInfo.IndexInfo.GetIndexParams(), IsAutoIndex: indexInfo.IndexInfo.GetIsAutoIndex(), UserIndexParams: indexInfo.IndexInfo.GetUserIndexParams(), + IsJson: indexInfo.IndexInfo.IsJson, } } @@ -55,6 +57,7 @@ func MarshalIndexModel(index *Index) *indexpb.FieldIndex { IndexParams: index.IndexParams, IsAutoIndex: index.IsAutoIndex, UserIndexParams: index.UserIndexParams, + IsJson: index.IsJson, }, Deleted: index.IsDeleted, CreateTime: index.CreateTime, @@ -124,6 +127,7 @@ func CloneIndex(index *Index) *Index { IndexParams: make([]*commonpb.KeyValuePair, len(index.IndexParams)), IsAutoIndex: index.IsAutoIndex, UserIndexParams: make([]*commonpb.KeyValuePair, len(index.UserIndexParams)), + IsJson: index.IsJson, } for i, param := range index.TypeParams { clonedIndex.TypeParams[i] = proto.Clone(param).(*commonpb.KeyValuePair) diff --git a/internal/metastore/model/segment_index.go b/internal/metastore/model/segment_index.go index 1c727b553d642..e528d5d4ede4c 100644 --- a/internal/metastore/model/segment_index.go +++ b/internal/metastore/model/segment_index.go @@ -21,6 +21,7 @@ type SegmentIndex struct { CreateTime uint64 IndexFileKeys []string IndexSize uint64 + IsJson bool // deprecated WriteHandoff bool CurrentIndexVersion int32 @@ -49,6 +50,7 @@ func UnmarshalSegmentIndexModel(segIndex *indexpb.SegmentIndex) *SegmentIndex { IndexSize: segIndex.SerializeSize, WriteHandoff: segIndex.WriteHandoff, CurrentIndexVersion: segIndex.GetCurrentIndexVersion(), + IsJson: segIndex.IsJson, } } @@ -74,6 +76,7 @@ func MarshalSegmentIndexModel(segIdx *SegmentIndex) *indexpb.SegmentIndex { SerializeSize: segIdx.IndexSize, WriteHandoff: segIdx.WriteHandoff, CurrentIndexVersion: segIdx.CurrentIndexVersion, + IsJson: segIdx.IsJson, } } @@ -95,5 +98,6 @@ func CloneSegmentIndex(segIndex *SegmentIndex) *SegmentIndex { IndexSize: segIndex.IndexSize, WriteHandoff: segIndex.WriteHandoff, CurrentIndexVersion: segIndex.CurrentIndexVersion, + IsJson: segIndex.IsJson, } } diff --git a/internal/proto/index_coord.proto b/internal/proto/index_coord.proto index 7377954ebaf12..6b315c2023123 100644 --- a/internal/proto/index_coord.proto +++ b/internal/proto/index_coord.proto @@ -54,6 +54,7 @@ message IndexInfo { bool is_auto_index = 11; repeated common.KeyValuePair user_index_params = 12; int64 pending_index_rows = 13; + bool is_json = 14; } message FieldIndex { @@ -80,6 +81,7 @@ message SegmentIndex { bool write_handoff = 15; int32 current_index_version = 16; int64 index_store_version = 17; + bool is_json = 18; } message RegisterNodeRequest { @@ -158,6 +160,7 @@ message IndexFilePathInfo { int64 index_version = 9; int64 num_rows = 10; int32 current_index_version = 11; + bool is_json = 12; } message SegmentInfo { diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index 2de85755c7b94..6e9c8fbc5e665 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -383,6 +383,7 @@ message FieldIndexInfo { int64 num_rows = 10; int32 current_index_version = 11; int64 index_store_version = 12; + bool is_json = 13; } enum LoadScope { @@ -638,6 +639,7 @@ message SegmentVersionInfo { map index_info = 7; data.SegmentLevel level = 8; bool is_sorted = 9; + map json_index = 10; } message ChannelVersionInfo { diff --git a/internal/querycoordv2/checkers/index_checker.go b/internal/querycoordv2/checkers/index_checker.go index 4297df396a26f..40af1ba4b15a0 100644 --- a/internal/querycoordv2/checkers/index_checker.go +++ b/internal/querycoordv2/checkers/index_checker.go @@ -158,8 +158,15 @@ func (c *IndexChecker) checkReplica(ctx context.Context, collection *meta.Collec func (c *IndexChecker) checkSegment(segment *meta.Segment, indexInfos []*indexpb.IndexInfo) (fieldIDs []int64) { var result []int64 for _, indexInfo := range indexInfos { - fieldID, indexID := indexInfo.FieldID, indexInfo.IndexID - info, ok := segment.IndexInfo[fieldID] + fieldID, indexID, isJson := indexInfo.FieldID, indexInfo.IndexID, indexInfo.IsJson + // check json index first + var info *querypb.FieldIndexInfo + var ok bool + if isJson { + info, ok = segment.JsonIndexInfo[indexID] + } else { + info, ok = segment.IndexInfo[fieldID] + } if !ok { result = append(result, fieldID) continue diff --git a/internal/querycoordv2/dist/dist_handler.go b/internal/querycoordv2/dist/dist_handler.go index 9bc9f6a228618..6c9de1ebb0840 100644 --- a/internal/querycoordv2/dist/dist_handler.go +++ b/internal/querycoordv2/dist/dist_handler.go @@ -155,6 +155,7 @@ func (dh *distHandler) updateSegmentsDistribution(resp *querypb.GetDataDistribut Version: s.GetVersion(), LastDeltaTimestamp: s.GetLastDeltaTimestamp(), IndexInfo: s.GetIndexInfo(), + JsonIndexInfo: s.GetJsonIndex(), }) } diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index f6e6d3fba0c0c..5c8dae9149e10 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -362,6 +362,7 @@ func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID IndexVersion: info.GetIndexVersion(), NumRows: info.GetNumRows(), CurrentIndexVersion: info.GetCurrentIndexVersion(), + IsJson: info.GetIsJson(), }) } diff --git a/internal/querycoordv2/meta/segment_dist_manager.go b/internal/querycoordv2/meta/segment_dist_manager.go index 51d38fc0fcafe..f48fe64100b66 100644 --- a/internal/querycoordv2/meta/segment_dist_manager.go +++ b/internal/querycoordv2/meta/segment_dist_manager.go @@ -122,6 +122,7 @@ type Segment struct { Version int64 // Version is the timestamp of loading segment LastDeltaTimestamp uint64 // The timestamp of the last delta record IndexInfo map[int64]*querypb.FieldIndexInfo // index info of loaded segment + JsonIndexInfo map[int64]*querypb.FieldIndexInfo // indexID -> FieldIndexInfo } func SegmentFromInfo(info *datapb.SegmentInfo) *Segment { diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index cce217f64482a..46a9fcb841f49 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -259,6 +259,7 @@ type LocalSegment struct { lastDeltaTimestamp *atomic.Uint64 fields *typeutil.ConcurrentMap[int64, *FieldInfo] fieldIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo] + jsonIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo] } func NewSegment(ctx context.Context, @@ -323,6 +324,7 @@ func NewSegment(ctx context.Context, lastDeltaTimestamp: atomic.NewUint64(0), fields: typeutil.NewConcurrentMap[int64, *FieldInfo](), fieldIndexes: typeutil.NewConcurrentMap[int64, *IndexedFieldInfo](), + jsonIndexes: typeutil.NewConcurrentMap[int64, *IndexedFieldInfo](), memSize: atomic.NewInt64(-1), rowNum: atomic.NewInt64(-1), @@ -441,6 +443,11 @@ func (s *LocalSegment) GetIndex(fieldID int64) *IndexedFieldInfo { return info } +func (s *LocalSegment) GetJsonIndex(indexID int64) *IndexedFieldInfo { + info, _ := s.jsonIndexes.Get(indexID) + return info +} + func (s *LocalSegment) ExistIndex(fieldID int64) bool { fieldInfo, ok := s.fieldIndexes.Get(fieldID) if !ok { @@ -468,6 +475,15 @@ func (s *LocalSegment) Indexes() []*IndexedFieldInfo { return result } +func (s *LocalSegment) JsonIndexes() []*IndexedFieldInfo { + var result []*IndexedFieldInfo + s.jsonIndexes.Range(func(key int64, value *IndexedFieldInfo) bool { + result = append(result, value) + return true + }) + return result +} + func (s *LocalSegment) ResetIndexesLazyLoad(lazyState bool) { for _, indexInfo := range s.Indexes() { indexInfo.IsLoaded = lazyState @@ -1143,7 +1159,12 @@ func (s *LocalSegment) LoadIndex(ctx context.Context, indexInfo *querypb.FieldIn zap.Int64("indexID", indexInfo.GetIndexID()), ) - old := s.GetIndex(indexInfo.GetFieldID()) + var old *IndexedFieldInfo + if indexInfo.IsJson { + old = s.GetJsonIndex(indexInfo.GetIndexID()) + } else { + old = s.GetIndex(indexInfo.GetFieldID()) + } // the index loaded if old != nil && old.IndexInfo.GetIndexID() == indexInfo.GetIndexID() && old.IsLoaded { log.Warn("index already loaded") @@ -1277,13 +1298,23 @@ func (s *LocalSegment) UpdateIndexInfo(ctx context.Context, indexInfo *querypb.F return err } - s.fieldIndexes.Insert(indexInfo.GetFieldID(), &IndexedFieldInfo{ - FieldBinlog: &datapb.FieldBinlog{ - FieldID: indexInfo.GetFieldID(), - }, - IndexInfo: indexInfo, - IsLoaded: true, - }) + if indexInfo.IsJson { + s.jsonIndexes.Insert(indexInfo.IndexID, &IndexedFieldInfo{ + FieldBinlog: &datapb.FieldBinlog{ + FieldID: indexInfo.FieldID, + }, + IndexInfo: indexInfo, + IsLoaded: true, + }) + } else { + s.fieldIndexes.Insert(indexInfo.GetFieldID(), &IndexedFieldInfo{ + FieldBinlog: &datapb.FieldBinlog{ + FieldID: indexInfo.GetFieldID(), + }, + IndexInfo: indexInfo, + IsLoaded: true, + }) + } log.Info("updateSegmentIndex done") return nil } diff --git a/internal/querynodev2/segments/segment_interface.go b/internal/querynodev2/segments/segment_interface.go index 9ad7ef219a0fa..85432ed57c035 100644 --- a/internal/querynodev2/segments/segment_interface.go +++ b/internal/querynodev2/segments/segment_interface.go @@ -73,6 +73,7 @@ type Segment interface { GetIndex(fieldID int64) *IndexedFieldInfo ExistIndex(fieldID int64) bool Indexes() []*IndexedFieldInfo + JsonIndexes() []*IndexedFieldInfo HasRawData(fieldID int64) bool // Modification related diff --git a/internal/querynodev2/segments/segment_l0.go b/internal/querynodev2/segments/segment_l0.go index 5119e64c66119..f8dd7949a5d8b 100644 --- a/internal/querynodev2/segments/segment_l0.go +++ b/internal/querynodev2/segments/segment_l0.go @@ -120,6 +120,10 @@ func (s *L0Segment) Indexes() []*IndexedFieldInfo { return nil } +func (s *L0Segment) JsonIndexes() []*IndexedFieldInfo { + return nil +} + func (s *L0Segment) ResetIndexesLazyLoad(lazyState bool) { } diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 16b7f7548a12f..669e4c8eb677c 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -635,12 +635,18 @@ func separateLoadInfoV2(loadInfo *querypb.SegmentLoadInfo, schema *schemapb.Coll []*datapb.FieldBinlog, // fields info map[int64]*datapb.TextIndexStats, // text indexed info map[int64]struct{}, // unindexed text fields + []*querypb.FieldIndexInfo, // json indexes ) { fieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo) + jsonIndexes := make([]*querypb.FieldIndexInfo, 0) for _, indexInfo := range loadInfo.IndexInfos { if len(indexInfo.GetIndexFilePaths()) > 0 { - fieldID := indexInfo.FieldID - fieldID2IndexInfo[fieldID] = indexInfo + if indexInfo.IsJson { + jsonIndexes = append(jsonIndexes, indexInfo) + } else { + fieldID := indexInfo.FieldID + fieldID2IndexInfo[fieldID] = indexInfo + } } } @@ -680,7 +686,7 @@ func separateLoadInfoV2(loadInfo *querypb.SegmentLoadInfo, schema *schemapb.Coll } } - return indexedFieldInfos, fieldBinlogs, textIndexedInfo, unindexedTextFields + return indexedFieldInfos, fieldBinlogs, textIndexedInfo, unindexedTextFields, jsonIndexes } func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *querypb.SegmentLoadInfo, segment *LocalSegment) (err error) { @@ -704,7 +710,7 @@ func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *qu collection := segment.GetCollection() schemaHelper, _ := typeutil.CreateSchemaHelper(collection.Schema()) - indexedFieldInfos, fieldBinlogs, textIndexes, unindexedTextFields := separateLoadInfoV2(loadInfo, collection.Schema()) + indexedFieldInfos, fieldBinlogs, textIndexes, unindexedTextFields, jsonIndexes := separateLoadInfoV2(loadInfo, collection.Schema()) if err := segment.AddFieldDataInfo(ctx, loadInfo.GetNumOfRows(), loadInfo.GetBinlogPaths()); err != nil { return err } @@ -719,6 +725,9 @@ func (loader *segmentLoader) loadSealedSegment(ctx context.Context, loadInfo *qu if err := loader.loadFieldsIndex(ctx, schemaHelper, segment, loadInfo.GetNumOfRows(), indexedFieldInfos); err != nil { return err } + if err := loader.loadJsonIndexes(ctx, segment, jsonIndexes); err != nil { + return err + } loadFieldsIndexSpan := tr.RecordSpan() metrics.QueryNodeLoadIndexLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Observe(float64(loadFieldsIndexSpan.Milliseconds())) @@ -966,6 +975,34 @@ func (loader *segmentLoader) loadFieldsIndex(ctx context.Context, return nil } +func (loader *segmentLoader) loadJsonIndexes(ctx context.Context, + segment *LocalSegment, + jsonIndexes []*querypb.FieldIndexInfo, +) error { + log := log.Ctx(ctx).With( + zap.Int64("collectionID", segment.Collection()), + zap.Int64("partitionID", segment.Partition()), + zap.Int64("segmentID", segment.ID()), + ) + + for _, fieldInfo := range jsonIndexes { + tr := timerecord.NewTimeRecorder("loadFieldIndex") + err := loader.loadFieldIndex(ctx, segment, fieldInfo) + loadFieldIndexSpan := tr.RecordSpan() + if err != nil { + return err + } + + log.Info("load json field binlogs done for sealed segment with index", + zap.Int64("fieldID", fieldInfo.FieldID), + zap.Int32("current_index_version", fieldInfo.GetCurrentIndexVersion()), + zap.Duration("load_duration", loadFieldIndexSpan), + ) + } + + return nil +} + func (loader *segmentLoader) loadFieldIndex(ctx context.Context, segment *LocalSegment, indexInfo *querypb.FieldIndexInfo) error { filteredPaths := make([]string, 0, len(indexInfo.IndexFilePaths)) @@ -1475,14 +1512,18 @@ func (loader *segmentLoader) LoadIndex(ctx context.Context, return merr.WrapErrIndexNotFound("index file list empty") } - fieldInfo, ok := fieldInfos[info.GetFieldID()] - if !ok { - return merr.WrapErrParameterInvalid("index info with corresponding field info", "missing field info", strconv.FormatInt(fieldInfo.GetFieldID(), 10)) - } - err := loader.loadFieldIndex(ctx, segment, info) - if err != nil { - log.Warn("failed to load index for segment", zap.Error(err)) - return err + if info.IsJson { + loader.loadJsonIndexes(ctx, segment, []*querypb.FieldIndexInfo{info}) + } else { + fieldInfo, ok := fieldInfos[info.GetFieldID()] + if !ok { + return merr.WrapErrParameterInvalid("index info with corresponding field info", "missing field info", strconv.FormatInt(fieldInfo.GetFieldID(), 10)) + } + err := loader.loadFieldIndex(ctx, segment, info) + if err != nil { + log.Warn("failed to load index for segment", zap.Error(err)) + return err + } } } loader.notifyLoadFinish(loadInfo) diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 576e549c47224..3257cad005040 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -1187,6 +1187,9 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get IndexInfo: lo.SliceToMap(s.Indexes(), func(info *segments.IndexedFieldInfo) (int64, *querypb.FieldIndexInfo) { return info.IndexInfo.FieldID, info.IndexInfo }), + JsonIndex: lo.SliceToMap(s.JsonIndexes(), func(info *segments.IndexedFieldInfo) (int64, *querypb.FieldIndexInfo) { + return info.IndexInfo.IndexID, info.IndexInfo + }), }) } diff --git a/pkg/util/indexparamcheck/inverted_checker.go b/pkg/util/indexparamcheck/inverted_checker.go index 8d6893c10085a..71e3b90267b48 100644 --- a/pkg/util/indexparamcheck/inverted_checker.go +++ b/pkg/util/indexparamcheck/inverted_checker.go @@ -19,7 +19,7 @@ func (c *INVERTEDChecker) CheckTrain(params map[string]string) error { func (c *INVERTEDChecker) CheckValidDataType(field *schemapb.FieldSchema) error { dType := field.GetDataType() if !typeutil.IsBoolType(dType) && !typeutil.IsArithmetic(dType) && !typeutil.IsStringType(dType) && - !typeutil.IsArrayType(dType) { + !typeutil.IsArrayType(dType) && !typeutil.IsJSONType(dType) { return fmt.Errorf("INVERTED are not supported on %s field", dType.String()) } return nil