Skip to content

Commit

Permalink
Merge pull request prometheus#1559 from prometheus/beorn7/storage
Browse files Browse the repository at this point in the history
Never drop a still open head chunk.
  • Loading branch information
beorn7 committed Apr 15, 2016
2 parents 096a2ef + db16acd commit 23d383a
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 14 deletions.
34 changes: 20 additions & 14 deletions storage/local/series.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,21 +313,27 @@ func (s *memorySeries) dropChunks(t model.Time) error {
break
}
}
if keepIdx > 0 {
s.chunkDescs = append(
make([]*chunkDesc, 0, len(s.chunkDescs)-keepIdx),
s.chunkDescs[keepIdx:]...,
)
s.persistWatermark -= keepIdx
if s.persistWatermark < 0 {
panic("dropped unpersisted chunks from memory")
}
if s.chunkDescsOffset != -1 {
s.chunkDescsOffset += keepIdx
}
numMemChunkDescs.Sub(float64(keepIdx))
s.dirty = true
if keepIdx == len(s.chunkDescs) && !s.headChunkClosed {
// Never drop an open head chunk.
keepIdx--
}
if keepIdx <= 0 {
// Nothing to drop.
return nil
}
s.chunkDescs = append(
make([]*chunkDesc, 0, len(s.chunkDescs)-keepIdx),
s.chunkDescs[keepIdx:]...,
)
s.persistWatermark -= keepIdx
if s.persistWatermark < 0 {
panic("dropped unpersisted chunks from memory")
}
if s.chunkDescsOffset != -1 {
s.chunkDescsOffset += keepIdx
}
numMemChunkDescs.Sub(float64(keepIdx))
s.dirty = true
return nil
}

Expand Down
63 changes: 63 additions & 0 deletions storage/local/series_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2016 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package local

import (
"testing"
"time"

"github.com/prometheus/common/model"
)

func TestDropChunks(t *testing.T) {
s, err := newMemorySeries(nil, nil, time.Time{})
if err != nil {
t.Fatal(err)
}

s.add(model.SamplePair{
Timestamp: 100,
Value: 42,
})
s.add(model.SamplePair{
Timestamp: 110,
Value: 4711,
})

err = s.dropChunks(110)
if err != nil {
t.Fatal(err)
}
if len(s.chunkDescs) == 0 {
t.Fatal("chunk dropped too early")
}

err = s.dropChunks(115)
if err != nil {
t.Fatal(err)
}
if len(s.chunkDescs) == 0 {
t.Fatal("open head chunk dropped")
}

s.headChunkClosed = true
s.persistWatermark = 1
err = s.dropChunks(115)
if err != nil {
t.Fatal(err)
}
if len(s.chunkDescs) != 0 {
t.Error("did not drop closed head chunk")
}
}

0 comments on commit 23d383a

Please sign in to comment.