diff --git a/storage/local/chunk/chunk.go b/storage/local/chunk/chunk.go index 11c42b39d..733028d16 100644 --- a/storage/local/chunk/chunk.go +++ b/storage/local/chunk/chunk.go @@ -260,7 +260,7 @@ func (d *Desc) MaybeEvict() bool { // Chunk is the interface for all chunks. Chunks are generally not // goroutine-safe. type Chunk interface { - // add adds a SamplePair to the chunks, performs any necessary + // Add adds a SamplePair to the chunks, performs any necessary // re-encoding, and adds any necessary overflow chunks. It returns the // new version of the original chunk, followed by overflow chunks, if // any. The first chunk returned might be the same as the original one @@ -276,6 +276,10 @@ type Chunk interface { UnmarshalFromBuf([]byte) error Encoding() Encoding Utilization() float64 + + // Len returns the number of samples in the chunk. Implementations may be + // expensive. + Len() int } // Iterator enables efficient access to the content of a chunk. It is diff --git a/storage/local/chunk/chunk_test.go b/storage/local/chunk/chunk_test.go new file mode 100644 index 000000000..f5f21f67d --- /dev/null +++ b/storage/local/chunk/chunk_test.go @@ -0,0 +1,46 @@ +// Copyright 2016 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Note: this file has tests for code in both delta.go and doubledelta.go -- +// it may make sense to split those out later, but given that the tests are +// near-identical and share a helper, this feels simpler for now. + +package chunk + +import ( + "testing" + + "github.com/prometheus/common/model" +) + +func TestLen(t *testing.T) { + chunks := []Chunk{} + for _, encoding := range []Encoding{Delta, DoubleDelta, Varbit} { + c, err := NewForEncoding(encoding) + if err != nil { + t.Fatal(err) + } + chunks = append(chunks, c) + } + + for _, c := range chunks { + for i := 0; i <= 10; i++ { + if c.Len() != i { + t.Errorf("chunk type %s should have %d samples, had %d", c.Encoding(), i, c.Len()) + } + + cs, _ := c.Add(model.SamplePair{model.Time(i), model.SampleValue(i)}) + c = cs[0] + } + } +} diff --git a/storage/local/chunk/delta.go b/storage/local/chunk/delta.go index 4e04bca47..2be00cce9 100644 --- a/storage/local/chunk/delta.go +++ b/storage/local/chunk/delta.go @@ -75,7 +75,7 @@ func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncod // Add implements chunk. func (c deltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) { // TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation. - if c.len() == 0 { + if c.Len() == 0 { c = c[:deltaHeaderBytes] binary.LittleEndian.PutUint64(c[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp)) binary.LittleEndian.PutUint64(c[deltaHeaderBaseValueOffset:], math.Float64bits(float64(s.Value))) @@ -191,7 +191,7 @@ func (c deltaEncodedChunk) FirstTime() model.Time { // NewIterator implements chunk. func (c *deltaEncodedChunk) NewIterator() Iterator { - return newIndexAccessingChunkIterator(c.len(), &deltaEncodedIndexAccessor{ + return newIndexAccessingChunkIterator(c.Len(), &deltaEncodedIndexAccessor{ c: *c, baseT: c.baseTime(), baseV: c.baseValue(), @@ -296,7 +296,8 @@ func (c deltaEncodedChunk) sampleSize() int { return int(c.timeBytes() + c.valueBytes()) } -func (c deltaEncodedChunk) len() int { +// Len implements Chunk. Runs in constant time. +func (c deltaEncodedChunk) Len() int { if len(c) < deltaHeaderBytes { return 0 } diff --git a/storage/local/chunk/doubledelta.go b/storage/local/chunk/doubledelta.go index 9a4744130..2a1221461 100644 --- a/storage/local/chunk/doubledelta.go +++ b/storage/local/chunk/doubledelta.go @@ -83,14 +83,14 @@ func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doub // Add implements chunk. func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) { // TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation. - if c.len() == 0 { + if c.Len() == 0 { return c.addFirstSample(s), nil } tb := c.timeBytes() vb := c.valueBytes() - if c.len() == 1 { + if c.Len() == 1 { return c.addSecondSample(s, tb, vb) } @@ -103,10 +103,10 @@ func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) { return addToOverflowChunk(&c, s) } - projectedTime := c.baseTime() + model.Time(c.len())*c.baseTimeDelta() + projectedTime := c.baseTime() + model.Time(c.Len())*c.baseTimeDelta() ddt := s.Timestamp - projectedTime - projectedValue := c.baseValue() + model.SampleValue(c.len())*c.baseValueDelta() + projectedValue := c.baseValue() + model.SampleValue(c.Len())*c.baseValueDelta() ddv := s.Value - projectedValue ntb, nvb, nInt := tb, vb, c.isInt() @@ -198,7 +198,7 @@ func (c doubleDeltaEncodedChunk) FirstTime() model.Time { // NewIterator( implements chunk. func (c *doubleDeltaEncodedChunk) NewIterator() Iterator { - return newIndexAccessingChunkIterator(c.len(), &doubleDeltaEncodedIndexAccessor{ + return newIndexAccessingChunkIterator(c.Len(), &doubleDeltaEncodedIndexAccessor{ c: *c, baseT: c.baseTime(), baseΔT: c.baseTimeDelta(), @@ -279,7 +279,7 @@ func (c doubleDeltaEncodedChunk) Encoding() Encoding { return DoubleDelta } // Utilization implements chunk. func (c doubleDeltaEncodedChunk) Utilization() float64 { - return float64(len(c)) / float64(cap(c)) + return float64(len(c)-doubleDeltaHeaderIsIntOffset-1) / float64(cap(c)) } func (c doubleDeltaEncodedChunk) baseTime() model.Time { @@ -336,7 +336,8 @@ func (c doubleDeltaEncodedChunk) sampleSize() int { return int(c.timeBytes() + c.valueBytes()) } -func (c doubleDeltaEncodedChunk) len() int { +// Len implements Chunk. Runs in constant time. +func (c doubleDeltaEncodedChunk) Len() int { if len(c) <= doubleDeltaHeaderIsIntOffset+1 { return 0 } diff --git a/storage/local/chunk/varbit.go b/storage/local/chunk/varbit.go index 42de4adb2..2ec59efd3 100644 --- a/storage/local/chunk/varbit.go +++ b/storage/local/chunk/varbit.go @@ -328,6 +328,15 @@ func (c varbitChunk) Utilization() float64 { return math.Min(float64(c.nextSampleOffset()/8+15)/float64(cap(c)), 1) } +// Len implements chunk. Runs in O(n). +func (c varbitChunk) Len() int { + it := c.NewIterator() + i := 0 + for ; it.Scan(); i++ { + } + return i +} + // FirstTime implements chunk. func (c varbitChunk) FirstTime() model.Time { return model.Time(