Extract indexing to separate routine.

pull/84/head
Matt T. Proud 2013-03-14 15:42:28 -07:00
parent 582354f6de
commit 67300af137
2 changed files with 92 additions and 60 deletions

View File

@ -43,6 +43,7 @@ const (
hasLabelName = "has_label_name"
hasLabelPair = "has_label_pair"
indexMetric = "index_metric"
indexSamples = "index_samples"
rebuildDiskFrontier = "rebuild_disk_frontier"
renderView = "render_view"
setLabelNameFingerprints = "set_label_name_fingerprints"

View File

@ -199,15 +199,10 @@ func (l *LevelDBMetricPersistence) AppendSample(sample model.Sample) (err error)
return
}
func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err error) {
begin := time.Now()
defer func() {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: appendSamples, result: success}, map[string]string{operation: appendSamples, result: failure})
}()
// Group the samples by fingerprint.
// groupByFingerprint collects all of the provided samples, groups them
// together by their respective metric fingerprint, and finally sorts
// them chronologically.
func groupByFingerprint(samples model.Samples) map[model.Fingerprint]model.Samples {
var (
fingerprintToSamples = map[model.Fingerprint]model.Samples{}
)
@ -219,17 +214,18 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
fingerprintToSamples[fingerprint] = samples
}
// Begin the sorting of grouped samples.
var (
sortingSemaphore = make(chan bool, sortConcurrency)
doneSorting = sync.WaitGroup{}
doneSorting sync.WaitGroup
)
for i := 0; i < sortConcurrency; i++ {
sortingSemaphore <- true
}
for _, samples := range fingerprintToSamples {
doneSorting.Add(1)
go func(samples model.Samples) {
<-sortingSemaphore
sort.Sort(samples)
@ -240,56 +236,18 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
doneSorting.Wait()
var (
doneCommitting = sync.WaitGroup{}
)
return fingerprintToSamples
}
go func() {
doneCommitting.Add(1)
samplesBatch := leveldb.NewBatch()
defer samplesBatch.Close()
defer doneCommitting.Done()
// indexSamples takes groups of samples, determines which ones contain metrics
// that are unknown to the storage stack, and then proceeds to update all
// affected indices.
func (l *LevelDBMetricPersistence) indexSamples(groups map[model.Fingerprint]model.Samples) (err error) {
begin := time.Now()
defer func() {
duration := time.Since(begin)
for fingerprint, group := range fingerprintToSamples {
for {
lengthOfGroup := len(group)
if lengthOfGroup == 0 {
break
}
take := *leveldbChunkSize
if lengthOfGroup < take {
take = lengthOfGroup
}
chunk := group[0:take]
group = group[take:lengthOfGroup]
key := &dto.SampleKey{
Fingerprint: fingerprint.ToDTO(),
Timestamp: indexable.EncodeTime(chunk[0].Timestamp),
LastTimestamp: proto.Int64(chunk[take-1].Timestamp.Unix()),
SampleCount: proto.Uint32(uint32(take)),
}
value := &dto.SampleValueSeries{}
for _, sample := range chunk {
value.Value = append(value.Value, &dto.SampleValueSeries_Value{
Timestamp: proto.Int64(sample.Timestamp.Unix()),
Value: proto.Float32(float32(sample.Value)),
})
}
samplesBatch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value))
}
}
err = l.metricSamples.Commit(samplesBatch)
if err != nil {
panic(err)
}
recordOutcome(duration, err, map[string]string{operation: indexSamples, result: success}, map[string]string{operation: indexSamples, result: failure})
}()
var (
@ -298,7 +256,7 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
// Determine which metrics are unknown in the database.
for fingerprint, samples := range fingerprintToSamples {
for fingerprint, samples := range groups {
sample := samples[0]
metricDTO := model.SampleToMetricDTO(&sample)
indexHas, err := l.hasIndexMetric(metricDTO)
@ -501,7 +459,80 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
}
}
return
}
func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err error) {
begin := time.Now()
defer func() {
duration := time.Since(begin)
recordOutcome(duration, err, map[string]string{operation: appendSamples, result: success}, map[string]string{operation: appendSamples, result: failure})
}()
var (
fingerprintToSamples = groupByFingerprint(samples)
indexErrChan = make(chan error)
doneCommitting sync.WaitGroup
)
go func(groups map[model.Fingerprint]model.Samples) {
indexErrChan <- l.indexSamples(groups)
}(fingerprintToSamples)
go func() {
doneCommitting.Add(1)
samplesBatch := leveldb.NewBatch()
defer samplesBatch.Close()
defer doneCommitting.Done()
for fingerprint, group := range fingerprintToSamples {
for {
lengthOfGroup := len(group)
if lengthOfGroup == 0 {
break
}
take := *leveldbChunkSize
if lengthOfGroup < take {
take = lengthOfGroup
}
chunk := group[0:take]
group = group[take:lengthOfGroup]
key := &dto.SampleKey{
Fingerprint: fingerprint.ToDTO(),
Timestamp: indexable.EncodeTime(chunk[0].Timestamp),
LastTimestamp: proto.Int64(chunk[take-1].Timestamp.Unix()),
SampleCount: proto.Uint32(uint32(take)),
}
value := &dto.SampleValueSeries{}
for _, sample := range chunk {
value.Value = append(value.Value, &dto.SampleValueSeries_Value{
Timestamp: proto.Int64(sample.Timestamp.Unix()),
Value: proto.Float32(float32(sample.Value)),
})
}
samplesBatch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value))
}
}
err = l.metricSamples.Commit(samplesBatch)
if err != nil {
panic(err)
}
}()
doneCommitting.Wait()
err = <-indexErrChan
if err != nil {
panic(err)
}
return
}