diff --git a/pkg/deploy/controller.go b/pkg/deploy/controller.go
index 6d26746599..6245e46566 100644
--- a/pkg/deploy/controller.go
+++ b/pkg/deploy/controller.go
@@ -11,6 +11,7 @@ import (
 	"path/filepath"
 	"sort"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/k3s-io/k3s/pkg/agent/util"
@@ -24,10 +25,13 @@ import (
 	"github.com/sirupsen/logrus"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	yamlDecoder "k8s.io/apimachinery/pkg/util/yaml"
+	"k8s.io/client-go/discovery"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/record"
 )
@@ -46,6 +50,8 @@ func WatchFiles(ctx context.Context, client kubernetes.Interface, apply apply.Ap
 		bases:     bases,
 		disables:  disables,
 		modTime:   map[string]time.Time{},
+		gvkCache:  map[schema.GroupVersionKind]bool{},
+		discovery: client.Discovery(),
 	}
 
 	addons.Enqueue(metav1.NamespaceNone, startKey)
@@ -60,13 +66,17 @@ func WatchFiles(ctx context.Context, client kubernetes.Interface, apply apply.Ap
 }
 
 type watcher struct {
+	sync.Mutex
+
 	apply      apply.Apply
 	addonCache controllersv1.AddonCache
 	addons     controllersv1.AddonClient
 	bases      []string
 	disables   map[string]bool
 	modTime    map[string]time.Time
+	gvkCache   map[schema.GroupVersionKind]bool
 	recorder   record.EventRecorder
+	discovery  discovery.DiscoveryInterface
 }
 
 // start calls listFiles at regular intervals to trigger application of manifests that have changed on disk.
@@ -195,18 +205,29 @@ func (w *watcher) deploy(path string, compareChecksum bool) error {
 		return err
 	}
 
+	// Merge GVK list early for validation
+	addon.Status.GVKs = append(addon.Status.GVKs, objects.GVKs()...)
+
+	// Ensure that we don't try to prune using GVKs that the server doesn't have.
+	// This can happen when CRDs are removed or when core types are removed - PodSecurityPolicy, for example.
+	if err := w.validateGVKs(&addon); err != nil {
+		w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ValidateManifestFailed", "Validate GVKs for manifest at %q failed: %v", path, err)
+		return err
+	}
+
 	// Attempt to apply the changes. Failure at this point would be due to more complicated issues - invalid changes to
 	// existing objects, rejected by validating webhooks, etc.
 	// WithGVK searches for objects using both GVKs currently listed in the manifest, as well as GVKs previously
 	// applied. This ensures that objects don't get orphaned when they are removed from the file - if the apply
 	// doesn't know to search that GVK for owner references, it won't find and delete them.
 	w.recorder.Eventf(&addon, corev1.EventTypeNormal, "ApplyingManifest", "Applying manifest at %q", path)
+
 	if err := w.apply.WithOwner(&addon).WithGVK(addon.Status.GVKs...).Apply(objects); err != nil {
 		w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ApplyManifestFailed", "Applying manifest at %q failed: %v", path, err)
 		return err
 	}
 
-	// Emit event, Update Addon checksum only if apply was successful
+	// Emit event, Update Addon checksum and GVKs only if apply was successful
 	w.recorder.Eventf(&addon, corev1.EventTypeNormal, "AppliedManifest", "Applied manifest at %q", path)
 	addon.Spec.Checksum = checksum
 	addon.Status.GVKs = objects.GVKs()
@@ -236,6 +257,12 @@ func (w *watcher) delete(path string) error {
 		}
 	}
 
+	// Ensure that we don't try to delete using GVKs that the server doesn't have.
+	// This can happen when CRDs are removed or when core types are removed - PodSecurityPolicy, for example.
+	if err := w.validateGVKs(&addon); err != nil {
+		return err
+	}
+
 	// ensure that the addon is completely removed before deleting the objectSet,
 	// so return when err == nil, otherwise pods may get stuck terminating
 	w.recorder.Eventf(&addon, corev1.EventTypeNormal, "DeletingManifest", "Deleting manifest at %q", path)
@@ -263,6 +290,56 @@ func (w *watcher) getOrCreateAddon(name string) (apisv1.Addon, error) {
 	return *addon, nil
 }
 
+// validateGVKs removes from the Addon status any GVKs that the server does not support
+func (w *watcher) validateGVKs(addon *apisv1.Addon) error {
+	gvks := []schema.GroupVersionKind{}
+	for _, gvk := range addon.Status.GVKs {
+		found, err := w.serverHasGVK(gvk)
+		if err != nil {
+			return err
+		}
+		if found {
+			gvks = append(gvks, gvk)
+		} else {
+			logrus.Warnf("Pruned unknown GVK from %s %s/%s: %s", addon.TypeMeta.GroupVersionKind(), addon.Namespace, addon.Name, gvk)
+		}
+	}
+	addon.Status.GVKs = gvks
+	return nil
+}
+
+// serverHasGVK uses a positive cache of GVKs that the cluster is known to have supported at some
+// point in time. Note this may fail to filter out GVKs that are removed from the cluster after
+// startup (for example, if CRDs are deleted) - but the Wrangler DesiredSet cache has the same issue,
+// so it should be fine.
+func (w *watcher) serverHasGVK(gvk schema.GroupVersionKind) (bool, error) {
+	w.Lock()
+	defer w.Unlock()
+
+	if found, ok := w.gvkCache[gvk]; ok {
+		return found, nil
+	}
+
+	resources, err := w.discovery.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
+	if err != nil {
+		if apierrors.IsNotFound(err) {
+			return false, nil
+		}
+		return false, err
+	}
+
+	// Cache all Kinds for this GroupVersion to save on future lookups
+	for _, resource := range resources.APIResources {
+		// Resources in the requested GV are returned with empty GroupVersion.
+		// Subresources with different GV may also be returned, but we aren't interested in those.
+		if resource.Group == "" && resource.Version == "" {
+			w.gvkCache[gvk.GroupVersion().WithKind(resource.Kind)] = true
+		}
+	}
+
+	return w.gvkCache[gvk], nil
+}
+
 // objectSet returns a new ObjectSet containing all resources from a given yaml chunk
 func objectSet(content []byte) (*objectset.ObjectSet, error) {
 	objs, err := yamlToObjects(bytes.NewBuffer(content))
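For anyone who wants to exercise the discovery behavior this patch relies on, below is a minimal standalone sketch (not part of the change) that performs the same lookup `serverHasGVK` does, minus the watcher plumbing, locking, and cache. The `hasGVK` helper name and the kubeconfig path are illustrative assumptions; the client-go calls (`ServerResourcesForGroupVersion`, `apierrors.IsNotFound`) are the same ones the patch uses.

```go
package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// hasGVK is a hypothetical, cache-free version of the patch's serverHasGVK:
// it asks the discovery endpoint for all resources in the GVK's group/version
// and scans the result for a matching Kind.
func hasGVK(d discovery.DiscoveryInterface, gvk schema.GroupVersionKind) (bool, error) {
	resources, err := d.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
	if err != nil {
		// A 404 means the whole group/version is unknown to the server -
		// exactly the case the patch guards against (e.g. PodSecurityPolicy
		// on clusters where policy/v1beta1 no longer exists).
		if apierrors.IsNotFound(err) {
			return false, nil
		}
		return false, err
	}
	for _, r := range resources.APIResources {
		// As in the patch: resources in the requested group/version come back
		// with empty Group/Version fields; entries with them set belong to
		// subresources from other group/versions and are skipped.
		if r.Group == "" && r.Version == "" && r.Kind == gvk.Kind {
			return true, nil
		}
	}
	return false, nil
}

func main() {
	// Assumes a reachable cluster; the kubeconfig path is an example.
	config, err := clientcmd.BuildConfigFromFlags("", "/etc/rancher/k3s/k3s.yaml")
	if err != nil {
		panic(err)
	}
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	gvk := schema.GroupVersionKind{Group: "policy", Version: "v1beta1", Kind: "PodSecurityPolicy"}
	found, err := hasGVK(client.Discovery(), gvk)
	if err != nil {
		panic(err)
	}
	fmt.Printf("server has %s: %v\n", gvk, found)
}
```

One design note: the patch only caches positive results. On `IsNotFound` it returns `false` without writing to `gvkCache`, and a group/version that exists but lacks the requested Kind also leaves no `false` entry behind, so a GVK that is missing today (say, a CRD that has not been applied yet) is re-checked via discovery on the next deploy rather than being treated as permanently absent.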