Build layered indexers.

The indexers will be extracted in a short while and wrapped accordingly with
these types.

Change-Id: I4d1abda4e46117210babad5aa0d42f9ca1f6594f
changes/10/10/3
Matt T. Proud 11 years ago
parent 972e856d9b
commit acf91f38bd

@ -400,6 +400,203 @@ type MetricIndexer interface {
IndexMetrics(FingerprintMetricMapping) error
}
// IndexObserver listens and receives changes to a given
// FingerprintMetricMapping.
type IndexerObserver interface {
Observe(FingerprintMetricMapping) error
}
// IndexerProxy receives IndexMetric requests and proxies them to the underlying
// MetricIndexer. Upon success of the underlying receiver, the registered
// IndexObservers are called serially.
//
// If an error occurs in the underlying MetricIndexer or any of the observers,
// this proxy will not work any further and return the offending error in this
// call or any subsequent ones.
type IndexerProxy struct {
err error
i MetricIndexer
observers []IndexerObserver
}
func (p *IndexerProxy) IndexMetrics(b FingerprintMetricMapping) error {
if p.err != nil {
return p.err
}
if p.err = p.i.IndexMetrics(b); p.err != nil {
return p.err
}
for _, o := range p.observers {
if p.err = o.Observe(b); p.err != nil {
return p.err
}
}
return nil
}
// Close closes the underlying indexer.
func (p *IndexerProxy) Close() error {
if p.err != nil {
return p.err
}
if closer, ok := p.i.(io.Closer); ok {
p.err = closer.Close()
return p.err
}
return nil
}
// Close flushes the underlying index requests before closing.
func (p *IndexerProxy) Flush() error {
if p.err != nil {
return p.err
}
if flusher, ok := p.i.(flusher); ok {
p.err = flusher.Flush()
return p.err
}
return nil
}
// NewIndexerProxy builds an IndexerProxy for the given configuration.
func NewIndexerProxy(i MetricIndexer, o ...IndexerObserver) *IndexerProxy {
return &IndexerProxy{
i: i,
observers: o,
}
}
// SynchronizedIndexer provides naive locking for any MetricIndexer.
type SynchronizedIndexer struct {
mu sync.Mutex
i MetricIndexer
}
func (i *SynchronizedIndexer) IndexMetrics(b FingerprintMetricMapping) error {
i.mu.Lock()
defer i.mu.Unlock()
return i.i.IndexMetrics(b)
}
type flusher interface {
Flush() error
}
func (i *SynchronizedIndexer) Flush() error {
if flusher, ok := i.i.(flusher); ok {
i.mu.Lock()
defer i.mu.Unlock()
return flusher.Flush()
}
return nil
}
func (i *SynchronizedIndexer) Close() error {
if closer, ok := i.i.(io.Closer); ok {
i.mu.Lock()
defer i.mu.Unlock()
return closer.Close()
}
return nil
}
// NewSynchronizedIndexer builds a new MetricIndexer.
func NewSynchronizedIndexer(i MetricIndexer) *SynchronizedIndexer {
return &SynchronizedIndexer{
i: i,
}
}
// BufferedIndexer provides unsynchronized index buffering.
//
// If an error occurs in the underlying MetricIndexer or any of the observers,
// this proxy will not work any further and return the offending error.
type BufferedIndexer struct {
i MetricIndexer
limit int
buf []FingerprintMetricMapping
err error
}
func (i *BufferedIndexer) IndexMetrics(b FingerprintMetricMapping) error {
if i.err != nil {
return i.err
}
if len(i.buf) < i.limit {
i.buf = append(i.buf, b)
return nil
}
i.buf = append(i.buf)
i.err = i.Flush()
return i.err
}
// Flush writes all pending entries to the index.
func (i *BufferedIndexer) Flush() error {
if i.err != nil {
return i.err
}
if len(i.buf) == 0 {
return nil
}
superset := FingerprintMetricMapping{}
for _, b := range i.buf {
for fp, m := range b {
if _, ok := superset[fp]; ok {
continue
}
superset[fp] = m
}
}
i.buf = make([]FingerprintMetricMapping, 0, i.limit)
i.err = i.i.IndexMetrics(superset)
return i.err
}
// Close flushes and closes the underlying buffer.
func (i *BufferedIndexer) Close() error {
if err := i.Flush(); err != nil {
return err
}
if closer, ok := i.i.(io.Closer); ok {
return closer.Close()
}
return nil
}
func NewBufferedIndexer(i MetricIndexer, limit int) *BufferedIndexer {
return &BufferedIndexer{
i: i,
limit: limit,
buf: make([]FingerprintMetricMapping, 0, limit),
}
}
// TotalIndexer is a MetricIndexer that indexes all standard facets of a metric
// that a user or the Prometheus subsystem would want to query against:
//

Loading…
Cancel
Save