Skip to content

Commit

Permalink
fix: Set an empty segment if compaction deleted all inserts (#36045)
Browse files Browse the repository at this point in the history
See also: #36038 
pr: #36044

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
  • Loading branch information
XuanYang-cn authored Sep 6, 2024
1 parent b34b035 commit 6dc7d20
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 9 deletions.
21 changes: 13 additions & 8 deletions internal/datanode/compaction/mix_compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {
s.task.currentTs = currTs
s.task.plan.CollectionTtl = int64(collTTL)
s.mockAlloc.EXPECT().Alloc(mock.Anything).Return(888888, 999999, nil)
s.mockAlloc.EXPECT().AllocOne().Return(19531, nil)

kvs, _, err := serializeWrite(context.TODO(), s.task.Allocator, s.segWriter)
s.Require().NoError(err)
Expand All @@ -237,7 +238,12 @@ func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {

compactionSegments, err := s.task.mergeSplit(s.task.ctx, [][]string{lo.Keys(kvs)}, nil)
s.NoError(err)
s.Equal(0, len(compactionSegments))
s.Equal(1, len(compactionSegments))
s.EqualValues(0, compactionSegments[0].GetNumOfRows())
s.EqualValues(19531, compactionSegments[0].GetSegmentID())
s.Empty(compactionSegments[0].GetDeltalogs())
s.Empty(compactionSegments[0].GetInsertLogs())
s.Empty(compactionSegments[0].GetField2StatslogPaths())
}

func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
Expand All @@ -247,18 +253,19 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
description string
deletions map[interface{}]uint64
expectedRes int
leftNumRows int
}{
{"no deletion", nil, 1},
{"mismatch deletion", map[interface{}]uint64{int64(1): deleteTs}, 1},
{"deleted pk=4", map[interface{}]uint64{int64(4): deleteTs}, 0},
{"no deletion", nil, 1, 1},
{"mismatch deletion", map[interface{}]uint64{int64(1): deleteTs}, 1, 1},
{"deleted pk=4", map[interface{}]uint64{int64(4): deleteTs}, 1, 0},
}

s.mockAlloc.EXPECT().Alloc(mock.Anything).Return(888888, 999999, nil)
kvs, _, err := serializeWrite(context.TODO(), s.task.Allocator, s.segWriter)
s.Require().NoError(err)
for _, test := range tests {
s.Run(test.description, func() {
if test.expectedRes > 0 {
if test.leftNumRows > 0 {
s.mockAlloc.EXPECT().Alloc(mock.Anything).Return(77777, 99999, nil).Once()
}
s.mockAlloc.EXPECT().AllocOne().Return(888888, nil).Maybe()
Expand All @@ -275,9 +282,7 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
res, err := s.task.mergeSplit(s.task.ctx, [][]string{lo.Keys(kvs)}, test.deletions)
s.NoError(err)
s.EqualValues(test.expectedRes, len(res))
if test.expectedRes > 0 {
s.EqualValues(1, res[0].GetNumOfRows())
}
s.EqualValues(test.leftNumRows, res[0].GetNumOfRows())
})
}
}
Expand Down
21 changes: 20 additions & 1 deletion internal/datanode/compaction/segment_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type MultiSegmentWriter struct {
// segID -> fieldID -> binlogs

res []*datapb.CompactionSegment
// DONOT leave it empty of all segments are deleted, just return a segment with zero meta for datacoord
}

func NewMultiSegmentWriter(binlogIO io.BinlogIO, allocator allocator.Allocator, plan *datapb.CompactionPlan, maxRows int64, partitionID, collectionID int64) *MultiSegmentWriter {
Expand Down Expand Up @@ -175,9 +176,27 @@ func (w *MultiSegmentWriter) Write(v *storage.Value) error {
return writer.Write(v)
}

// Could return an empty list if every insert of the segment is deleted
func (w *MultiSegmentWriter) appendEmptySegment() error {
writer, err := w.getWriter()
if err != nil {
return err
}

w.res = append(w.res, &datapb.CompactionSegment{
SegmentID: writer.GetSegmentID(),
NumOfRows: 0,
Channel: w.channel,
})
return nil
}

// DONOT return an empty list if every insert of the segment is deleted,
// append an empty segment instead
func (w *MultiSegmentWriter) Finish() ([]*datapb.CompactionSegment, error) {
if w.current == -1 {
if err := w.appendEmptySegment(); err != nil {
return nil, err
}
return w.res, nil
}

Expand Down

0 comments on commit 6dc7d20

Please sign in to comment.