mirror of https://github.com/k3s-io/k3s
Emit events for AddOn lifecycle
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>pull/3457/head
parent
ea2cd6d727
commit
a7d1159ba6
|
@ -21,13 +21,18 @@ import (
|
|||
"github.com/rancher/wrangler/pkg/apply"
|
||||
"github.com/rancher/wrangler/pkg/merr"
|
||||
"github.com/rancher/wrangler/pkg/objectset"
|
||||
"github.com/rancher/wrangler/pkg/schemes"
|
||||
"github.com/sirupsen/logrus"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
yamlDecoder "k8s.io/apimachinery/pkg/util/yaml"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/tools/record"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -36,7 +41,7 @@ const (
|
|||
)
|
||||
|
||||
// WatchFiles sets up an OnChange callback to start a periodic goroutine to watch files for changes once the controller has started up.
|
||||
func WatchFiles(ctx context.Context, apply apply.Apply, addons controllersv1.AddonController, disables map[string]bool, bases ...string) error {
|
||||
func WatchFiles(ctx context.Context, client kubernetes.Interface, apply apply.Apply, addons controllersv1.AddonController, disables map[string]bool, bases ...string) error {
|
||||
w := &watcher{
|
||||
apply: apply,
|
||||
addonCache: addons.Cache(),
|
||||
|
@ -49,7 +54,7 @@ func WatchFiles(ctx context.Context, apply apply.Apply, addons controllersv1.Add
|
|||
addons.Enqueue(metav1.NamespaceNone, startKey)
|
||||
addons.OnChange(ctx, "addon-start", func(key string, _ *apisv1.Addon) (*apisv1.Addon, error) {
|
||||
if key == startKey {
|
||||
go w.start(ctx)
|
||||
go w.start(ctx, client)
|
||||
}
|
||||
return nil, nil
|
||||
})
|
||||
|
@ -64,10 +69,17 @@ type watcher struct {
|
|||
bases []string
|
||||
disables map[string]bool
|
||||
modTime map[string]time.Time
|
||||
recorder record.EventRecorder
|
||||
}
|
||||
|
||||
// start calls listFiles at regular intervals to trigger application of manifests that have changed on disk.
|
||||
func (w *watcher) start(ctx context.Context) {
|
||||
func (w *watcher) start(ctx context.Context, client kubernetes.Interface) {
|
||||
nodeName := os.Getenv("NODE_NAME")
|
||||
broadcaster := record.NewBroadcaster()
|
||||
broadcaster.StartLogging(logrus.Infof)
|
||||
broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: client.CoreV1().Events(metav1.NamespaceSystem)})
|
||||
w.recorder = broadcaster.NewRecorder(schemes.All, corev1.EventSource{Component: ControllerName, Host: nodeName})
|
||||
|
||||
force := true
|
||||
for {
|
||||
if err := w.listFiles(force); err == nil {
|
||||
|
@ -153,14 +165,28 @@ func (w *watcher) listFilesIn(base string, force bool) error {
|
|||
// deploy loads yaml from a manifest on disk, creates an AddOn resource to track its application, and then applies
|
||||
// all resources contained within to the cluster.
|
||||
func (w *watcher) deploy(path string, compareChecksum bool) error {
|
||||
content, err := ioutil.ReadFile(path)
|
||||
name := basename(path)
|
||||
addon, err := w.getOrCreateAddon(name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
name := basename(path)
|
||||
addon, err := w.getOrCreateAddon(name)
|
||||
addon.Spec.Source = path
|
||||
addon.Status.GVKs = nil
|
||||
|
||||
// Create the new Addon now so that we can use it to report Events when parsing/applying the manifest
|
||||
// Events need the UID and ObjectRevision set to function properly
|
||||
if addon.UID == "" {
|
||||
newAddon, err := w.addons.Create(&addon)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
addon = *newAddon
|
||||
}
|
||||
|
||||
content, err := ioutil.ReadFile(path)
|
||||
if err != nil {
|
||||
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ReadManifestFailed", "Read manifest at %q failed: %v", path, err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -170,26 +196,24 @@ func (w *watcher) deploy(path string, compareChecksum bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Attempt to parse the YAML/JSON into objects. Failure at this point would be due to bad file content - not YAML/JSON,
|
||||
// YAML/JSON that can't be converted to Kubernetes objects, etc.
|
||||
objectSet, err := objectSet(content)
|
||||
if err != nil {
|
||||
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ParseManifestFailed", "Parse manifest at %q failed: %v", path, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Attempt to apply the changes. Failure at this point would be due to more complicated issues - invalid changes to
|
||||
// existing objects, rejected by validating webhooks, etc.
|
||||
w.recorder.Eventf(&addon, corev1.EventTypeNormal, "ApplyingManifest", "Applying manifest at %q", path)
|
||||
if err := w.apply.WithOwner(&addon).Apply(objectSet); err != nil {
|
||||
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ApplyManifestFailed", "Applying manifest at %q failed: %v", path, err)
|
||||
return err
|
||||
}
|
||||
|
||||
addon.Spec.Source = path
|
||||
addon.Spec.Checksum = checksum
|
||||
addon.Status.GVKs = nil
|
||||
|
||||
// Create the new Addon now so that we can use it to report Events when parsing/applying the manifest
|
||||
// Events need the UID and ObjectRevision set to function properly
|
||||
if addon.UID == "" {
|
||||
_, err := w.addons.Create(&addon)
|
||||
return err
|
||||
}
|
||||
|
||||
// Emit event, Update Addon checksum and modtime only if apply was successful
|
||||
w.recorder.Eventf(&addon, corev1.EventTypeNormal, "AppliedManifest", "Applied manifest at %q", path)
|
||||
_, err = w.addons.Update(&addon)
|
||||
return err
|
||||
}
|
||||
|
@ -203,24 +227,29 @@ func (w *watcher) delete(path string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// ensure that the addon is completely removed before deleting the objectSet,
|
||||
// so return when err == nil, otherwise pods may get stuck terminating
|
||||
if err := w.addons.Delete(addon.Namespace, addon.Name, &metav1.DeleteOptions{}); err == nil || !errors.IsNotFound(err) {
|
||||
content, err := ioutil.ReadFile(path)
|
||||
if err != nil {
|
||||
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ReadManifestFailed", "Read manifest at %q failed: %v", path, err)
|
||||
return err
|
||||
}
|
||||
|
||||
content, err := ioutil.ReadFile(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
objectSet, err := objectSet(content)
|
||||
if err != nil {
|
||||
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ParseManifestFailed", "Parse manifest at %q failed: %v", path, err)
|
||||
return err
|
||||
}
|
||||
var gvk []schema.GroupVersionKind
|
||||
for k := range objectSet.ObjectsByGVK() {
|
||||
gvk = append(gvk, k)
|
||||
}
|
||||
|
||||
// ensure that the addon is completely removed before deleting the objectSet,
|
||||
// so return when err == nil, otherwise pods may get stuck terminating
|
||||
w.recorder.Eventf(&addon, corev1.EventTypeNormal, "DeletingManifest", "Deleting manifest at %q", path)
|
||||
if err := w.addons.Delete(addon.Namespace, addon.Name, &metav1.DeleteOptions{}); err == nil || !errors.IsNotFound(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
// apply an empty set with owner & gvk data to delete
|
||||
if err := w.apply.WithOwner(&addon).WithGVK(gvk...).Apply(nil); err != nil {
|
||||
return err
|
||||
|
|
|
@ -248,6 +248,7 @@ func stageFiles(ctx context.Context, sc *Context, controlConfig *config.Control)
|
|||
}
|
||||
|
||||
return deploy.WatchFiles(ctx,
|
||||
sc.K8s,
|
||||
sc.Apply,
|
||||
sc.K3s.K3s().V1().Addon(),
|
||||
controlConfig.Disables,
|
||||
|
|
Loading…
Reference in New Issue