Add GVK lookup to deploy controller

Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
pull/6359/head
Brad Davidson 2022-10-19 00:31:02 +00:00 committed by Brad Davidson
parent 8d28a38a18
commit 68a56ff8d8
1 changed files with 78 additions and 1 deletions

View File

@ -11,6 +11,7 @@ import (
"path/filepath"
"sort"
"strings"
"sync"
"time"
"github.com/k3s-io/k3s/pkg/agent/util"
@ -24,10 +25,13 @@ import (
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
yamlDecoder "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/discovery"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
)
@ -46,6 +50,8 @@ func WatchFiles(ctx context.Context, client kubernetes.Interface, apply apply.Ap
bases: bases,
disables: disables,
modTime: map[string]time.Time{},
gvkCache: map[schema.GroupVersionKind]bool{},
discovery: client.Discovery(),
}
addons.Enqueue(metav1.NamespaceNone, startKey)
@ -60,13 +66,17 @@ func WatchFiles(ctx context.Context, client kubernetes.Interface, apply apply.Ap
}
type watcher struct {
sync.Mutex
apply apply.Apply
addonCache controllersv1.AddonCache
addons controllersv1.AddonClient
bases []string
disables map[string]bool
modTime map[string]time.Time
gvkCache map[schema.GroupVersionKind]bool
recorder record.EventRecorder
discovery discovery.DiscoveryInterface
}
// start calls listFiles at regular intervals to trigger application of manifests that have changed on disk.
@ -195,18 +205,29 @@ func (w *watcher) deploy(path string, compareChecksum bool) error {
return err
}
// Merge GVK list early for validation
addon.Status.GVKs = append(addon.Status.GVKs, objects.GVKs()...)
// Ensure that we don't try to prune using GVKs that the server doesn't have.
// This can happen when CRDs are removed or when core types are removed - PodSecurityPolicy, for example.
if err := w.validateGVKs(&addon); err != nil {
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ValidateManifestFailed", "Validate GVKs for manifest at %q failed: %v", path, err)
return err
}
// Attempt to apply the changes. Failure at this point would be due to more complicated issues - invalid changes to
// existing objects, rejected by validating webhooks, etc.
// WithGVK searches for objects using both GVKs currently listed in the manifest, as well as GVKs previously
// applied. This ensures that objects don't get orphaned when they are removed from the file - if the apply
// doesn't know to search that GVK for owner references, it won't find and delete them.
w.recorder.Eventf(&addon, corev1.EventTypeNormal, "ApplyingManifest", "Applying manifest at %q", path)
if err := w.apply.WithOwner(&addon).WithGVK(addon.Status.GVKs...).Apply(objects); err != nil {
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ApplyManifestFailed", "Applying manifest at %q failed: %v", path, err)
return err
}
// Emit event, Update Addon checksum only if apply was successful
// Emit event, Update Addon checksum and GVKs only if apply was successful
w.recorder.Eventf(&addon, corev1.EventTypeNormal, "AppliedManifest", "Applied manifest at %q", path)
addon.Spec.Checksum = checksum
addon.Status.GVKs = objects.GVKs()
@ -236,6 +257,12 @@ func (w *watcher) delete(path string) error {
}
}
// Ensure that we don't try to delete using GVKs that the server doesn't have.
// This can happen when CRDs are removed or when core types are removed - PodSecurityPolicy, for example.
if err := w.validateGVKs(&addon); err != nil {
return err
}
// ensure that the addon is completely removed before deleting the objectSet,
// so return when err == nil, otherwise pods may get stuck terminating
w.recorder.Eventf(&addon, corev1.EventTypeNormal, "DeletingManifest", "Deleting manifest at %q", path)
@ -263,6 +290,56 @@ func (w *watcher) getOrCreateAddon(name string) (apisv1.Addon, error) {
return *addon, nil
}
// validateGVKs removes from the Addon status any GVKs that the server does not support
func (w *watcher) validateGVKs(addon *apisv1.Addon) error {
gvks := []schema.GroupVersionKind{}
for _, gvk := range addon.Status.GVKs {
found, err := w.serverHasGVK(gvk)
if err != nil {
return err
}
if found {
gvks = append(gvks, gvk)
} else {
logrus.Warnf("Pruned unknown GVK from %s %s/%s: %s", addon.TypeMeta.GroupVersionKind(), addon.Namespace, addon.Name, gvk)
}
}
addon.Status.GVKs = gvks
return nil
}
// serverHasGVK uses a positive cache of GVKs that the cluster is known to have supported at some
// point in time. Note this may fail to filter out GVKs that are removed from the cluster after
// startup (for example, if CRDs are deleted) - but the Wrangler DesiredSet cache has the same issue,
// so it should be fine.
func (w *watcher) serverHasGVK(gvk schema.GroupVersionKind) (bool, error) {
w.Lock()
defer w.Unlock()
if found, ok := w.gvkCache[gvk]; ok {
return found, nil
}
resources, err := w.discovery.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
if err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
return false, err
}
// Cache all Kinds for this GroupVersion to save on future lookups
for _, resource := range resources.APIResources {
// Resources in the requested GV are returned with empty GroupVersion.
// Subresources with different GV may also be returned, but we aren't interested in those.
if resource.Group == "" && resource.Version == "" {
w.gvkCache[gvk.GroupVersion().WithKind(resource.Kind)] = true
}
}
return w.gvkCache[gvk], nil
}
// objectSet returns a new ObjectSet containing all resources from a given yaml chunk
func objectSet(content []byte) (*objectset.ObjectSet, error) {
objs, err := yamlToObjects(bytes.NewBuffer(content))