k3s/pkg/dqlite/controller/client/controller.go

182 lines
3.9 KiB
Go

package client
import (
"context"
"fmt"
"strconv"
"github.com/canonical/go-dqlite/client"
"github.com/canonical/go-dqlite/driver"
controllerv1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
)
const (
allKey = "_all_"
nodeID = "cluster.k3s.cattle.io/node-id"
nodeAddress = "cluster.k3s.cattle.io/node-address"
master = "node-role.kubernetes.io/master"
)
func Register(ctx context.Context, nodeName string, nodeInfo client.NodeInfo,
nodeStore client.NodeStore, nodes controllerv1.NodeController, opts []client.Option) {
h := &handler{
nodeStore: nodeStore,
nodeController: nodes,
nodeName: nodeName,
id: strconv.FormatUint(nodeInfo.ID, 10),
address: nodeInfo.Address,
ctx: ctx,
opts: opts,
}
nodes.OnChange(ctx, "dqlite-client", h.sync)
nodes.OnRemove(ctx, "dqlite-client", h.onRemove)
}
type handler struct {
nodeStore client.NodeStore
nodeController controllerv1.NodeController
nodeName string
id string
address string
ctx context.Context
opts []client.Option
}
func (h *handler) sync(key string, node *v1.Node) (*v1.Node, error) {
if key == allKey {
return nil, h.updateNodeStore()
}
if node == nil {
return nil, nil
}
if key == h.nodeName {
return h.handleSelf(node)
}
if node.Labels[master] == "true" {
h.nodeController.Enqueue(allKey)
}
return node, nil
}
func (h *handler) ensureExists(address string) error {
c, err := client.FindLeader(h.ctx, h.nodeStore, h.opts...)
if err == driver.ErrNoAvailableLeader {
logrus.Fatalf("no dqlite leader found: %v", err)
} else if err != nil {
return err
}
defer c.Close()
members, err := c.Cluster(h.ctx)
if err != nil {
return err
}
for _, member := range members {
if member.Address == address {
return nil
}
}
logrus.Fatalf("Address %s is not member of the cluster", address)
return nil
}
func (h *handler) handleSelf(node *v1.Node) (*v1.Node, error) {
if node.Annotations[nodeID] == h.id && node.Annotations[nodeAddress] == h.address {
return node, h.ensureExists(h.address)
}
node = node.DeepCopy()
if node.Annotations == nil {
node.Annotations = map[string]string{}
}
node.Annotations[nodeID] = h.id
node.Annotations[nodeAddress] = h.address
return h.nodeController.Update(node)
}
func (h *handler) onRemove(key string, node *v1.Node) (*v1.Node, error) {
address := node.Annotations[nodeAddress]
if address == "" {
return node, nil
}
return node, h.delete(address)
}
func (h *handler) delete(address string) error {
c, err := client.FindLeader(h.ctx, h.nodeStore, h.opts...)
if err != nil {
return err
}
defer c.Close()
members, err := c.Cluster(h.ctx)
if err != nil {
return err
}
for _, member := range members {
if member.Address == address {
logrus.Infof("Removing %s %d from dqlite", member.Address, member.ID)
return c.Remove(h.ctx, member.ID)
}
}
return nil
}
func (h *handler) updateNodeStore() error {
nodes, err := h.nodeController.Cache().List(labels.SelectorFromSet(labels.Set{
master: "true",
}))
if err != nil {
return err
}
var (
nodeInfos []client.NodeInfo
seen = map[string]bool{}
)
for _, node := range nodes {
address, ok := node.Annotations[nodeAddress]
if !ok {
continue
}
nodeIDStr, ok := node.Annotations[nodeID]
if !ok {
continue
}
id, err := strconv.ParseUint(nodeIDStr, 10, 64)
if err != nil {
logrus.Errorf("invalid %s=%s, must be a number: %v", nodeID, nodeIDStr, err)
continue
}
if !seen[address] {
nodeInfos = append(nodeInfos, client.NodeInfo{
ID: id,
Address: address,
})
seen[address] = true
}
}
if len(nodeInfos) == 0 {
return fmt.Errorf("not setting dqlient NodeStore len to 0")
}
return h.nodeStore.Set(h.ctx, nodeInfos)
}