mirror of https://github.com/k3s-io/k3s
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
124 lines
3.2 KiB
124 lines
3.2 KiB
package dqlite |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"io/ioutil" |
|
"os" |
|
"path/filepath" |
|
"strings" |
|
"time" |
|
|
|
"github.com/canonical/go-dqlite/client" |
|
"github.com/pkg/errors" |
|
"github.com/sirupsen/logrus" |
|
) |
|
|
|
func (d *DQLite) Test(ctx context.Context) error { |
|
var ips []string |
|
peers, err := d.NodeStore.Get(ctx) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
for _, peer := range peers { |
|
ips = append(ips, peer.Address) |
|
} |
|
|
|
logrus.Infof("Testing connection to peers %v", ips) |
|
if err := d.Join(ctx, nil); err != nil { |
|
return err |
|
} |
|
logrus.Infof("Connection OK to peers %v", ips) |
|
return nil |
|
} |
|
|
|
// nodeIDsEqual reports whether two dqlite node IDs match, comparing only
// their low 32 bits and ignoring the high 32 bits.
//
// This tolerates IDs produced by a bug in v1.0.0. Later versions do not
// generate node IDs above 1<<20, so for them the comparison is effectively
// exact.
func nodeIDsEqual(testID, currentID uint64) bool {
	const low32Mask = 1<<32 - 1
	return testID&low32Mask == currentID&low32Mask
}
|
|
|
func (d *DQLite) Join(ctx context.Context, nodes []client.NodeInfo) error { |
|
if len(nodes) > 0 { |
|
if err := d.NodeStore.Set(ctx, nodes); err != nil { |
|
return err |
|
} |
|
} |
|
|
|
client, err := client.FindLeader(ctx, d.NodeStore, d.clientOpts...) |
|
if err != nil { |
|
return err |
|
} |
|
defer client.Close() |
|
|
|
current, err := client.Cluster(ctx) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
nodeID, err := getClusterID(false, d.DataDir) |
|
if err != nil { |
|
return errors.Wrap(err, "get cluster ID") |
|
} |
|
for _, testNode := range current { |
|
if testNode.Address == d.NodeInfo.Address { |
|
if !nodeIDsEqual(testNode.ID, nodeID) { |
|
if err := d.node.Close(); err != nil { |
|
return errors.Wrap(err, "node close for id reset") |
|
} |
|
if err := writeClusterID(testNode.ID, d.DataDir); err != nil { |
|
return errors.Wrap(err, "restart node to reset ID") |
|
} |
|
logrus.Fatalf("resetting node ID from %d to %d, please restart", nodeID, testNode.ID) |
|
} |
|
return nil |
|
} else if nodeIDsEqual(testNode.ID, nodeID) { |
|
deleteClusterID(d.DataDir) |
|
logrus.Fatalf("node ID %d is in use, please restart", nodeID) |
|
} |
|
} |
|
|
|
if found, err := cleanDir(d.DataDir, true); err != nil { |
|
return err |
|
} else if found { |
|
if err := d.node.Close(); err != nil { |
|
return errors.Wrap(err, "node close for cleaning") |
|
} |
|
_, _ = cleanDir(d.DataDir, false) |
|
return fmt.Errorf("cleaned DB directory, now restart and join") |
|
} |
|
|
|
logrus.Infof("Joining dqlite cluster as address=%s, id=%d", d.NodeInfo.Address, d.NodeInfo.ID) |
|
return client.Add(ctx, d.NodeInfo) |
|
} |
|
|
|
func cleanDir(dataDir string, check bool) (bool, error) { |
|
dbDir := GetDBDir(dataDir) |
|
backupDir := filepath.Join(dbDir, fmt.Sprintf(".backup-%d", time.Now().Unix())) |
|
files, err := ioutil.ReadDir(dbDir) |
|
if err != nil { |
|
return false, errors.Wrap(err, "cleaning dqlite DB dir") |
|
} |
|
|
|
for _, file := range files { |
|
if file.IsDir() || strings.HasPrefix(file.Name(), ".") || ignoreFile[file.Name()] { |
|
continue |
|
} |
|
if check { |
|
return true, nil |
|
} |
|
if err := os.MkdirAll(backupDir, 0700); err != nil { |
|
return false, errors.Wrapf(err, "creating backup dir %s", backupDir) |
|
} |
|
oldName := filepath.Join(dbDir, file.Name()) |
|
newName := filepath.Join(backupDir, file.Name()) |
|
logrus.Infof("Backing up %s => %s", oldName, newName) |
|
if err := os.Rename(oldName, newName); err != nil { |
|
return false, errors.Wrapf(err, "backup %s", oldName) |
|
} |
|
} |
|
|
|
return false, nil |
|
}
|
|
|