mirror of https://github.com/k3s-io/k3s
Add GVK lookup to deploy controller
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>pull/6432/head
parent
de32ce1776
commit
0e0d283d08
|
@ -11,6 +11,7 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/k3s-io/k3s/pkg/agent/util"
|
"github.com/k3s-io/k3s/pkg/agent/util"
|
||||||
|
@ -24,10 +25,13 @@ import (
|
||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
yamlDecoder "k8s.io/apimachinery/pkg/util/yaml"
|
yamlDecoder "k8s.io/apimachinery/pkg/util/yaml"
|
||||||
|
"k8s.io/client-go/discovery"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
"k8s.io/client-go/tools/record"
|
"k8s.io/client-go/tools/record"
|
||||||
)
|
)
|
||||||
|
@ -46,6 +50,8 @@ func WatchFiles(ctx context.Context, client kubernetes.Interface, apply apply.Ap
|
||||||
bases: bases,
|
bases: bases,
|
||||||
disables: disables,
|
disables: disables,
|
||||||
modTime: map[string]time.Time{},
|
modTime: map[string]time.Time{},
|
||||||
|
gvkCache: map[schema.GroupVersionKind]bool{},
|
||||||
|
discovery: client.Discovery(),
|
||||||
}
|
}
|
||||||
|
|
||||||
addons.Enqueue(metav1.NamespaceNone, startKey)
|
addons.Enqueue(metav1.NamespaceNone, startKey)
|
||||||
|
@ -60,13 +66,17 @@ func WatchFiles(ctx context.Context, client kubernetes.Interface, apply apply.Ap
|
||||||
}
|
}
|
||||||
|
|
||||||
type watcher struct {
|
type watcher struct {
|
||||||
|
sync.Mutex
|
||||||
|
|
||||||
apply apply.Apply
|
apply apply.Apply
|
||||||
addonCache controllersv1.AddonCache
|
addonCache controllersv1.AddonCache
|
||||||
addons controllersv1.AddonClient
|
addons controllersv1.AddonClient
|
||||||
bases []string
|
bases []string
|
||||||
disables map[string]bool
|
disables map[string]bool
|
||||||
modTime map[string]time.Time
|
modTime map[string]time.Time
|
||||||
|
gvkCache map[schema.GroupVersionKind]bool
|
||||||
recorder record.EventRecorder
|
recorder record.EventRecorder
|
||||||
|
discovery discovery.DiscoveryInterface
|
||||||
}
|
}
|
||||||
|
|
||||||
// start calls listFiles at regular intervals to trigger application of manifests that have changed on disk.
|
// start calls listFiles at regular intervals to trigger application of manifests that have changed on disk.
|
||||||
|
@ -195,18 +205,29 @@ func (w *watcher) deploy(path string, compareChecksum bool) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Merge GVK list early for validation
|
||||||
|
addon.Status.GVKs = append(addon.Status.GVKs, objects.GVKs()...)
|
||||||
|
|
||||||
|
// Ensure that we don't try to prune using GVKs that the server doesn't have.
|
||||||
|
// This can happen when CRDs are removed or when core types are removed - PodSecurityPolicy, for example.
|
||||||
|
if err := w.validateGVKs(&addon); err != nil {
|
||||||
|
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ValidateManifestFailed", "Validate GVKs for manifest at %q failed: %v", path, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Attempt to apply the changes. Failure at this point would be due to more complicated issues - invalid changes to
|
// Attempt to apply the changes. Failure at this point would be due to more complicated issues - invalid changes to
|
||||||
// existing objects, rejected by validating webhooks, etc.
|
// existing objects, rejected by validating webhooks, etc.
|
||||||
// WithGVK searches for objects using both GVKs currently listed in the manifest, as well as GVKs previously
|
// WithGVK searches for objects using both GVKs currently listed in the manifest, as well as GVKs previously
|
||||||
// applied. This ensures that objects don't get orphaned when they are removed from the file - if the apply
|
// applied. This ensures that objects don't get orphaned when they are removed from the file - if the apply
|
||||||
// doesn't know to search that GVK for owner references, it won't find and delete them.
|
// doesn't know to search that GVK for owner references, it won't find and delete them.
|
||||||
w.recorder.Eventf(&addon, corev1.EventTypeNormal, "ApplyingManifest", "Applying manifest at %q", path)
|
w.recorder.Eventf(&addon, corev1.EventTypeNormal, "ApplyingManifest", "Applying manifest at %q", path)
|
||||||
|
|
||||||
if err := w.apply.WithOwner(&addon).WithGVK(addon.Status.GVKs...).Apply(objects); err != nil {
|
if err := w.apply.WithOwner(&addon).WithGVK(addon.Status.GVKs...).Apply(objects); err != nil {
|
||||||
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ApplyManifestFailed", "Applying manifest at %q failed: %v", path, err)
|
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ApplyManifestFailed", "Applying manifest at %q failed: %v", path, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Emit event, Update Addon checksum only if apply was successful
|
// Emit event, Update Addon checksum and GVKs only if apply was successful
|
||||||
w.recorder.Eventf(&addon, corev1.EventTypeNormal, "AppliedManifest", "Applied manifest at %q", path)
|
w.recorder.Eventf(&addon, corev1.EventTypeNormal, "AppliedManifest", "Applied manifest at %q", path)
|
||||||
addon.Spec.Checksum = checksum
|
addon.Spec.Checksum = checksum
|
||||||
addon.Status.GVKs = objects.GVKs()
|
addon.Status.GVKs = objects.GVKs()
|
||||||
|
@ -236,6 +257,12 @@ func (w *watcher) delete(path string) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure that we don't try to delete using GVKs that the server doesn't have.
|
||||||
|
// This can happen when CRDs are removed or when core types are removed - PodSecurityPolicy, for example.
|
||||||
|
if err := w.validateGVKs(&addon); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// ensure that the addon is completely removed before deleting the objectSet,
|
// ensure that the addon is completely removed before deleting the objectSet,
|
||||||
// so return when err == nil, otherwise pods may get stuck terminating
|
// so return when err == nil, otherwise pods may get stuck terminating
|
||||||
w.recorder.Eventf(&addon, corev1.EventTypeNormal, "DeletingManifest", "Deleting manifest at %q", path)
|
w.recorder.Eventf(&addon, corev1.EventTypeNormal, "DeletingManifest", "Deleting manifest at %q", path)
|
||||||
|
@ -263,6 +290,56 @@ func (w *watcher) getOrCreateAddon(name string) (apisv1.Addon, error) {
|
||||||
return *addon, nil
|
return *addon, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// validateGVKs removes from the Addon status any GVKs that the server does not support
|
||||||
|
func (w *watcher) validateGVKs(addon *apisv1.Addon) error {
|
||||||
|
gvks := []schema.GroupVersionKind{}
|
||||||
|
for _, gvk := range addon.Status.GVKs {
|
||||||
|
found, err := w.serverHasGVK(gvk)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if found {
|
||||||
|
gvks = append(gvks, gvk)
|
||||||
|
} else {
|
||||||
|
logrus.Warnf("Pruned unknown GVK from %s %s/%s: %s", addon.TypeMeta.GroupVersionKind(), addon.Namespace, addon.Name, gvk)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
addon.Status.GVKs = gvks
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// serverHasGVK uses a positive cache of GVKs that the cluster is known to have supported at some
|
||||||
|
// point in time. Note this may fail to filter out GVKs that are removed from the cluster after
|
||||||
|
// startup (for example, if CRDs are deleted) - but the Wrangler DesiredSet cache has the same issue,
|
||||||
|
// so it should be fine.
|
||||||
|
func (w *watcher) serverHasGVK(gvk schema.GroupVersionKind) (bool, error) {
|
||||||
|
w.Lock()
|
||||||
|
defer w.Unlock()
|
||||||
|
|
||||||
|
if found, ok := w.gvkCache[gvk]; ok {
|
||||||
|
return found, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
resources, err := w.discovery.ServerResourcesForGroupVersion(gvk.GroupVersion().String())
|
||||||
|
if err != nil {
|
||||||
|
if apierrors.IsNotFound(err) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cache all Kinds for this GroupVersion to save on future lookups
|
||||||
|
for _, resource := range resources.APIResources {
|
||||||
|
// Resources in the requested GV are returned with empty GroupVersion.
|
||||||
|
// Subresources with different GV may also be returned, but we aren't interested in those.
|
||||||
|
if resource.Group == "" && resource.Version == "" {
|
||||||
|
w.gvkCache[gvk.GroupVersion().WithKind(resource.Kind)] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return w.gvkCache[gvk], nil
|
||||||
|
}
|
||||||
|
|
||||||
// objectSet returns a new ObjectSet containing all resources from a given yaml chunk
|
// objectSet returns a new ObjectSet containing all resources from a given yaml chunk
|
||||||
func objectSet(content []byte) (*objectset.ObjectSet, error) {
|
func objectSet(content []byte) (*objectset.ObjectSet, error) {
|
||||||
objs, err := yamlToObjects(bytes.NewBuffer(content))
|
objs, err := yamlToObjects(bytes.NewBuffer(content))
|
||||||
|
|
Loading…
Reference in New Issue