@ -57,6 +57,7 @@ type HeadBlock struct {
wal WAL
activeWriters uint64
highTimestamp int64
closed bool
// descs holds all chunk descs for the head block. Each chunk implicitly
@ -389,9 +390,14 @@ func (h *HeadBlock) Appender() Appender {
return & headAppender { HeadBlock : h , samples : getHeadAppendBuffer ( ) }
}
// Busy returns true if the block has open write transactions.
func ( h * HeadBlock ) Busy ( ) bool {
return atomic . LoadUint64 ( & h . activeWriters ) > 0
// ActiveWriters returns true if the block has open write transactions.
func ( h * HeadBlock ) ActiveWriters ( ) int {
return int ( atomic . LoadUint64 ( & h . activeWriters ) )
}
// HighTimestamp returns the highest inserted sample timestamp.
func ( h * HeadBlock ) HighTimestamp ( ) int64 {
return atomic . LoadInt64 ( & h . highTimestamp )
}
var headPool = sync . Pool { }
@ -415,7 +421,8 @@ type headAppender struct {
newLabels [ ] labels . Labels
newHashes map [ uint64 ] uint64
samples [ ] RefSample
samples [ ] RefSample
highTimestamp int64
}
type hashedLabels struct {
@ -513,6 +520,10 @@ func (a *headAppender) AddFast(ref string, t int64, v float64) error {
}
}
if t > a . highTimestamp {
a . highTimestamp = t
}
a . samples = append ( a . samples , RefSample {
Ref : refn ,
T : t ,
@ -593,6 +604,16 @@ func (a *headAppender) Commit() error {
atomic . AddUint64 ( & a . meta . Stats . NumSamples , total )
atomic . AddUint64 ( & a . meta . Stats . NumSeries , uint64 ( len ( a . newSeries ) ) )
for {
ht := a . HeadBlock . HighTimestamp ( )
if a . highTimestamp <= ht {
break
}
if atomic . CompareAndSwapInt64 ( & a . HeadBlock . highTimestamp , ht , a . highTimestamp ) {
break
}
}
return nil
}