// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package submatview

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/agent/cache"
	"github.com/hashicorp/consul/lib/ttlcache"
)

// Store of Materializers. Store implements an interface similar to
// agent/cache.Cache, and allows a single Materializer to fulfil multiple requests
// as long as the requests are identical.
// Store is used in place of agent/cache.Cache because with the streaming
// backend there is no longer any need to run a background goroutine to refresh
// stored values.
//
// Entries are created on demand by Get, Notify and NotifyCallback, and are
// removed by the expiration loop in Run.
type Store struct {
	logger hclog.Logger     // receives eviction traces and Notify warnings
	lock   sync.RWMutex     // guards byKey and expiryHeap
	byKey  map[string]entry // entries indexed by the key built by makeEntryKey

	// expiryHeap tracks entries with 0 remaining requests. Entries are ordered
	// by most recent expiry first.
	expiryHeap *ttlcache.ExpiryHeap

	// idleTTL is the duration of time an entry should remain in the Store after the
	// last request for that entry has been terminated. It is a field on the struct
	// so that it can be patched in tests without needing a global lock.
	idleTTL time.Duration
}

// A Materializer maintains a materialized view of a subscription on an event stream.
type Materializer interface {
	// Run is started by the Store in its own goroutine when an entry is
	// created. The Store cancels ctx when it evicts the entry; if Run returns
	// while ctx is still live, the Store treats it as an irrecoverable error
	// and schedules the entry for immediate eviction.
	Run(ctx context.Context)
	// Query returns a Result for the view. The Store passes the request's
	// MinIndex, or the index of the previous result when serving notifications.
	// NOTE(review): presumably blocks until the view has moved past minIndex
	// or ctx is done — confirm against the implementation.
	Query(ctx context.Context, minIndex uint64) (Result, error)
}

// entry is the per-key record kept in Store.byKey. It is stored by value, so
// every modification must be written back to the map.
type entry struct {
	materializer Materializer    // view shared by every request for this key
	expiry       *ttlcache.Entry // handle into the expiry heap; nil until first added
	stop         func()          // cancels the context given to materializer.Run
	// requests is the count of active requests using this entry. This entry will
	// remain in the store as long as this count remains > 0.
	requests int
	// evicting is used to mark an entry that will be evicted when the current in-
	// flight requests finish.
	evicting bool
}

// NewStore builds an empty Store that uses the default idle TTL of 20 minutes.
// The returned Store does not expire anything by itself: the caller must call
// Store.Run (likely in a separate goroutine) to start the expiration loop.
func NewStore(logger hclog.Logger) *Store {
	// How long an entry is kept once its last request has finished.
	const defaultIdleTTL = 20 * time.Minute

	store := new(Store)
	store.logger = logger
	store.byKey = make(map[string]entry)
	store.expiryHeap = ttlcache.NewExpiryHeap()
	store.idleTTL = defaultIdleTTL
	return store
}

// Run drives the expiration loop. On every pass it waits for the item at the
// front of the expiry heap to reach its TTL and evicts it if no request is
// using it. Run blocks until ctx is cancelled.
func (s *Store) Run(ctx context.Context) {
	for {
		// The lock is only needed to look up the next timer; the wait itself
		// happens unlocked so that requests are not blocked by this loop.
		s.lock.RLock()
		next := s.expiryHeap.Next()
		s.lock.RUnlock()

		select {
		case <-ctx.Done():
			next.Stop()
			return

		case <-s.expiryHeap.NotifyCh:
			// A different item is now first in the heap, so the timer we are
			// holding is stale. Drop it and start over with the new TTL.
			next.Stop()

		case <-next.Wait():
			// The first item has reached its TTL; try to expire it.
			s.lock.Lock()

			expired := next.Entry
			s.expiryHeap.Remove(expired.Index())

			key := expired.Key()
			// An entry that still has active requests is left in the store
			// and its materializer keeps running.
			if item := s.byKey[key]; item.requests == 0 {
				s.logger.Trace("evicting item from store", "key", key)
				item.stop()
				delete(s.byKey, key)
			}

			s.lock.Unlock()
		}
	}
}

// Request is used to request data from the Store.
// Note that cache.Request is required, but some of the cache.RequestInfo
// fields are ignored (e.g. MaxAge and MustRevalidate).
type Request interface {
	cache.Request
	// NewMaterializer will be called if there is no active materializer to fulfill
	// the request. It should return a Materializer appropriate for streaming
	// data to fulfil this request.
	NewMaterializer() (Materializer, error)
	// Type should return a string which uniquely identifies this type of request.
	// The returned value is used as the prefix of the key used to index
	// entries in the Store.
	Type() string
}

// Get fetches a value from the store by querying the materializer for req with
// the request's MinIndex.
// See agent/cache.Cache.Get for complete documentation.
//
// A positive CacheInfo().Timeout bounds the query. Hitting a deadline is not
// reported as an error: whatever result the query produced is returned with a
// nil error. An error is also returned if no entry could be obtained for req.
func (s *Store) Get(ctx context.Context, req Request) (Result, error) {
	info := req.CacheInfo()

	key, mat, err := s.readEntry(req)
	if err != nil {
		return Result{}, err
	}
	defer s.releaseEntry(key)

	queryCtx := ctx
	if info.Timeout > 0 {
		timeoutCtx, cancel := context.WithTimeout(ctx, info.Timeout)
		defer cancel()
		queryCtx = timeoutCtx
	}

	res, queryErr := mat.Query(queryCtx, info.MinIndex)
	if errors.Is(queryErr, context.DeadlineExceeded) {
		// Swallow the timeout to match the behaviour of
		// agent/cache.Cache.Get.
		queryErr = nil
	}
	return res, queryErr
}

// Notify sends an event on updateCh whenever the entry identified by req is
// updated. See agent/cache.Cache.Notify for complete documentation.
//
// Request.CacheInfo().Timeout is ignored because it is not really relevant in
// this case. Instead set a deadline on the context.
func (s *Store) Notify(
	ctx context.Context,
	req Request,
	correlationID string,
	updateCh chan<- cache.UpdateEvent,
) error {
	// Deliver each event to the channel, giving up on that event if the
	// context ends before the receiver takes it.
	forward := func(ctx context.Context, event cache.UpdateEvent) {
		select {
		case <-ctx.Done():
		case updateCh <- event:
		}
	}
	return s.NotifyCallback(ctx, req, correlationID, forward)
}

// NotifyCallback subscribes to updates of the entry identified by req in the
// same way as Notify, but invokes cb for every update instead of sending on a
// channel.
//
// The returned error is non-nil only when no entry could be obtained for req.
// After that a background goroutine queries the materializer in a loop until
// ctx is done. Query errors are logged and also passed to cb in the event's
// Err field.
func (s *Store) NotifyCallback(
	ctx context.Context,
	req Request,
	correlationID string,
	cb cache.Callback,
) error {
	info := req.CacheInfo()
	key, mat, err := s.readEntry(req)
	if err != nil {
		return err
	}

	watch := func() {
		defer s.releaseEntry(key)

		minIndex := info.MinIndex
		for {
			res, queryErr := mat.Query(ctx, minIndex)
			if ctx.Err() != nil {
				// The subscriber has gone away; nothing more to deliver.
				return
			}
			if queryErr != nil {
				s.logger.Warn("handling error in Store.Notify",
					"error", queryErr,
					"request-type", req.Type(),
					"index", minIndex)
			}

			// The next query starts from the index that was just reported.
			minIndex = res.Index
			event := cache.UpdateEvent{
				CorrelationID: correlationID,
				Result:        res.Value,
				Err:           queryErr,
				Meta:          cache.ResultMeta{Index: res.Index, Hit: res.Cached},
			}
			cb(ctx, event)
		}
	}
	go watch()
	return nil
}

// readEntry looks up the entry for req, creating it and starting its
// materializer if there is none, and counts the caller as an active request.
// It returns the entry key and the materializer to query.
//
// releaseEntry must be called with the returned key when the request is
// finished. When an error is returned no request was counted and releaseEntry
// must not be called.
func (s *Store) readEntry(req Request) (string, Materializer, error) {
	info := req.CacheInfo()
	key := makeEntryKey(req.Type(), info)

	s.lock.Lock()
	defer s.lock.Unlock()

	if existing, found := s.byKey[key]; found {
		// Entries on their way out must not pick up new requests.
		if existing.evicting {
			return "", nil, errors.New("item is marked for eviction")
		}
		existing.requests++
		s.byKey[key] = existing
		return key, existing.materializer, nil
	}

	mat, err := req.NewMaterializer()
	if err != nil {
		return "", nil, err
	}

	// The materializer outlives the request that created it, so its context
	// is rooted at Background and only cancelled through the entry's stop.
	runCtx, stop := context.WithCancel(context.Background())
	s.byKey[key] = entry{
		materializer: mat,
		stop:         stop,
		requests:     1,
	}

	go func() {
		mat.Run(runCtx)

		// Run returns either because the entry reached its TTL and was
		// evicted (which cancels runCtx) or because the materializer hit an
		// irrecoverable error. A context that is still live means it was the
		// error case, so trigger an immediate eviction.
		if runCtx.Err() == nil {
			s.evictNow(key)
		}
	}()

	return key, mat, nil
}

// evictNow causes the item with the given key to be evicted immediately.
//
// If there are requests in-flight, the item is marked for eviction such that
// once the requests have been served releaseEntry will move it to the top of
// the expiry heap. If there are no requests in-flight, evictNow will move the
// item to the top of the expiry heap itself.
//
// In either case, the entry's evicting flag prevents it from being served by
// readEntry (and thereby gaining new in-flight requests).
//
// evictNow is a no-op when the store holds no entry for key.
func (s *Store) evictNow(key string) {
	s.lock.Lock()
	defer s.lock.Unlock()

	e, ok := s.byKey[key]
	if !ok {
		// The expiry loop may already have removed the entry between the
		// caller deciding to evict and this call taking the lock. Writing
		// back the zero value here would put an entry with a nil stop
		// function into the map and onto the heap, and the expiry loop would
		// panic when it calls stop.
		return
	}
	e.evicting = true
	s.byKey[key] = e

	if e.requests == 0 {
		s.expireNowLocked(key)
	}
}

// releaseEntry marks one request for key as finished. When that was the last
// active request the entry is handed to the expiry heap: with a zero TTL if it
// is marked for eviction, otherwise with the store's idle TTL. It must be
// called exactly once for every successful call to readEntry.
func (s *Store) releaseEntry(key string) {
	s.lock.Lock()
	defer s.lock.Unlock()

	item := s.byKey[key]
	item.requests--
	s.byKey[key] = item

	switch {
	case item.requests > 0:
		// Other requests still depend on this entry; leave it alone.

	case item.evicting:
		s.expireNowLocked(key)

	case item.expiry.Index() == ttlcache.NotIndexed:
		// Not on the heap at the moment, so it needs a fresh heap entry.
		item.expiry = s.expiryHeap.Add(key, s.idleTTL)
		s.byKey[key] = item

	default:
		// Already on the heap: restart its idle period.
		s.expiryHeap.Update(item.expiry.Index(), s.idleTTL)
	}
}

// expireNowLocked re-queues the item with the given key on the expiry heap
// with a zero TTL, so that the expiry loop picks it up and evicts it right
// away. Callers in this file hold s.lock while calling it.
func (s *Store) expireNowLocked(key string) {
	item := s.byKey[key]

	// Drop the heap entry the item may already have before adding the new one.
	heapIdx := item.expiry.Index()
	if heapIdx != ttlcache.NotIndexed {
		s.expiryHeap.Remove(heapIdx)
	}

	var noTTL time.Duration
	item.expiry = s.expiryHeap.Add(key, noTTL)
	s.byKey[key] = item
}

// makeEntryKey builds the key under which a request's entry is indexed in the
// Store: the request type, datacenter, token and key joined with "/". It
// matches agent/cache.makeEntryKey, but may change in the future.
func makeEntryKey(typ string, r cache.RequestInfo) string {
	const layout = "%s/%s/%s/%s"

	dc, token, reqKey := r.Datacenter, r.Token, r.Key
	return fmt.Sprintf(layout, typ, dc, token, reqKey)
}