mirror of https://github.com/k3s-io/k3s
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
293 lines
6.1 KiB
293 lines
6.1 KiB
package deploy |
|
|
|
import ( |
|
"bufio" |
|
"bytes" |
|
"context" |
|
"crypto/sha256" |
|
"encoding/hex" |
|
"io" |
|
"io/ioutil" |
|
"os" |
|
"path/filepath" |
|
"strings" |
|
"time" |
|
|
|
errors2 "github.com/pkg/errors" |
|
|
|
v1 "github.com/rancher/k3s/types/apis/k3s.cattle.io/v1" |
|
"github.com/rancher/norman" |
|
"github.com/rancher/norman/objectclient" |
|
"github.com/rancher/norman/pkg/objectset" |
|
"github.com/rancher/norman/types" |
|
"github.com/sirupsen/logrus" |
|
"k8s.io/apimachinery/pkg/api/errors" |
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" |
|
"k8s.io/apimachinery/pkg/runtime" |
|
"k8s.io/apimachinery/pkg/runtime/schema" |
|
yamlDecoder "k8s.io/apimachinery/pkg/util/yaml" |
|
"k8s.io/client-go/discovery" |
|
"k8s.io/client-go/rest" |
|
) |
|
|
|
const ( |
|
ns = "kube-system" |
|
startKey = "_start_" |
|
) |
|
|
|
func WatchFiles(ctx context.Context, skips []string, bases ...string) error { |
|
server := norman.GetServer(ctx) |
|
addons := v1.ClientsFrom(ctx).Addon |
|
|
|
w := &watcher{ |
|
addonCache: addons.Cache(), |
|
addons: addons, |
|
skips: skips, |
|
bases: bases, |
|
restConfig: *server.Runtime.LocalConfig, |
|
discovery: server.K8sClient.Discovery(), |
|
clients: map[schema.GroupVersionKind]*objectclient.ObjectClient{}, |
|
} |
|
|
|
addons.Enqueue("", startKey) |
|
addons.Interface().AddHandler(ctx, "addon-start", func(key string, _ *v1.Addon) (runtime.Object, error) { |
|
if key == startKey { |
|
if err := w.listFiles(true); err != nil { |
|
return nil, err |
|
} |
|
w.started = true |
|
return nil, nil |
|
} |
|
return nil, nil |
|
}) |
|
|
|
w.start(ctx) |
|
return nil |
|
} |
|
|
|
type watcher struct { |
|
started bool |
|
addonCache v1.AddonClientCache |
|
addons v1.AddonClient |
|
bases []string |
|
skips []string |
|
restConfig rest.Config |
|
discovery discovery.DiscoveryInterface |
|
clients map[schema.GroupVersionKind]*objectclient.ObjectClient |
|
} |
|
|
|
func (w *watcher) start(ctx context.Context) { |
|
for { |
|
select { |
|
case <-ctx.Done(): |
|
return |
|
case <-time.After(15 * time.Second): |
|
if err := w.listFiles(false); err != nil { |
|
logrus.Errorf("failed to process config: %v", err) |
|
} |
|
} |
|
} |
|
} |
|
|
|
func (w *watcher) listFiles(force bool) error { |
|
var errs []error |
|
for _, base := range w.bases { |
|
if err := w.listFilesIn(base, force); err != nil { |
|
errs = append(errs, err) |
|
} |
|
|
|
} |
|
return types.NewErrors(errs...) |
|
} |
|
|
|
func (w *watcher) listFilesIn(base string, force bool) error { |
|
if !w.started { |
|
return nil |
|
} |
|
|
|
files, err := ioutil.ReadDir(base) |
|
if os.IsNotExist(err) { |
|
return nil |
|
} else if err != nil { |
|
return err |
|
} |
|
|
|
skips := map[string]bool{} |
|
for _, skip := range w.skips { |
|
skips[skip] = true |
|
} |
|
for _, file := range files { |
|
if strings.HasSuffix(file.Name(), ".skip") { |
|
skips[strings.TrimSuffix(file.Name(), ".skip")] = true |
|
} |
|
|
|
} |
|
|
|
var errs []error |
|
for _, file := range files { |
|
if strings.HasSuffix(file.Name(), ".skip") || skips[file.Name()] { |
|
continue |
|
} |
|
p := filepath.Join(base, file.Name()) |
|
if err := w.deploy(p, !force); err != nil { |
|
errs = append(errs, errors2.Wrapf(err, "failed to process %s", p)) |
|
} |
|
} |
|
|
|
return types.NewErrors(errs...) |
|
} |
|
|
|
func (w *watcher) deploy(path string, compareChecksum bool) error { |
|
content, err := ioutil.ReadFile(path) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
name := name(path) |
|
addon, err := w.addon(name) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
checksum := checksum(content) |
|
if compareChecksum && checksum == addon.Spec.Checksum { |
|
logrus.Debugf("Skipping existing deployment of %s, check=%v, checksum %s=%s", path, compareChecksum, checksum, addon.Spec.Checksum) |
|
return nil |
|
} |
|
|
|
objectSet, err := objectSet(content) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
clients, err := w.apply(addon, objectSet) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
if w.clients == nil { |
|
w.clients = map[schema.GroupVersionKind]*objectclient.ObjectClient{} |
|
} |
|
|
|
addon.Spec.Source = path |
|
addon.Spec.Checksum = checksum |
|
addon.Status.GVKs = nil |
|
|
|
for gvk, client := range clients { |
|
addon.Status.GVKs = append(addon.Status.GVKs, gvk) |
|
w.clients[gvk] = client |
|
} |
|
|
|
if addon.UID == "" { |
|
_, err := w.addons.Create(&addon) |
|
return err |
|
} |
|
|
|
_, err = w.addons.Update(&addon) |
|
return err |
|
} |
|
|
|
func (w *watcher) addon(name string) (v1.Addon, error) { |
|
addon, err := w.addonCache.Get(ns, name) |
|
if errors.IsNotFound(err) { |
|
addon = v1.NewAddon(ns, name, v1.Addon{}) |
|
} else if err != nil { |
|
return v1.Addon{}, err |
|
} |
|
return *addon, nil |
|
} |
|
|
|
func (w *watcher) apply(addon v1.Addon, set *objectset.ObjectSet) (map[schema.GroupVersionKind]*objectclient.ObjectClient, error) { |
|
var ( |
|
err error |
|
) |
|
|
|
op := objectset.NewProcessor(addon.Name) |
|
op.AllowDiscovery(w.discovery, w.restConfig) |
|
|
|
ds := op.NewDesiredSet(nil, set) |
|
|
|
for _, gvk := range addon.Status.GVKs { |
|
client, ok := w.clients[gvk] |
|
if !ok { |
|
client, err = objectset.NewDiscoveredClient(gvk, w.restConfig, w.discovery) |
|
if err != nil { |
|
return nil, err |
|
} |
|
} |
|
|
|
ds.AddDiscoveredClient(gvk, client) |
|
} |
|
|
|
if err := ds.Apply(); err != nil { |
|
return nil, err |
|
} |
|
|
|
return ds.DiscoveredClients(), nil |
|
} |
|
|
|
func objectSet(content []byte) (*objectset.ObjectSet, error) { |
|
objs, err := yamlToObjects(bytes.NewBuffer(content)) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
os := objectset.NewObjectSet() |
|
os.Add(objs...) |
|
return os, nil |
|
} |
|
|
|
func name(path string) string { |
|
name := filepath.Base(path) |
|
return strings.SplitN(name, ".", 2)[0] |
|
} |
|
|
|
func checksum(bytes []byte) string { |
|
d := sha256.Sum256(bytes) |
|
return hex.EncodeToString(d[:]) |
|
} |
|
|
|
func yamlToObjects(in io.Reader) ([]runtime.Object, error) { |
|
var result []runtime.Object |
|
reader := yamlDecoder.NewYAMLReader(bufio.NewReaderSize(in, 4096)) |
|
for { |
|
raw, err := reader.Read() |
|
if err == io.EOF { |
|
break |
|
} |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
obj, err := toObjects(raw) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
result = append(result, obj...) |
|
} |
|
|
|
return result, nil |
|
} |
|
|
|
func toObjects(bytes []byte) ([]runtime.Object, error) { |
|
bytes, err := yamlDecoder.ToJSON(bytes) |
|
if err != nil { |
|
return nil, err |
|
} |
|
obj, _, err := unstructured.UnstructuredJSONScheme.Decode(bytes, nil, nil) |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
if l, ok := obj.(*unstructured.UnstructuredList); ok { |
|
var result []runtime.Object |
|
for _, obj := range l.Items { |
|
copy := obj |
|
result = append(result, ©) |
|
} |
|
return result, nil |
|
} |
|
|
|
return []runtime.Object{obj}, nil |
|
}
|
|
|