Skip to content

Commit

Permalink
Update registerNode in indexservice
Browse files Browse the repository at this point in the history
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
  • Loading branch information
xiaocai2333 authored and yefu.chen committed Jan 20, 2021
1 parent 4588342 commit c35079d
Show file tree
Hide file tree
Showing 31 changed files with 449 additions and 553 deletions.
2 changes: 1 addition & 1 deletion .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Checks: >
-*, clang-diagnostic-*, -clang-diagnostic-error,
clang-analyzer-*, -clang-analyzer-alpha*,
google-*, -google-runtime-references, -google-readability-todo,
modernize-*, -modernize-pass-by-value, -modernize-use-equals-default,
modernize-*, -modernize-pass-by-value, -modernize-use-equals-default, -modernize-use-trailing-return-type,
performance-faster-string-find, performance-for-range-copy,
performance-implicit-conversion-in-loop, performance-inefficient-algorithm,
performance-trivially-destructible, performance-inefficient-vector-operation,
Expand Down
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ build-go: build-cpp
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/binlog $(PWD)/cmd/binlog/main.go 1>/dev/null
@echo "Building singlenode ..."
@mkdir -p $(INSTALL_PATH) && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/singlenode $(PWD)/cmd/singlenode/main.go 1>/dev/null
@echo "Building distributed indexservice ..."
@mkdir -p $(INSTALL_PATH)/distributed && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/distributed/indexservice $(PWD)/cmd/distributed/indexservice/main.go 1>/dev/null
@echo "Building distributed indexnode ..."
@mkdir -p $(INSTALL_PATH)/distributed && go env -w CGO_ENABLED="1" && GO111MODULE=on $(GO) build -o $(INSTALL_PATH)/distributed/indexnode $(PWD)/cmd/distributed/indexnode/main.go 1>/dev/null

build-cpp:
@(env bash $(PWD)/scripts/core_build.sh -f "$(CUSTOM_THIRDPARTY_PATH)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
)

func main() {
grpcindexnode.Init()
ctx, cancel := context.WithCancel(context.Background())
svr, err := grpcindexnode.CreateIndexNode(ctx)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
)

func main() {
grpcindexserver.Init()
ctx, cancel := context.WithCancel(context.Background())
svr, err := grpcindexserver.CreateIndexServer(ctx)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions internal/core/src/common/FieldMeta.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ datatype_is_vector(DataType datatype) {

struct FieldMeta {
public:
FieldMeta(const FieldMeta&) = delete;
FieldMeta(FieldMeta&&) = default;
FieldMeta&
operator=(const FieldMeta&) = delete;
FieldMeta&
operator=(FieldMeta&&) = default;

FieldMeta(const FieldName& name, FieldId id, DataType type) : name_(name), id_(id), type_(type) {
Assert(!is_vector());
}
Expand Down
4 changes: 2 additions & 2 deletions internal/core/src/common/LoadInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ struct LoadIndexInfo {
// NOTE: Refer to common/SystemProperty.cpp for details
struct LoadFieldDataInfo {
int64_t field_id;
void* blob;
int64_t row_count;
void* blob = nullptr;
int64_t row_count = -1;
};
21 changes: 13 additions & 8 deletions internal/core/src/common/Schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,24 @@ namespace milvus {

class Schema {
public:
void
FieldId
AddDebugField(const std::string& name, DataType data_type) {
static int64_t debug_id = 1000;
this->AddField(FieldName(name), FieldId(debug_id), data_type);
debug_id++;
auto field_id = FieldId(debug_id);
debug_id += 2;
this->AddField(FieldName(name), field_id, data_type);
return field_id;
}

// auto gen field_id for convenience
void
FieldId
AddDebugField(const std::string& name, DataType data_type, int64_t dim, MetricType metric_type) {
static int64_t debug_id = 2000;
auto field_meta = FieldMeta(FieldName(name), FieldId(debug_id), data_type, dim, metric_type);
debug_id++;
static int64_t debug_id = 2001;
auto field_id = FieldId(debug_id);
debug_id += 2;
auto field_meta = FieldMeta(FieldName(name), field_id, data_type, dim, metric_type);
this->AddField(std::move(field_meta));
return field_id;
}

// scalar type
Expand Down Expand Up @@ -141,13 +145,14 @@ class Schema {
void
AddField(FieldMeta&& field_meta) {
auto offset = fields_.size();
fields_.emplace_back(field_meta);
AssertInfo(!name_offsets_.count(field_meta.get_name()), "duplicated field name");
name_offsets_.emplace(field_meta.get_name(), offset);
AssertInfo(!id_offsets_.count(field_meta.get_id()), "duplicated field id");
id_offsets_.emplace(field_meta.get_id(), offset);

auto field_sizeof = field_meta.get_sizeof();
sizeof_infos_.push_back(std::move(field_sizeof));
fields_.emplace_back(std::move(field_meta));
total_sizeof_ += field_sizeof;
}

Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/common/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ template <class...>
constexpr std::false_type always_false{};

template <typename T>
using aligned_vector = std::vector<T, boost::alignment::aligned_allocator<T, 512>>;
using aligned_vector = std::vector<T, boost::alignment::aligned_allocator<T, 64>>;

///////////////////////////////////////////////////////////////////////////////////////////////////
struct QueryResult {
Expand Down
4 changes: 2 additions & 2 deletions internal/core/src/query/Plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ Parser::ParseVecNode(const Json& out_body) {
auto field_offset = schema.get_offset(field_name);

auto vec_node = [&]() -> std::unique_ptr<VectorPlanNode> {
auto field_meta = schema.operator[](field_name);
auto& field_meta = schema.operator[](field_name);
auto data_type = field_meta.get_data_type();
if (data_type == DataType::VECTOR_FLOAT) {
return std::make_unique<FloatVectorANNS>();
Expand Down Expand Up @@ -252,7 +252,7 @@ template <typename T>
ExprPtr
Parser::ParseRangeNodeImpl(const FieldName& field_name, const Json& body) {
auto expr = std::make_unique<RangeExprImpl<T>>();
auto field_meta = schema[field_name];
auto& field_meta = schema[field_name];
auto data_type = field_meta.get_data_type();
expr->data_type_ = data_type;
expr->field_offset_ = schema.get_offset(field_name);
Expand Down
4 changes: 2 additions & 2 deletions internal/core/src/query/visitors/ExecExprVisitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ ExecExprVisitor::ExecRangeVisitorImpl(RangeExprImpl<T>& expr, IndexFunc index_fu

// RetType results(vec.num_chunk());
auto indexing_barrier = segment_.num_chunk_index_safe(field_offset);
auto chunk_size = segment_.chunk_size();
auto chunk_size = segment_.size_per_chunk();
auto num_chunk = upper_div(row_count_, chunk_size);
RetType results;

Expand Down Expand Up @@ -290,7 +290,7 @@ ExecExprVisitor::ExecTermVisitorImpl(TermExpr& expr_raw) -> RetType {
auto& field_meta = schema[field_offset];
// auto vec_ptr = records.get_entity<T>(field_offset);
// auto& vec = *vec_ptr;
auto chunk_size = segment_.chunk_size();
auto chunk_size = segment_.size_per_chunk();
auto num_chunk = upper_div(row_count_, chunk_size);
RetType bitsets;

Expand Down
3 changes: 2 additions & 1 deletion internal/core/src/segcore/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@

set(SEGCORE_FILES
SegmentGrowingImpl.cpp
Collection.cpp
collection_c.cpp
segment_c.cpp
SegmentGrowing.cpp
SegmentGrowingImpl.cpp
SegmentSealedImpl.cpp
IndexingEntry.cpp
InsertRecord.cpp
Reduce.cpp
Expand Down
5 changes: 4 additions & 1 deletion internal/core/src/segcore/SealedIndexingRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ using SealedIndexingEntryPtr = std::unique_ptr<SealedIndexingEntry>;

struct SealedIndexingRecord {
void
add_entry(FieldOffset field_offset, SealedIndexingEntryPtr&& ptr) {
add_entry(FieldOffset field_offset, MetricType metric_type, knowhere::VecIndexPtr indexing) {
auto ptr = std::make_unique<SealedIndexingEntry>();
ptr->indexing_ = indexing;
ptr->metric_type_ = metric_type;
std::unique_lock lck(mutex_);
entries_[field_offset] = std::move(ptr);
}
Expand Down
10 changes: 3 additions & 7 deletions internal/core/src/segcore/SegmentGrowingImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ SegmentGrowingImpl::FillTargetEntry(const query::Plan* plan, QueryResult& result
auto key_offset_opt = schema_->get_primary_key_offset();
Assert(key_offset_opt.has_value());
auto key_offset = key_offset_opt.value();
auto field_meta = schema_->operator[](key_offset);
auto& field_meta = schema_->operator[](key_offset);
Assert(field_meta.get_data_type() == DataType::INT64);
auto uids = record_.get_entity<int64_t>(key_offset);
for (int64_t i = 0; i < size; ++i) {
Expand All @@ -290,12 +290,8 @@ SegmentGrowingImpl::LoadIndexing(const LoadIndexInfo& info) {

Assert(info.index_params.count("metric_type"));
auto metric_type_str = info.index_params.at("metric_type");
auto entry = std::make_unique<SealedIndexingEntry>();

entry->metric_type_ = GetMetricType(metric_type_str);
entry->indexing_ = info.index;

sealed_indexing_record_.add_entry(field_offset, std::move(entry));
sealed_indexing_record_.add_entry(field_offset, GetMetricType(metric_type_str), info.index);
return Status::OK();
}

Expand All @@ -306,7 +302,7 @@ SegmentGrowingImpl::chunk_data_impl(FieldOffset field_offset, int64_t chunk_id)
}

int64_t
SegmentGrowingImpl::get_safe_num_chunk() const {
SegmentGrowingImpl::num_chunk_data() const {
auto size = get_insert_record().ack_responder_.GetAck();
return upper_div(size, chunk_size_);
}
Expand Down
4 changes: 2 additions & 2 deletions internal/core/src/segcore/SegmentGrowingImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
}

int64_t
chunk_size() const final {
size_per_chunk() const final {
return chunk_size_;
}

Expand All @@ -126,7 +126,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
}

int64_t
get_safe_num_chunk() const override;
num_chunk_data() const override;

Status
LoadIndexing(const LoadIndexInfo& info) override;
Expand Down
22 changes: 12 additions & 10 deletions internal/core/src/segcore/SegmentInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,27 +36,21 @@ class SegmentInterface {
virtual int64_t
get_row_count() const = 0;

virtual const Schema&
get_schema() const = 0;

virtual ~SegmentInterface() = default;
};

// internal API for DSL calculation
class SegmentInternalInterface : public SegmentInterface {
public:
virtual const Schema&
get_schema() const = 0;

virtual int64_t
get_safe_num_chunk() const = 0;

template <typename T>
Span<T>
chunk_data(FieldOffset field_offset, int64_t chunk_id) const {
return static_cast<Span<T>>(chunk_data_impl(field_offset, chunk_id));
}

virtual int64_t
num_chunk_index_safe(FieldOffset field_offset) const = 0;

template <typename T>
const knowhere::scalar::StructuredIndex<T>&
chunk_scalar_index(FieldOffset field_offset, int64_t chunk_id) const {
Expand All @@ -68,8 +62,16 @@ class SegmentInternalInterface : public SegmentInterface {
return *ptr;
}

public:
virtual int64_t
num_chunk_index_safe(FieldOffset field_offset) const = 0;

virtual int64_t
num_chunk_data() const = 0;

// return chunk_size for each chunk, renaming against confusion
virtual int64_t
chunk_size() const = 0;
size_per_chunk() const = 0;

protected:
// blob and row_count
Expand Down
10 changes: 2 additions & 8 deletions internal/core/src/segcore/SegmentSealed.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,8 @@

namespace milvus::segcore {

class SegmentSealed : public SegmentInterface {
class SegmentSealed : public SegmentInternalInterface {
public:
virtual const Schema&
get_schema() = 0;
virtual int64_t
get_row_count() = 0;
virtual void
LoadIndex(const LoadIndexInfo& info) = 0;
virtual void
Expand All @@ -31,8 +27,6 @@ class SegmentSealed : public SegmentInterface {
using SegmentSealedPtr = std::unique_ptr<SegmentSealed>;

SegmentSealedPtr
CreateSealedSegment(SchemaPtr schema, int64_t chunk_size = 32 * 1024) {
return nullptr;
}
CreateSealedSegment(SchemaPtr schema, int64_t chunk_size = 32 * 1024);

} // namespace milvus::segcore
Loading

0 comments on commit c35079d

Please sign in to comment.