// Package ae provides tools to synchronize state between local and remote consul servers.
package ae

import (
	"fmt"
	"github.com/hashicorp/consul/lib"
	"github.com/hashicorp/consul/logging"
	"github.com/hashicorp/go-hclog"
	"math"
	"sync"
	"time"
)

// scaleThreshold is the number of nodes after which regular sync runs are
// spread out farther apart. The value should be a power of 2 since the
// scale function uses log2.
//
// When set to 128 nodes the delay factor is 2 (i.e. the delay between
// regular runs is doubled) once the cluster is larger than 128 nodes. The
// factor then grows by one each time the cluster size doubles again: it is
// 3 above 256 nodes, 4 above 512 nodes and so forth. At 8192 nodes the
// delay factor is 7; it becomes 8 once the cluster grows beyond 8192 nodes.
//
// If you update this, you may need to adjust the tuning of
// CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
const scaleThreshold = 128

// scaleFactor returns the multiplier that should be applied to the delay
// before the next sync run so that larger clusters do not get saturated.
//
// Clusters with at most scaleThreshold nodes (including non-positive node
// counts) are not slowed down and get a factor of 1. Above the threshold the
// factor follows a log2 scale: it is 2 right above scaleThreshold and grows
// by one every time the cluster doubles in size again.
func scaleFactor(nodes int) int {
	if nodes > scaleThreshold {
		// Number of times the threshold has to be doubled to reach the
		// cluster size; fractional doublings are rounded up below.
		doublings := math.Log2(float64(nodes)) - math.Log2(float64(scaleThreshold))
		return int(math.Ceil(doublings) + 1.0)
	}
	return 1
}

// SyncState is the interface the StateSyncer uses to drive synchronization
// of the state it manages.
type SyncState interface {
	// SyncChanges is called by the StateSyncer to perform an on-demand
	// partial sync. A returned error is logged by the syncer, which then
	// keeps waiting for further events.
	SyncChanges() error
	// SyncFull is called by the StateSyncer to perform a full sync. A
	// returned error is logged by the syncer and the full sync is retried
	// later.
	SyncFull() error
}

// StateSyncer manages background synchronization of the given state.
//
// The state is synchronized on a regular basis or on demand when either
// the state has changed or a new Consul server has joined the cluster.
//
// The regular state synchronization provides a self-healing mechanism
// for the cluster which is also called anti-entropy.
//
// Instances should be created with NewStateSyncer, which wires up the
// unexported function fields that the state machine calls.
type StateSyncer struct {
	// State contains the data that needs to be synchronized. Its SyncFull
	// and SyncChanges methods are invoked by the state machine.
	State SyncState

	// Interval is the base time between two full sync runs. When stagger
	// is set, a stagger derived from Interval is added on top of it.
	Interval time.Duration

	// ShutdownCh is closed when the application is shutting down. The
	// event functions select on it and report a shutdown event, which
	// makes the state machine terminate.
	ShutdownCh chan struct{}

	// Logger is the logger used to report failed sync runs.
	Logger hclog.Logger

	// ClusterSize returns the number of members in the cluster to
	// allow staggering the sync runs based on cluster size.
	// This needs to be set before Run() is called; Run panics otherwise.
	ClusterSize func() int

	// SyncFull allows triggering an immediate but staggered full sync
	// in a non-blocking way.
	SyncFull *Trigger

	// SyncChanges allows triggering an immediate partial sync
	// in a non-blocking way.
	SyncChanges *Trigger

	// pauseLock guards paused. paused counts the Pause calls that have
	// not yet been matched by a Resume call; sync runs are temporarily
	// disabled while it is non-zero.
	pauseLock sync.Mutex
	paused    int

	// serverUpInterval is the duration handed to stagger to pick the
	// delay before a triggered full sync is performed, e.g. when a server
	// has been added to the cluster.
	serverUpInterval time.Duration

	// retryFailInterval is the minimum time after which a failed full sync
	// is retried; a stagger derived from the same duration is added to it.
	retryFailInterval time.Duration

	// stagger randomly picks a duration between 0s and the given duration.
	// The default implementation, staggerFn, first scales the given
	// duration according to the cluster size.
	stagger func(time.Duration) time.Duration

	// retrySyncFullEvent generates an event based on multiple conditions
	// when the state machine is trying to retry a full state sync.
	// It is a field rather than a direct method call to allow mocking.
	retrySyncFullEvent func() event

	// syncChangesEvent generates an event based on multiple conditions
	// when the state machine is performing partial state syncs.
	// It is a field rather than a direct method call to allow mocking.
	syncChangesEvent func() event

	// nextFullSyncCh is a chan that receives a time.Time when the next
	// full sync should occur. It is replaced by resetNextFullSyncCh.
	nextFullSyncCh <-chan time.Time
}

const (
	// serverUpIntv is the default for StateSyncer.serverUpInterval: the
	// max time (before any scaling by cluster size) to wait before a sync
	// is triggered when a consul server has been added to the cluster.
	serverUpIntv = 3 * time.Second

	// retryFailIntv is the default for StateSyncer.retryFailInterval: the
	// min time to wait before a failed sync is retried.
	retryFailIntv = 15 * time.Second
)

// NewStateSyncer returns a StateSyncer that synchronizes state, using intv
// as the base interval between full syncs and shutdownCh as the signal to
// stop. If logger is nil a default hclog logger is created. The returned
// syncer uses the default serverUp and retryFail intervals and the built-in
// event and stagger functions.
//
// ClusterSize is not set here; the caller has to set it before calling Run.
func NewStateSyncer(state SyncState, intv time.Duration, shutdownCh chan struct{}, logger hclog.Logger) *StateSyncer {
	if logger == nil {
		logger = hclog.New(&hclog.LoggerOptions{})
	}

	syncer := new(StateSyncer)
	syncer.State = state
	syncer.Interval = intv
	syncer.ShutdownCh = shutdownCh
	syncer.Logger = logger.Named(logging.AntiEntropy)
	syncer.SyncFull = NewTrigger()
	syncer.SyncChanges = NewTrigger()
	syncer.serverUpInterval = serverUpIntv
	syncer.retryFailInterval = retryFailIntv

	// The event and stagger functions are stored as fields instead of
	// being called as methods so that tests can swap in mocks.
	syncer.retrySyncFullEvent = syncer.retrySyncFullEventFn
	syncer.syncChangesEvent = syncer.syncChangesEventFn
	syncer.stagger = syncer.staggerFn

	return syncer
}

// fsmState defines states for the state machine.
type fsmState string

// States of the state machine, as handled by nextFSMState:
//
//   - doneState: terminal state; runFSM returns once it is reached.
//   - fullSyncState: perform a full sync, then continue with
//     partialSyncState on success, or with retryFullSyncState when the sync
//     failed or the syncer is paused.
//   - partialSyncState: wait for events and perform partial syncs on demand.
//   - retryFullSyncState: wait until a full sync should be attempted again.
const (
	doneState          fsmState = "done"
	fullSyncState      fsmState = "fullSync"
	partialSyncState   fsmState = "partialSync"
	retryFullSyncState fsmState = "retryFullSync"
)

// Run synchronizes state between local and remote servers until shutdown.
// It is long running and only returns once the state machine is done.
//
// Run panics if ClusterSize has not been set.
func (s *StateSyncer) Run() {
	// Staggering depends on the cluster size, so refuse to start without it.
	if s.ClusterSize == nil {
		panic("ClusterSize not set")
	}

	// Arm the timer for the next periodic full sync, then drive the state
	// machine starting with an initial full sync.
	s.resetNextFullSyncCh()

	const initial = fullSyncState
	s.runFSM(initial, s.nextFSMState)
}

// runFSM drives the state machine: starting from fs it repeatedly asks next
// for the follow-up state and returns as soon as next yields doneState.
// next is always called at least once, even for the initial state.
func (s *StateSyncer) runFSM(fs fsmState, next func(fsmState) fsmState) {
	for fs = next(fs); fs != doneState; fs = next(fs) {
		// All work happens inside next.
	}
}

// nextFSMState performs the work belonging to the current state fs and
// returns the state the machine should move to next.
//
// It panics when fs is not a known state or when one of the event functions
// reports an event that is not valid for the current state.
func (s *StateSyncer) nextFSMState(fs fsmState) fsmState {
	switch fs {
	case partialSyncState:
		// Block until something happens that requires a full sync, a
		// partial sync or a shutdown.
		switch ev := s.syncChangesEvent(); ev {
		case shutdownEvent:
			return doneState

		case syncFullNotifEvent, syncFullTimerEvent:
			return fullSyncState

		case syncChangesNotifEvent:
			// A failed partial sync is only logged; the machine stays
			// in this state either way. While paused the sync is
			// skipped entirely.
			if !s.Paused() {
				if err := s.State.SyncChanges(); err != nil {
					s.Logger.Error("failed to sync changes", "error", err)
				}
			}
			return partialSyncState

		default:
			panic(fmt.Sprintf("invalid event: %s", ev))
		}

	case retryFullSyncState:
		// Block until a full sync should be attempted again or the
		// syncer is shut down.
		switch ev := s.retrySyncFullEvent(); ev {
		case shutdownEvent:
			return doneState

		case syncFullNotifEvent, syncFullTimerEvent:
			return fullSyncState

		default:
			panic(fmt.Sprintf("invalid event: %s", ev))
		}

	case fullSyncState:
		// A paused syncer does not sync; it goes through the retry state
		// so the full sync is attempted again later.
		if s.Paused() {
			return retryFullSyncState
		}
		if err := s.State.SyncFull(); err != nil {
			s.Logger.Error("failed to sync remote state", "error", err)
			return retryFullSyncState
		}
		return partialSyncState

	default:
		panic(fmt.Sprintf("invalid state: %s", fs))
	}
}

// event defines a timing or notification event from multiple timers and
// channels.
type event string

// Events produced by the event functions and consumed by nextFSMState:
//
//   - shutdownEvent: ShutdownCh fired; the state machine moves to its done
//     state.
//   - syncFullNotifEvent: a full sync was requested through the SyncFull
//     trigger and the staggered wait has elapsed.
//   - syncFullTimerEvent: the timer for the next (or retried) full sync
//     expired.
//   - syncChangesNotifEvent: a partial sync was requested through the
//     SyncChanges trigger.
const (
	shutdownEvent         event = "shutdown"
	syncFullNotifEvent    event = "syncFullNotif"
	syncFullTimerEvent    event = "syncFullTimer"
	syncChangesNotifEvent event = "syncChangesNotif"
)

// retrySyncFullEventFn blocks until the state machine should retry a full
// sync or shut down, and reports which of those happened. Do not call it
// directly; go through s.retrySyncFullEvent so that tests can substitute a
// mock.
func (s *StateSyncer) retrySyncFullEventFn() event {
	select {
	case <-s.SyncFull.Notif():
		// A full sync was requested explicitly, which usually means a
		// consul server was added to the cluster. The staggered wait
		// below turns this into an event.

	case <-time.After(s.retryFailInterval + s.stagger(s.retryFailInterval)):
		// The retry delay has elapsed. retryFailInterval is the base
		// here because this path retries a sync.
		s.resetNextFullSyncCh()
		return syncFullTimerEvent

	case <-s.ShutdownCh:
		return shutdownEvent
	}

	// Only reached after a SyncFull notification: hold off for a random
	// stagger to avoid a thundering herd, unless shutdown comes first.
	select {
	case <-time.After(s.stagger(s.serverUpInterval)):
		return syncFullNotifEvent
	case <-s.ShutdownCh:
		return shutdownEvent
	}
}

// syncChangesEventFn blocks until an event occurs that calls for a full
// sync, a partial sync or a shutdown, and reports which one it was. Do not
// call it directly; go through s.syncChangesEvent so that tests can
// substitute a mock.
func (s *StateSyncer) syncChangesEventFn() event {
	select {
	case <-s.SyncFull.Notif():
		// A full sync was requested explicitly, which usually means a
		// consul server was added to the cluster. The staggered wait
		// below turns this into an event.

	case <-s.nextFullSyncCh:
		// The regular full sync is due; arm the timer for the next one.
		s.resetNextFullSyncCh()
		return syncFullTimerEvent

	case <-s.SyncChanges.Notif():
		// Partial syncs are done on demand.
		return syncChangesNotifEvent

	case <-s.ShutdownCh:
		return shutdownEvent
	}

	// Only reached after a SyncFull notification: hold off for a random
	// stagger to avoid a thundering herd, unless shutdown comes first.
	select {
	case <-time.After(s.stagger(s.serverUpInterval)):
		s.resetNextFullSyncCh()
		return syncFullNotifEvent
	case <-s.ShutdownCh:
		return shutdownEvent
	}
}

// resetNextFullSyncCh replaces nextFullSyncCh with a fresh timer channel
// that fires after Interval plus a stagger derived from Interval. Without a
// stagger function the timer fires after exactly Interval.
// Call this function every time a full sync is performed.
func (s *StateSyncer) resetNextFullSyncCh() {
	delay := s.Interval
	if s.stagger != nil {
		delay += s.stagger(s.Interval)
	}
	s.nextFullSyncCh = time.After(delay)
}

// libRandomStagger is the randomization function used by staggerFn. It is
// held in a package-level variable so that it can be stubbed out for testing.
var libRandomStagger = lib.RandomStagger

// staggerFn scales d by the factor that scaleFactor yields for the current
// cluster size and passes the result to libRandomStagger to obtain a random
// duration. This spreads cluster wide events out over time. Do not call it
// directly; go through s.stagger so that tests can substitute a mock.
func (s *StateSyncer) staggerFn(d time.Duration) time.Duration {
	scaled := d * time.Duration(scaleFactor(s.ClusterSize()))
	return libRandomStagger(scaled)
}

// Pause temporarily disables sync runs. Calls nest: every Pause has to be
// matched by a Resume before sync runs are enabled again.
func (s *StateSyncer) Pause() {
	s.pauseLock.Lock()
	defer s.pauseLock.Unlock()

	s.paused++
}

// Paused reports whether sync runs are currently disabled, i.e. whether the
// pause counter is non-zero.
func (s *StateSyncer) Paused() bool {
	s.pauseLock.Lock()
	isPaused := s.paused != 0
	s.pauseLock.Unlock()
	return isPaused
}

// Resume re-enables sync runs. It returns true if it was the last pause/resume
// pair on the stack and so actually caused the state syncer to resume; in
// that case a partial sync is triggered as well.
//
// Resume panics if it is called without a matching Pause. The pause counter
// is left untouched and the lock is released before panicking, so a caller
// that recovers from the panic is not left with a permanently locked or
// corrupted syncer.
func (s *StateSyncer) Resume() bool {
	s.pauseLock.Lock()
	if s.paused <= 0 {
		// Unlock first: panicking with the lock held would block every
		// later Pause, Paused and Resume call if the panic is recovered.
		s.pauseLock.Unlock()
		panic("unbalanced pause/resume")
	}
	s.paused--
	resumed := s.paused == 0
	s.pauseLock.Unlock()

	// The trigger is fired outside the critical section so the lock is not
	// held while calling into the trigger.
	if resumed {
		s.SyncChanges.Trigger()
	}
	return resumed
}