package agent

import (
	"fmt"
	"sync"

	"golang.org/x/net/context"

	"github.com/hashicorp/consul/agent/cache"
	cachetype "github.com/hashicorp/consul/agent/cache-types"
	"github.com/hashicorp/consul/agent/configentry"
	"github.com/hashicorp/consul/agent/structs"
)

// ServiceManager watches changes to central service config for all services
// registered with it. When a central config changes, the local service will
// be updated with the correct values from the central config.
//
// Every registered service gets its own serviceConfigWatch, stored in the
// services map. The context of each watch is derived from ctx, so invoking
// cancel (see Stop) signals all of them to exit at once.
type ServiceManager struct {
	// agent is handed to every serviceConfigWatch created by AddService and
	// supplies the logger used there.
	agent *Agent

	// servicesLock guards the services map, but not the watches contained
	// therein
	servicesLock sync.Mutex

	// services tracks all active watches for registered services, keyed by
	// the compound service ID of the registration.
	services map[structs.ServiceID]*serviceConfigWatch

	// ctx is the shared context for all goroutines launched
	ctx context.Context

	// cancel can be used to stop all goroutines launched
	cancel context.CancelFunc

	// running keeps track of live goroutines (worker and watcher). It is
	// passed to serviceConfigWatch.start, and each runWatch goroutine calls
	// Done on it when it exits; Stop waits on it.
	running sync.WaitGroup
}

// NewServiceManager returns a ServiceManager bound to agent. The manager
// starts with an empty set of watches and a fresh cancellable context rooted
// at context.Background(); no goroutine is launched here.
func NewServiceManager(agent *Agent) *ServiceManager {
	mgr := &ServiceManager{
		agent:    agent,
		services: make(map[structs.ServiceID]*serviceConfigWatch),
	}
	// Every watch started later derives its own context from this one.
	mgr.ctx, mgr.cancel = context.WithCancel(context.Background())
	return mgr
}

// Stop forces all background goroutines to terminate and blocks until they complete.
//
// It cancels the manager-wide context, from which the context of every watch
// is derived, and then waits on the WaitGroup that each runWatch goroutine
// signals on exit. The services map itself is left untouched.
//
// NOTE: the caller must NOT hold the Agent.stateLock!
func (s *ServiceManager) Stop() {
	// Signal every goroutine running under s.ctx to exit.
	s.cancel()
	// Block until each of them has reported completion.
	s.running.Wait()
}

// AddService will (re)create a serviceConfigWatch for the service described
// by req. A watch that already exists for the same compound service ID is
// stopped and discarded first.
//
// The first registration happens inline: the service defaults are obtained
// through req.serviceDefaults, merged with the supplied service definition
// and registered through Agent.addServiceInternal. This lets validation or
// authorization related errors bubble back up to the caller inline with their
// request. Upon success a goroutine keeps the registration updated in the
// background.
//
// req.persistServiceConfig is passed through unchanged for this INITIAL
// registration only; background updates always set it to true.
//
// If the inline registration or starting the watch fails, the error is
// returned and no watch is tracked for the service. A watch that existed
// before the call has already been stopped at that point and is not restored.
//
// NOTE: the caller must hold the Agent.stateLock!
func (s *ServiceManager) AddService(req addServiceLockedRequest) error {
	s.servicesLock.Lock()
	defer s.servicesLock.Unlock()

	id := req.Service.CompoundServiceID()

	// Tear down whatever watch is currently tracked for this service so the
	// new registration fully replaces it.
	previous, replacing := s.services[id]
	if replacing {
		previous.Stop()
		delete(s.services, id)
	}

	// Register inline with the merged config first, and only then start the
	// background watch that keeps it current.
	replacement := &serviceConfigWatch{registration: req, agent: s.agent}
	if err := replacement.register(s.ctx); err != nil {
		return err
	}
	if err := replacement.start(s.ctx, &s.running); err != nil {
		return err
	}
	s.services[id] = replacement

	msg := "added local registration for service"
	if replacing {
		msg = "updated local registration for service"
	}
	s.agent.logger.Debug(msg, "service", req.Service.ID)

	return nil
}

// RemoveService stops the watch tracked for serviceID, waiting for its
// goroutine to exit, and drops it from the manager. It does nothing when no
// watch is tracked for that ID.
//
// NOTE: the caller must hold the Agent.stateLock!
func (s *ServiceManager) RemoveService(serviceID structs.ServiceID) {
	s.servicesLock.Lock()
	defer s.servicesLock.Unlock()

	watch, tracked := s.services[serviceID]
	if !tracked {
		return
	}

	watch.Stop()
	delete(s.services, serviceID)
}

// serviceConfigWatch is a long running helper for composing the end config
// for a given service from both the local registration and the global
// service/proxy defaults.
type serviceConfigWatch struct {
	// registration is the request this watch was created for. Its Service is
	// the un-merged local definition that gets merged with the central
	// defaults on every (re-)registration.
	registration addServiceLockedRequest
	// agent performs the actual registration (addServiceInternal) and
	// provides the cache, state lock and logger used by the watch.
	agent *Agent

	// cacheKey stores the cache key of the config request built in start. It
	// is passed to cache.Notify as the correlation ID, and handleUpdate
	// ignores any event whose CorrelationID differs from it.
	cacheKey string

	// cancelFunc cancels the context created in start. It is nil until start
	// has been called.
	cancelFunc func()
	// running tracks the runWatch goroutine so Stop can wait for it to exit.
	running sync.WaitGroup
}

// register performs the initial, inline registration of the service: it
// obtains the service defaults through the registration's serviceDefaults
// function, merges them with the locally supplied definition and hands the
// result to Agent.addServiceInternal.
//
// The un-merged service definition and the fetched defaults are passed along
// separately (persistService / persistServiceDefaults).
//
// NOTE: this is called while holding the Agent.stateLock
func (w *serviceConfigWatch) register(ctx context.Context) error {
	defaults, err := w.registration.serviceDefaults(ctx)
	if err != nil {
		return fmt.Errorf("could not retrieve initial service_defaults config for service %q: %v",
			w.registration.Service.ID, err)
	}

	// Overlay the central defaults onto the local definition.
	mergedService, err := configentry.MergeServiceConfig(defaults, w.registration.Service)
	if err != nil {
		return err
	}

	// Work on a copy of the request so w.registration keeps pointing at the
	// original, un-merged service.
	lockedReq := w.registration
	lockedReq.Service = mergedService

	internalReq := addServiceInternalRequest{
		addServiceLockedRequest: lockedReq,
		persistService:          w.registration.Service,
		persistServiceDefaults:  defaults,
	}
	if err := w.agent.addServiceInternal(internalReq); err != nil {
		return fmt.Errorf("error updating service registration: %v", err)
	}
	return nil
}

// serviceDefaultsFromStruct returns a defaults provider that ignores its
// context and always yields the given, already available response v together
// with a nil error.
func serviceDefaultsFromStruct(v *structs.ServiceConfigResponse) func(context.Context) (*structs.ServiceConfigResponse, error) {
	provider := func(context.Context) (*structs.ServiceConfigResponse, error) {
		return v, nil
	}
	return provider
}

// serviceDefaultsFromCache returns a defaults provider that, each time it is
// invoked, builds the config request for req and fetches the resolved service
// config through bd.Cache.
//
// The provider returns the cache error unchanged, or an internal error if the
// cached value is not a *structs.ServiceConfigResponse.
func serviceDefaultsFromCache(bd BaseDeps, req AddServiceRequest) func(context.Context) (*structs.ServiceConfigResponse, error) {
	// NOTE: the returned function is called while holding the Agent.stateLock
	return func(ctx context.Context) (*structs.ServiceConfigResponse, error) {
		cfgReq := makeConfigRequest(bd, req)

		raw, _, err := bd.Cache.Get(ctx, cachetype.ResolvedServiceConfigName, cfgReq)
		if err != nil {
			return nil, err
		}

		if resp, ok := raw.(*structs.ServiceConfigResponse); ok {
			return resp, nil
		}
		// This should never happen, but we want to protect against panics
		return nil, fmt.Errorf("internal error: response type not correct")
	}
}

// start begins a cache.Notify watch on the resolved service config for this
// registration and launches the runWatch goroutine that consumes its update
// events. It is safe to call more than once provided Stop has been called
// after each previous start.
//
// The goroutine is tracked both in w.running and in the supplied wg. If
// Notify fails, the context created here is cancelled again, no goroutine is
// started and the error is returned.
//
// NOTE: this is called while holding the Agent.stateLock
func (w *serviceConfigWatch) start(ctx context.Context, wg *sync.WaitGroup) error {
	watchCtx, cancel := context.WithCancel(ctx)
	w.cancelFunc = cancel

	// Build the request for a continuous blocking query on the resolved
	// service config of this service, and remember its cache key.
	cfgReq := makeConfigRequest(w.agent.baseDeps, w.registration.AddServiceRequest)
	w.cacheKey = cfgReq.CacheInfo().Key

	events := make(chan cache.UpdateEvent, 1)

	// The cache key doubles as the correlationID. Notify generally stops
	// delivering once the context is cancelled, but an update that was
	// already in flight when we cancelled could still arrive. Tagging events
	// with the cache key lets handleUpdate discard anything that belongs to
	// an old cache watch, which makes even that rare race harmless.
	if err := w.agent.cache.Notify(watchCtx, cachetype.ResolvedServiceConfigName, cfgReq, w.cacheKey, events); err != nil {
		cancel()
		return err
	}

	wg.Add(1)
	w.running.Add(1)
	go w.runWatch(watchCtx, wg, events)

	return nil
}

// Stop cancels the context created by start and blocks until the runWatch
// goroutine has exited.
//
// It must only be called after start has run: cancelFunc is nil until start
// assigns it, and calling a nil func panics.
func (w *serviceConfigWatch) Stop() {
	// Cancel first so the goroutine observes ctx.Done(), then wait for it.
	w.cancelFunc()
	w.running.Wait()
}

// runWatch consumes update events delivered by cache.Notify on updateCh and
// applies each one through handleUpdate until ctx is done. A failed update is
// logged and the loop carries on with the next event.
//
// On exit it marks itself done on both w.running and the supplied wg.
//
// NOTE: the caller must NOT hold the Agent.stateLock!
func (w *serviceConfigWatch) runWatch(ctx context.Context, wg *sync.WaitGroup, updateCh chan cache.UpdateEvent) {
	defer wg.Done()
	defer w.running.Done()

	for {
		select {
		case <-ctx.Done():
			return

		case update := <-updateCh:
			err := w.handleUpdate(ctx, update)
			if err == nil {
				continue
			}
			w.agent.logger.Error("error handling service update", "error", err)
		}
	}
}

// handleUpdate receives an update event from the global config defaults,
// merges it with the local registration and re-registers the service with the
// newly merged config.
//
// It returns an error when the event itself carries an error, when its result
// is not a *structs.ServiceConfigResponse, when merging fails, or when the
// re-registration fails. It returns nil without registering anything when the
// event does not belong to the currently active cache key, when the state
// lock cannot be acquired, or when ctx is cancelled by the time the lock is
// held.
//
// NOTE: the caller must NOT hold the Agent.stateLock!
func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.UpdateEvent) error {
	// A failed fetch carries no usable result: report it and leave the
	// current registration untouched.
	if event.Err != nil {
		return fmt.Errorf("error watching service config: %v", event.Err)
	}

	serviceDefaults, ok := event.Result.(*structs.ServiceConfigResponse)
	if !ok {
		// Report the type of the payload. The event itself always has the
		// same static type, so formatting it would say nothing useful.
		return fmt.Errorf("unknown update event type: %T", event.Result)
	}

	// Sanity check this event came from the currently active watch to ignore
	// rare races when switching cache keys
	if event.CorrelationID != w.cacheKey {
		// It's a no-op. The new watcher will deliver (or may have already
		// delivered) the correct config so just ignore this old message.
		return nil
	}

	// Merge the local registration with the central defaults and update this service
	// in the local state.
	merged, err := configentry.MergeServiceConfig(serviceDefaults, w.registration.Service)
	if err != nil {
		return err
	}

	// Make a copy of the registration request so w.registration keeps the
	// original, un-merged service. Background updates always persist.
	req := w.registration
	req.Service = merged
	req.persistServiceConfig = true

	args := addServiceInternalRequest{
		addServiceLockedRequest: req,
		persistService:          w.registration.Service,
		persistServiceDefaults:  serviceDefaults,
	}

	// Failing to take the lock is treated as "nothing to do" rather than as
	// an error (presumably TryLock only fails once ctx is done, i.e. the
	// watch is being shut down — confirm against the lock implementation).
	if err := w.agent.stateLock.TryLock(ctx); err != nil {
		return nil
	}
	defer w.agent.stateLock.Unlock()

	// The context may have been cancelled after the lock was acquired.
	if err := ctx.Err(); err != nil {
		return nil
	}

	if err := w.agent.addServiceInternal(args); err != nil {
		return fmt.Errorf("error updating service registration: %v", err)
	}
	return nil
}

// makeConfigRequest builds the resolved-service-config request for the
// service in addReq.
//
// For a sidecar proxy the request is made for the proxy's destination service
// rather than for the proxy itself, and it includes the IDs of all upstreams
// whose destination type is empty or UpstreamDestTypeService. The request
// carries addReq's token, falling back to the agent token from bd.Tokens when
// that token is empty.
func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceConfigRequest {
	svc := addReq.Service
	targetName := svc.Service

	var upstreamIDs []structs.ServiceID

	// Note that only sidecar proxies should even make it here for now,
	// although that is expected to change later.
	if svc.IsSidecarProxy() {
		// A sidecar proxy is governed by the config of the service it
		// fronts, so ask for that service instead of the proxy service.
		targetName = svc.Proxy.DestinationServiceName

		// Include any service upstreams in the request so we can learn about
		// their configs as well.
		for _, upstream := range svc.Proxy.Upstreams {
			if upstream.DestinationType != "" && upstream.DestinationType != structs.UpstreamDestTypeService {
				continue
			}
			id := upstream.DestinationID()
			id.EnterpriseMeta.Merge(&svc.EnterpriseMeta)
			upstreamIDs = append(upstreamIDs, id)
		}
	}

	cfgReq := &structs.ServiceConfigRequest{
		Name:           targetName,
		Datacenter:     bd.RuntimeConfig.Datacenter,
		QueryOptions:   structs.QueryOptions{Token: addReq.token},
		MeshGateway:    svc.Proxy.MeshGateway,
		Mode:           svc.Proxy.Mode,
		UpstreamIDs:    upstreamIDs,
		EnterpriseMeta: svc.EnterpriseMeta,
	}

	// Fall back to the agent token when the registration supplied none.
	if cfgReq.QueryOptions.Token == "" {
		cfgReq.QueryOptions.Token = bd.Tokens.AgentToken()
	}
	return cfgReq
}