mirror of https://github.com/k3s-io/k3s
150 lines
3.2 KiB
Go
150 lines
3.2 KiB
Go
package client
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strconv"
|
|
|
|
"github.com/canonical/go-dqlite/client"
|
|
controllerv1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1"
|
|
"github.com/sirupsen/logrus"
|
|
v1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
)
|
|
|
|
const (
|
|
allKey = "_all_"
|
|
nodeID = "cluster.k3s.cattle.io/node-id"
|
|
nodeAddress = "cluster.k3s.cattle.io/node-address"
|
|
master = "node-role.kubernetes.io/master"
|
|
)
|
|
|
|
func Register(ctx context.Context, nodeName string, nodeInfo client.NodeInfo,
|
|
nodeStore client.NodeStore, nodes controllerv1.NodeController, opts []client.Option) {
|
|
h := &handler{
|
|
nodeStore: nodeStore,
|
|
nodeController: nodes,
|
|
nodeName: nodeName,
|
|
id: strconv.FormatUint(nodeInfo.ID, 10),
|
|
address: nodeInfo.Address,
|
|
ctx: ctx,
|
|
opts: opts,
|
|
}
|
|
nodes.OnChange(ctx, "dqlite-client", h.sync)
|
|
nodes.OnRemove(ctx, "dqlite-client", h.onRemove)
|
|
}
|
|
|
|
type handler struct {
|
|
nodeStore client.NodeStore
|
|
nodeController controllerv1.NodeController
|
|
nodeName string
|
|
id string
|
|
address string
|
|
ctx context.Context
|
|
opts []client.Option
|
|
}
|
|
|
|
func (h *handler) sync(key string, node *v1.Node) (*v1.Node, error) {
|
|
if key == allKey {
|
|
return nil, h.updateNodeStore()
|
|
}
|
|
|
|
if node == nil {
|
|
return nil, nil
|
|
}
|
|
|
|
if key == h.nodeName {
|
|
return h.handleSelf(node)
|
|
}
|
|
|
|
if node.Labels[master] == "true" {
|
|
h.nodeController.Enqueue(allKey)
|
|
}
|
|
|
|
return node, nil
|
|
}
|
|
|
|
func (h *handler) handleSelf(node *v1.Node) (*v1.Node, error) {
|
|
if node.Annotations[nodeID] == h.id && node.Annotations[nodeAddress] == h.address {
|
|
return node, nil
|
|
}
|
|
|
|
node = node.DeepCopy()
|
|
if node.Annotations == nil {
|
|
node.Annotations = map[string]string{}
|
|
}
|
|
node.Annotations[nodeID] = h.id
|
|
node.Annotations[nodeAddress] = h.address
|
|
|
|
return h.nodeController.Update(node)
|
|
}
|
|
|
|
func (h *handler) onRemove(key string, node *v1.Node) (*v1.Node, error) {
|
|
address := node.Annotations[nodeAddress]
|
|
if address == "" {
|
|
return node, nil
|
|
}
|
|
return node, h.delete(address)
|
|
}
|
|
|
|
func (h *handler) delete(address string) error {
|
|
c, err := client.FindLeader(h.ctx, h.nodeStore, h.opts...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer c.Close()
|
|
|
|
members, err := c.Cluster(h.ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, member := range members {
|
|
if member.Address == address {
|
|
logrus.Infof("Removing %s %d from dqlite", member.Address, member.ID)
|
|
return c.Remove(h.ctx, member.ID)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *handler) updateNodeStore() error {
|
|
nodes, err := h.nodeController.Cache().List(labels.SelectorFromSet(labels.Set{
|
|
master: "true",
|
|
}))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var nodeInfos []client.NodeInfo
|
|
for _, node := range nodes {
|
|
address, ok := node.Annotations[nodeAddress]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
nodeIDStr, ok := node.Annotations[nodeID]
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
id, err := strconv.ParseUint(nodeIDStr, 10, 64)
|
|
if err != nil {
|
|
logrus.Errorf("invalid %s=%s, must be a number: %v", nodeID, nodeIDStr, err)
|
|
continue
|
|
}
|
|
|
|
nodeInfos = append(nodeInfos, client.NodeInfo{
|
|
ID: id,
|
|
Address: address,
|
|
})
|
|
}
|
|
|
|
if len(nodeInfos) == 0 {
|
|
return fmt.Errorf("not setting dqlient NodeStore len to 0")
|
|
}
|
|
|
|
return h.nodeStore.Set(h.ctx, nodeInfos)
|
|
}
|