Skip to content

Commit

Permalink
timeseries: store varbit encoded data into cassandra
Browse files Browse the repository at this point in the history
  • Loading branch information
mattkanwisher committed Sep 21, 2016
1 parent 4520e12 commit 67d76e3
Show file tree
Hide file tree
Showing 15 changed files with 286 additions and 281 deletions.
175 changes: 90 additions & 85 deletions storage/local/chunk.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion storage/local/crashrecovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (p *persistence) recoverFromCrash(fingerprintToSeries map[model.Fingerprint
)
}
s.chunkDescs = append(
make([]*chunkDesc, 0, len(s.chunkDescs)-s.persistWatermark),
make([]*ChunkDesc, 0, len(s.chunkDescs)-s.persistWatermark),
s.chunkDescs[s.persistWatermark:]...,
)
numMemChunkDescs.Sub(float64(s.persistWatermark))
Expand Down
24 changes: 12 additions & 12 deletions storage/local/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncod
}

// add implements chunk.
func (c deltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) {
func (c deltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) {
// TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation.
if c.len() == 0 {
c = c[:deltaHeaderBytes]
Expand Down Expand Up @@ -174,23 +174,23 @@ func (c deltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) {
return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb)
}
}
return []chunk{&c}, nil
return []Chunk{&c}, nil
}

// clone implements chunk.
func (c deltaEncodedChunk) clone() chunk {
func (c deltaEncodedChunk) Clone() Chunk {
clone := make(deltaEncodedChunk, len(c), cap(c))
copy(clone, c)
return &clone
}

// firstTime implements chunk.
func (c deltaEncodedChunk) firstTime() model.Time {
func (c deltaEncodedChunk) FirstTime() model.Time {
return c.baseTime()
}

// newIterator implements chunk.
func (c *deltaEncodedChunk) newIterator() chunkIterator {
// NewIterator implements chunk.
func (c *deltaEncodedChunk) NewIterator() ChunkIterator {
return newIndexAccessingChunkIterator(c.len(), &deltaEncodedIndexAccessor{
c: *c,
baseT: c.baseTime(),
Expand All @@ -202,7 +202,7 @@ func (c *deltaEncodedChunk) newIterator() chunkIterator {
}

// marshal implements chunk.
func (c deltaEncodedChunk) marshal(w io.Writer) error {
func (c deltaEncodedChunk) Marshal(w io.Writer) error {
if len(c) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint.")
}
Expand All @@ -218,8 +218,8 @@ func (c deltaEncodedChunk) marshal(w io.Writer) error {
return nil
}

// marshalToBuf implements chunk.
func (c deltaEncodedChunk) marshalToBuf(buf []byte) error {
// MarshalToBuf implements chunk.
func (c deltaEncodedChunk) MarshalToBuf(buf []byte) error {
if len(c) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint")
}
Expand All @@ -233,7 +233,7 @@ func (c deltaEncodedChunk) marshalToBuf(buf []byte) error {
}

// unmarshal implements chunk.
func (c *deltaEncodedChunk) unmarshal(r io.Reader) error {
func (c *deltaEncodedChunk) Unmarshal(r io.Reader) error {
*c = (*c)[:cap(*c)]
if _, err := io.ReadFull(r, *c); err != nil {
return err
Expand All @@ -250,7 +250,7 @@ func (c *deltaEncodedChunk) unmarshal(r io.Reader) error {
}

// unmarshalFromBuf implements chunk.
func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) error {
func (c *deltaEncodedChunk) UnmarshalFromBuf(buf []byte) error {
*c = (*c)[:cap(*c)]
copy(*c, buf)
l := binary.LittleEndian.Uint16((*c)[deltaHeaderBufLenOffset:])
Expand All @@ -265,7 +265,7 @@ func (c *deltaEncodedChunk) unmarshalFromBuf(buf []byte) error {
}

// encoding implements chunk.
func (c deltaEncodedChunk) encoding() chunkEncoding { return delta }
func (c deltaEncodedChunk) Encoding() ChunkEncoding { return Delta }

func (c deltaEncodedChunk) timeBytes() deltaBytes {
return deltaBytes(c[deltaHeaderTimeBytesOffset])
Expand Down
14 changes: 7 additions & 7 deletions storage/local/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,21 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) {

cases := []struct {
chunkTypeName string
chunkConstructor func(deltaBytes, deltaBytes, bool, int) chunk
chunkConstructor func(deltaBytes, deltaBytes, bool, int) Chunk
minHeaderLen int
chunkLenPos int
}{
{
chunkTypeName: "deltaEncodedChunk",
chunkConstructor: func(a, b deltaBytes, c bool, d int) chunk {
chunkConstructor: func(a, b deltaBytes, c bool, d int) Chunk {
return newDeltaEncodedChunk(a, b, c, d)
},
minHeaderLen: deltaHeaderBytes,
chunkLenPos: deltaHeaderBufLenOffset,
},
{
chunkTypeName: "doubleDeltaEncodedChunk",
chunkConstructor: func(a, b deltaBytes, c bool, d int) chunk {
chunkConstructor: func(a, b deltaBytes, c bool, d int) Chunk {
return newDoubleDeltaEncodedChunk(a, b, c, d)
},
minHeaderLen: doubleDeltaHeaderMinBytes,
Expand All @@ -77,7 +77,7 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) {
for _, c := range cases {
chunk := c.chunkConstructor(d1, d4, false, chunkLen)

cs, err := chunk.add(model.SamplePair{
cs, err := chunk.Add(model.SamplePair{
Timestamp: model.Now(),
Value: model.SampleValue(100),
})
Expand All @@ -87,16 +87,16 @@ func TestUnmarshalingCorruptedDeltaReturnsAnError(t *testing.T) {

buf := make([]byte, chunkLen)

cs[0].marshalToBuf(buf)
cs[0].MarshalToBuf(buf)

// Corrupt the length to be every possible too-small value
for i := 0; i < c.minHeaderLen; i++ {
binary.LittleEndian.PutUint16(buf[c.chunkLenPos:], uint16(i))

err = cs[0].unmarshalFromBuf(buf)
err = cs[0].UnmarshalFromBuf(buf)
verifyUnmarshallingError(err, c.chunkTypeName, "buf", i)

err = cs[0].unmarshal(bytes.NewBuffer(buf))
err = cs[0].Unmarshal(bytes.NewBuffer(buf))
verifyUnmarshallingError(err, c.chunkTypeName, "Reader", i)
}
}
Expand Down
32 changes: 16 additions & 16 deletions storage/local/doubledelta.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doub
}

// add implements chunk.
func (c doubleDeltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) {
func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) {
// TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation.
if c.len() == 0 {
return c.addFirstSample(s), nil
Expand Down Expand Up @@ -181,23 +181,23 @@ func (c doubleDeltaEncodedChunk) add(s model.SamplePair) ([]chunk, error) {
return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb)
}
}
return []chunk{&c}, nil
return []Chunk{&c}, nil
}

// clone implements chunk.
func (c doubleDeltaEncodedChunk) clone() chunk {
func (c doubleDeltaEncodedChunk) Clone() Chunk {
clone := make(doubleDeltaEncodedChunk, len(c), cap(c))
copy(clone, c)
return &clone
}

// firstTime implements chunk.
func (c doubleDeltaEncodedChunk) firstTime() model.Time {
func (c doubleDeltaEncodedChunk) FirstTime() model.Time {
return c.baseTime()
}

// newIterator implements chunk.
func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator {
// NewIterator( implements chunk.
func (c *doubleDeltaEncodedChunk) NewIterator() ChunkIterator {
return newIndexAccessingChunkIterator(c.len(), &doubleDeltaEncodedIndexAccessor{
c: *c,
baseT: c.baseTime(),
Expand All @@ -211,7 +211,7 @@ func (c *doubleDeltaEncodedChunk) newIterator() chunkIterator {
}

// marshal implements chunk.
func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error {
func (c doubleDeltaEncodedChunk) Marshal(w io.Writer) error {
if len(c) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint")
}
Expand All @@ -227,8 +227,8 @@ func (c doubleDeltaEncodedChunk) marshal(w io.Writer) error {
return nil
}

// marshalToBuf implements chunk.
func (c doubleDeltaEncodedChunk) marshalToBuf(buf []byte) error {
// MarshalToBuf implements chunk.
func (c doubleDeltaEncodedChunk) MarshalToBuf(buf []byte) error {
if len(c) > math.MaxUint16 {
panic("chunk buffer length would overflow a 16 bit uint")
}
Expand All @@ -242,7 +242,7 @@ func (c doubleDeltaEncodedChunk) marshalToBuf(buf []byte) error {
}

// unmarshal implements chunk.
func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error {
func (c *doubleDeltaEncodedChunk) Unmarshal(r io.Reader) error {
*c = (*c)[:cap(*c)]
if _, err := io.ReadFull(r, *c); err != nil {
return err
Expand All @@ -260,7 +260,7 @@ func (c *doubleDeltaEncodedChunk) unmarshal(r io.Reader) error {
}

// unmarshalFromBuf implements chunk.
func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) error {
func (c *doubleDeltaEncodedChunk) UnmarshalFromBuf(buf []byte) error {
*c = (*c)[:cap(*c)]
copy(*c, buf)
l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])
Expand All @@ -275,7 +275,7 @@ func (c *doubleDeltaEncodedChunk) unmarshalFromBuf(buf []byte) error {
}

// encoding implements chunk.
func (c doubleDeltaEncodedChunk) encoding() chunkEncoding { return doubleDelta }
func (c doubleDeltaEncodedChunk) Encoding() ChunkEncoding { return DoubleDelta }

func (c doubleDeltaEncodedChunk) baseTime() model.Time {
return model.Time(
Expand Down Expand Up @@ -347,7 +347,7 @@ func (c doubleDeltaEncodedChunk) isInt() bool {

// addFirstSample is a helper method only used by c.add(). It adds timestamp and
// value as base time and value.
func (c doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) []chunk {
func (c doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) []Chunk {
c = c[:doubleDeltaHeaderBaseValueOffset+8]
binary.LittleEndian.PutUint64(
c[doubleDeltaHeaderBaseTimeOffset:],
Expand All @@ -357,12 +357,12 @@ func (c doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) []chunk {
c[doubleDeltaHeaderBaseValueOffset:],
math.Float64bits(float64(s.Value)),
)
return []chunk{&c}
return []Chunk{&c}
}

// addSecondSample is a helper method only used by c.add(). It calculates the
// base delta from the provided sample and adds it to the chunk.
func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) ([]chunk, error) {
func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) ([]Chunk, error) {
baseTimeDelta := s.Timestamp - c.baseTime()
if baseTimeDelta < 0 {
return nil, fmt.Errorf("base time delta is less than zero: %v", baseTimeDelta)
Expand Down Expand Up @@ -403,7 +403,7 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb delt
math.Float64bits(float64(baseValueDelta)),
)
}
return []chunk{&c}, nil
return []Chunk{&c}, nil
}

// doubleDeltaEncodedIndexAccessor implements indexAccessor.
Expand Down
12 changes: 6 additions & 6 deletions storage/local/heads.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (hs *headsScanner) scan() bool {
firstTime int64
lastTime int64
encoding byte
ch chunk
ch Chunk
lastTimeHead model.Time
)
if seriesFlags, hs.err = hs.r.ReadByte(); hs.err != nil {
Expand Down Expand Up @@ -146,7 +146,7 @@ func (hs *headsScanner) scan() bool {
if numChunkDescs, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
return false
}
chunkDescs := make([]*chunkDesc, numChunkDescs)
chunkDescs := make([]*ChunkDesc, numChunkDescs)
if hs.version == headsFormatLegacyVersion {
if headChunkPersisted {
persistWatermark = numChunkDescs
Expand All @@ -163,7 +163,7 @@ func (hs *headsScanner) scan() bool {
if lastTime, hs.err = binary.ReadVarint(hs.r); hs.err != nil {
return false
}
chunkDescs[i] = &chunkDesc{
chunkDescs[i] = &ChunkDesc{
chunkFirstTime: model.Time(firstTime),
chunkLastTime: model.Time(lastTime),
}
Expand All @@ -176,13 +176,13 @@ func (hs *headsScanner) scan() bool {
if encoding, hs.err = hs.r.ReadByte(); hs.err != nil {
return false
}
if ch, hs.err = newChunkForEncoding(chunkEncoding(encoding)); hs.err != nil {
if ch, hs.err = NewChunkForEncoding(ChunkEncoding(encoding)); hs.err != nil {
return false
}
if hs.err = ch.unmarshal(hs.r); hs.err != nil {
if hs.err = ch.Unmarshal(hs.r); hs.err != nil {
return false
}
cd := newChunkDesc(ch, ch.firstTime())
cd := NewChunkDesc(ch, ch.FirstTime())
if i < numChunkDescs-1 {
// This is NOT the head chunk. So it's a chunk
// to be persisted, and we need to populate lastTime.
Expand Down
Loading

0 comments on commit 67d76e3

Please sign in to comment.