Merge branch 'beorn7/storage2' into beorn7/storage3

pull/1448/head^2
beorn7 9 years ago
commit 55eddab25f

@ -30,9 +30,9 @@ func (p *memorySeriesPreloader) PreloadRange(
fp model.Fingerprint,
from model.Time, through model.Time,
) (SeriesIterator, error) {
cds, iter, err := p.storage.preloadChunksForRange(fp, from, through, false)
cds, iter, err := p.storage.preloadChunksForRange(fp, from, through)
if err != nil {
return iter, err
return nil, err
}
p.pinnedChunkDescs = append(p.pinnedChunkDescs, cds...)
return iter, nil
@ -43,7 +43,7 @@ func (p *memorySeriesPreloader) PreloadInstant(
fp model.Fingerprint,
timestamp model.Time, stalenessDelta time.Duration,
) (SeriesIterator, error) {
cds, iter, err := p.storage.preloadChunksForRange(fp, timestamp.Add(-stalenessDelta), timestamp, true)
cds, iter, err := p.storage.preloadChunksForInstant(fp, timestamp.Add(-stalenessDelta), timestamp)
if err != nil {
return nil, err
}

@ -383,30 +383,40 @@ func (s *memorySeries) newIterator(pinnedChunkDescs []*chunkDesc) SeriesIterator
}
}
// preloadChunksForInstant preloads chunks for the latest value in the given
// range. If the last sample saved in the memorySeries itself is the latest
// value in the given range, it will in fact preload zero chunks and just take
// that value.
func (s *memorySeries) preloadChunksForInstant(
fp model.Fingerprint,
from model.Time, through model.Time,
mss *memorySeriesStorage,
) ([]*chunkDesc, SeriesIterator, error) {
// If we have a lastSamplePair in the series, and thas last samplePair
// is in the interval, just take it in a singleSampleSeriesIterator. No
// need to pin or load anything.
lastSample := s.lastSamplePair()
if !through.Before(lastSample.Timestamp) &&
!from.After(lastSample.Timestamp) &&
lastSample != ZeroSamplePair {
iter := &boundedIterator{
it: &singleSampleSeriesIterator{samplePair: lastSample},
start: model.Now().Add(-mss.dropAfter),
}
return nil, iter, nil
}
// If we are here, we are out of luck and have to delegate to the more
// expensive method.
return s.preloadChunksForRange(fp, from, through, mss)
}
// preloadChunksForRange loads chunks for the given range from the persistence.
// The caller must have locked the fingerprint of the series.
func (s *memorySeries) preloadChunksForRange(
fp model.Fingerprint,
from model.Time, through model.Time,
lastSampleOnly bool,
mss *memorySeriesStorage,
) ([]*chunkDesc, SeriesIterator, error) {
// If we have to preload for only one sample, and we have a
// lastSamplePair in the series, and thas last samplePair is in the
// interval, just take it in a singleSampleSeriesIterator. No need to
// pin or load anything.
if lastSampleOnly {
lastSample := s.lastSamplePair()
if !through.Before(lastSample.Timestamp) &&
!from.After(lastSample.Timestamp) &&
lastSample != ZeroSamplePair {
iter := &boundedIterator{
it: &singleSampleSeriesIterator{samplePair: lastSample},
start: model.Now().Add(-mss.dropAfter),
}
return nil, iter, nil
}
}
firstChunkDescTime := model.Latest
if len(s.chunkDescs) > 0 {
firstChunkDescTime = s.chunkDescs[0].firstTime()

@ -677,35 +677,58 @@ func (s *memorySeriesStorage) getOrCreateSeries(fp model.Fingerprint, m model.Me
return series
}
func (s *memorySeriesStorage) getSeriesForRange(
fp model.Fingerprint,
from model.Time, through model.Time,
) (*memorySeries, error) {
series, ok := s.fpToSeries.get(fp)
if ok {
return series, nil
}
has, first, last, err := s.persistence.hasArchivedMetric(fp)
if err != nil {
return nil, err
}
if !has {
s.invalidPreloadRequestsCount.Inc()
return nil, nil
}
if last.Before(from) || first.After(through) {
return nil, nil
}
metric, err := s.persistence.archivedMetric(fp)
if err != nil {
return nil, err
}
return s.getOrCreateSeries(fp, metric), nil
}
func (s *memorySeriesStorage) preloadChunksForRange(
fp model.Fingerprint,
from model.Time, through model.Time,
lastSampleOnly bool,
) ([]*chunkDesc, SeriesIterator, error) {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series, ok := s.fpToSeries.get(fp)
if !ok {
has, first, last, err := s.persistence.hasArchivedMetric(fp)
if err != nil {
return nil, nopIter, err
}
if !has {
s.invalidPreloadRequestsCount.Inc()
return nil, nopIter, nil
}
if from.Before(last) && through.After(first) {
metric, err := s.persistence.archivedMetric(fp)
if err != nil {
return nil, nopIter, err
}
series = s.getOrCreateSeries(fp, metric)
} else {
return nil, nopIter, nil
}
series, err := s.getSeriesForRange(fp, from, through)
if err != nil || series == nil {
return nil, nopIter, err
}
return series.preloadChunksForRange(fp, from, through, s)
}
func (s *memorySeriesStorage) preloadChunksForInstant(
fp model.Fingerprint,
from model.Time, through model.Time,
) ([]*chunkDesc, SeriesIterator, error) {
s.fpLocker.Lock(fp)
defer s.fpLocker.Unlock(fp)
series, err := s.getSeriesForRange(fp, from, through)
if err != nil || series == nil {
return nil, nopIter, err
}
return series.preloadChunksForRange(fp, from, through, lastSampleOnly, s)
return series.preloadChunksForInstant(fp, from, through, s)
}
func (s *memorySeriesStorage) handleEvictList() {

@ -492,7 +492,7 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("unexpected number of fingerprints: %d", len(fps2))
}
_, it, err := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false)
_, it, err := s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
@ -500,7 +500,7 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("unexpected number of samples: %d", len(vals))
}
_, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false)
_, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
@ -525,7 +525,7 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("unexpected number of fingerprints: %d", len(fps3))
}
_, it, err = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest, false)
_, it, err = s.preloadChunksForRange(fpList[0], model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
@ -533,7 +533,7 @@ func TestDropMetrics(t *testing.T) {
t.Errorf("unexpected number of samples: %d", len(vals))
}
_, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest, false)
_, it, err = s.preloadChunksForRange(fpList[1], model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
@ -662,7 +662,7 @@ func testValueAtOrBeforeTime(t *testing.T, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
@ -739,7 +739,7 @@ func benchmarkValueAtOrBeforeTime(b *testing.B, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
b.Fatalf("Error preloading everything: %s", err)
}
@ -820,7 +820,7 @@ func testRangeValues(t *testing.T, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
@ -975,7 +975,7 @@ func benchmarkRangeValues(b *testing.B, encoding chunkEncoding) {
fp := model.Metric{}.FastFingerprint()
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
b.Fatalf("Error preloading everything: %s", err)
}
@ -1024,7 +1024,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// Drop ~half of the chunks.
s.maintainMemorySeries(fp, 10000)
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
_, it, err := s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}
@ -1045,7 +1045,7 @@ func testEvictAndPurgeSeries(t *testing.T, encoding chunkEncoding) {
// Drop everything.
s.maintainMemorySeries(fp, 100000)
_, it, err = s.preloadChunksForRange(fp, model.Earliest, model.Latest, false)
_, it, err = s.preloadChunksForRange(fp, model.Earliest, model.Latest)
if err != nil {
t.Fatalf("Error preloading everything: %s", err)
}

Loading…
Cancel
Save