// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package submatview

import (
	"context"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/hashicorp/consul/proto/private/pbsubscribe"
)

// RPCMaterializer is a materializer for a streaming cache type
// and manages the actual streaming RPC call to the servers behind
// the scenes until the cache result is discarded when its TTL expires.
type RPCMaterializer struct {
	deps    Deps
	client  StreamClient
	handler eventHandler

	mat *materializer
}

var _ Materializer = (*RPCMaterializer)(nil)

// StreamClient provides a subscription to state change events.
type StreamClient interface {
	Subscribe(ctx context.Context, in *pbsubscribe.SubscribeRequest, opts ...grpc.CallOption) (pbsubscribe.StateChangeSubscription_SubscribeClient, error)
}

// NewRPCMaterializer returns a new Materializer. Run must be called to start it.
func NewRPCMaterializer(client StreamClient, deps Deps) *RPCMaterializer {
	m := RPCMaterializer{
		deps:   deps,
		client: client,
		mat:    newMaterializer(deps.Logger, deps.View, deps.Waiter),
	}
	return &m
}

// Query implements Materializer
func (m *RPCMaterializer) Query(ctx context.Context, minIndex uint64) (Result, error) {
	return m.mat.query(ctx, minIndex)
}

// Run receives events from the StreamClient and sends them to the View. It runs
// until ctx is cancelled, so it is expected to be run in a goroutine.
// Mirrors implementation of LocalMaterializer
//
// Run implements Materializer
func (m *RPCMaterializer) Run(ctx context.Context) {
	for {
		req := m.deps.Request(m.mat.currentIndex())
		err := m.subscribeOnce(ctx, req)
		if ctx.Err() != nil {
			return
		}
		m.mat.handleError(req, err)

		if err := m.mat.retryWaiter.Wait(ctx); err != nil {
			return
		}
	}
}

// subscribeOnce opens a new subscribe streaming call to the servers and runs
// for its lifetime or until the view is closed.
func (m *RPCMaterializer) subscribeOnce(ctx context.Context, req *pbsubscribe.SubscribeRequest) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	m.handler = initialHandler(req.Index)

	s, err := m.client.Subscribe(ctx, req)
	if err != nil {
		return err
	}

	for {
		event, err := s.Recv()
		switch {
		case isGrpcStatus(err, codes.Aborted):
			m.mat.reset()
			return resetErr("stream reset requested")
		case err != nil:
			return err
		}

		m.handler, err = m.handler(m, event)
		if err != nil {
			m.mat.reset()
			return err
		}
	}
}

func isGrpcStatus(err error, code codes.Code) bool {
	s, ok := status.FromError(err)
	return ok && s.Code() == code
}

// resetErr represents a server request to reset the subscription, it's typed so
// we can mark it as temporary and so attempt to retry first time without
// notifying clients.
type resetErr string

// Temporary Implements the internal Temporary interface
func (e resetErr) Temporary() bool {
	return true
}

// Error implements error
func (e resetErr) Error() string {
	return string(e)
}

// updateView implements viewState
func (m *RPCMaterializer) updateView(events []*pbsubscribe.Event, index uint64) error {
	return m.mat.updateView(events, index)
}

// reset implements viewState
func (m *RPCMaterializer) reset() {
	m.mat.reset()
}