Browse Source

Extract appending from goroutine.

pull/84/head
Matt T. Proud 12 years ago
parent
commit
47ce7ad302
  1. 72
      storage/metric/leveldb.go

72
storage/metric/leveldb.go

@ -540,7 +540,6 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
var (
fingerprintToSamples = groupByFingerprint(samples)
indexErrChan = make(chan error)
doneCommitting sync.WaitGroup
)
go func(groups map[model.Fingerprint]model.Samples) {
@ -555,55 +554,50 @@ func (l *LevelDBMetricPersistence) AppendSamples(samples model.Samples) (err err
indexErrChan <- l.indexMetrics(metrics)
}(fingerprintToSamples)
go func() {
doneCommitting.Add(1)
samplesBatch := leveldb.NewBatch()
defer samplesBatch.Close()
defer doneCommitting.Done()
for fingerprint, group := range fingerprintToSamples {
for {
lengthOfGroup := len(group)
samplesBatch := leveldb.NewBatch()
defer samplesBatch.Close()
if lengthOfGroup == 0 {
break
}
for fingerprint, group := range fingerprintToSamples {
for {
lengthOfGroup := len(group)
take := *leveldbChunkSize
if lengthOfGroup < take {
take = lengthOfGroup
}
if lengthOfGroup == 0 {
break
}
chunk := group[0:take]
group = group[take:lengthOfGroup]
take := *leveldbChunkSize
if lengthOfGroup < take {
take = lengthOfGroup
}
key := &dto.SampleKey{
Fingerprint: fingerprint.ToDTO(),
Timestamp: indexable.EncodeTime(chunk[0].Timestamp),
LastTimestamp: proto.Int64(chunk[take-1].Timestamp.Unix()),
SampleCount: proto.Uint32(uint32(take)),
}
chunk := group[0:take]
group = group[take:lengthOfGroup]
value := &dto.SampleValueSeries{}
for _, sample := range chunk {
value.Value = append(value.Value, &dto.SampleValueSeries_Value{
Timestamp: proto.Int64(sample.Timestamp.Unix()),
Value: proto.Float32(float32(sample.Value)),
})
}
key := &dto.SampleKey{
Fingerprint: fingerprint.ToDTO(),
Timestamp: indexable.EncodeTime(chunk[0].Timestamp),
LastTimestamp: proto.Int64(chunk[take-1].Timestamp.Unix()),
SampleCount: proto.Uint32(uint32(take)),
}
samplesBatch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value))
value := &dto.SampleValueSeries{}
for _, sample := range chunk {
value.Value = append(value.Value, &dto.SampleValueSeries_Value{
Timestamp: proto.Int64(sample.Timestamp.Unix()),
Value: proto.Float32(float32(sample.Value)),
})
}
samplesBatch.Put(coding.NewProtocolBufferEncoder(key), coding.NewProtocolBufferEncoder(value))
}
}
err = l.metricSamples.Commit(samplesBatch)
err = l.metricSamples.Commit(samplesBatch)
if err != nil {
panic(err)
}
}()
if err != nil {
panic(err)
}
doneCommitting.Wait()
err = <-indexErrChan
if err != nil {
panic(err)

Loading…
Cancel
Save