[engine-1.21] Add etcd extra args support for K3s (#4470)

* Add etcd extra args support for K3s

Signed-off-by: Chris Kim <oats87g@gmail.com>

* Add etcd custom argument integration test

Signed-off-by: Chris Kim <oats87g@gmail.com>

* Redux: Enable K3s integration test to run on existing cluster (#3905)

* Made it possible to run int tests on existing cluster

Signed-off-by: dereknola <derek.nola@suse.com>

Signed-off-by: Chris Kim <oats87g@gmail.com>

Co-authored-by: Derek Nola <derek.nola@suse.com>
pull/4477/head^2
Chris Kim 2021-11-11 19:53:20 -08:00 committed by GitHub
parent 8f82ae0749
commit 381d086cf0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 217 additions and 33 deletions

View File

@ -48,6 +48,7 @@ type Server struct {
TLSSan cli.StringSlice TLSSan cli.StringSlice
BindAddress string BindAddress string
ExtraAPIArgs cli.StringSlice ExtraAPIArgs cli.StringSlice
ExtraEtcdArgs cli.StringSlice
ExtraSchedulerArgs cli.StringSlice ExtraSchedulerArgs cli.StringSlice
ExtraControllerArgs cli.StringSlice ExtraControllerArgs cli.StringSlice
ExtraCloudControllerArgs cli.StringSlice ExtraCloudControllerArgs cli.StringSlice
@ -128,6 +129,11 @@ var (
Usage: "(flags) Customized flag for kube-apiserver process", Usage: "(flags) Customized flag for kube-apiserver process",
Value: &ServerConfig.ExtraAPIArgs, Value: &ServerConfig.ExtraAPIArgs,
} }
ExtraEtcdArgs = cli.StringSliceFlag{
Name: "etcd-arg",
Usage: "(flags) Customized flag for etcd process",
Value: &ServerConfig.ExtraEtcdArgs,
}
ExtraSchedulerArgs = cli.StringSliceFlag{ ExtraSchedulerArgs = cli.StringSliceFlag{
Name: "kube-scheduler-arg", Name: "kube-scheduler-arg",
Usage: "(flags) Customized flag for kube-scheduler process", Usage: "(flags) Customized flag for kube-scheduler process",
@ -221,6 +227,7 @@ func NewServerCommand(action func(*cli.Context) error) cli.Command {
EnvVar: version.ProgramUpper + "_KUBECONFIG_MODE", EnvVar: version.ProgramUpper + "_KUBECONFIG_MODE",
}, },
ExtraAPIArgs, ExtraAPIArgs,
ExtraEtcdArgs,
ExtraControllerArgs, ExtraControllerArgs,
ExtraSchedulerArgs, ExtraSchedulerArgs,
cli.StringSliceFlag{ cli.StringSliceFlag{

View File

@ -117,6 +117,7 @@ func run(app *cli.Context, cfg *cmds.Server, leaderControllers server.CustomCont
serverConfig.ControlConfig.APIServerBindAddress = cfg.APIServerBindAddress serverConfig.ControlConfig.APIServerBindAddress = cfg.APIServerBindAddress
serverConfig.ControlConfig.ExtraAPIArgs = cfg.ExtraAPIArgs serverConfig.ControlConfig.ExtraAPIArgs = cfg.ExtraAPIArgs
serverConfig.ControlConfig.ExtraControllerArgs = cfg.ExtraControllerArgs serverConfig.ControlConfig.ExtraControllerArgs = cfg.ExtraControllerArgs
serverConfig.ControlConfig.ExtraEtcdArgs = cfg.ExtraEtcdArgs
serverConfig.ControlConfig.ExtraSchedulerAPIArgs = cfg.ExtraSchedulerArgs serverConfig.ControlConfig.ExtraSchedulerAPIArgs = cfg.ExtraSchedulerArgs
serverConfig.ControlConfig.ClusterDomain = cfg.ClusterDomain serverConfig.ControlConfig.ClusterDomain = cfg.ClusterDomain
serverConfig.ControlConfig.Datastore.Endpoint = cfg.DatastoreEndpoint serverConfig.ControlConfig.Datastore.Endpoint = cfg.DatastoreEndpoint

View File

@ -74,7 +74,7 @@ func (c *Cluster) Bootstrap(ctx context.Context, snapshot bool) error {
ElectionTimeout: 5000, ElectionTimeout: 5000,
LogOutputs: []string{"stderr"}, LogOutputs: []string{"stderr"},
} }
configFile, err := args.ToConfigFile() configFile, err := args.ToConfigFile(c.config.ExtraEtcdArgs)
if err != nil { if err != nil {
return err return err
} }

View File

@ -131,6 +131,7 @@ type Control struct {
ExtraAPIArgs []string ExtraAPIArgs []string
ExtraControllerArgs []string ExtraControllerArgs []string
ExtraCloudControllerArgs []string ExtraCloudControllerArgs []string
ExtraEtcdArgs []string
ExtraSchedulerAPIArgs []string ExtraSchedulerAPIArgs []string
NoLeaderElect bool NoLeaderElect bool
JoinURL string JoinURL string

View File

@ -19,8 +19,8 @@ func (e Embedded) CurrentETCDOptions() (InitialOptions, error) {
return InitialOptions{}, nil return InitialOptions{}, nil
} }
func (e Embedded) ETCD(ctx context.Context, args ETCDConfig) error { func (e Embedded) ETCD(ctx context.Context, args ETCDConfig, extraArgs []string) error {
configFile, err := args.ToConfigFile() configFile, err := args.ToConfigFile(extraArgs)
if err != nil { if err != nil {
return err return err
} }

View File

@ -6,12 +6,15 @@ import (
"net/http" "net/http"
"os" "os"
"path/filepath" "path/filepath"
"strconv"
"sigs.k8s.io/yaml" "strings"
"time"
"github.com/rancher/k3s/pkg/cli/cmds" "github.com/rancher/k3s/pkg/cli/cmds"
daemonconfig "github.com/rancher/k3s/pkg/daemons/config" daemonconfig "github.com/rancher/k3s/pkg/daemons/config"
yaml2 "gopkg.in/yaml.v2"
"k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authentication/authenticator"
"sigs.k8s.io/yaml"
) )
var ( var (
@ -27,7 +30,7 @@ type Executor interface {
Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error Scheduler(ctx context.Context, apiReady <-chan struct{}, args []string) error
ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error ControllerManager(ctx context.Context, apiReady <-chan struct{}, args []string) error
CurrentETCDOptions() (InitialOptions, error) CurrentETCDOptions() (InitialOptions, error)
ETCD(ctx context.Context, args ETCDConfig) error ETCD(ctx context.Context, args ETCDConfig, extraArgs []string) error
CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error
} }
@ -69,13 +72,53 @@ type InitialOptions struct {
State string `json:"initial-cluster-state,omitempty"` State string `json:"initial-cluster-state,omitempty"`
} }
func (e ETCDConfig) ToConfigFile() (string, error) { func (e ETCDConfig) ToConfigFile(extraArgs []string) (string, error) {
confFile := filepath.Join(e.DataDir, "config") confFile := filepath.Join(e.DataDir, "config")
bytes, err := yaml.Marshal(&e) bytes, err := yaml.Marshal(&e)
if err != nil { if err != nil {
return "", err return "", err
} }
if len(extraArgs) > 0 {
var s map[string]interface{}
if err := yaml2.Unmarshal(bytes, &s); err != nil {
return "", err
}
for _, v := range extraArgs {
extraArg := strings.SplitN(v, "=", 2)
// Depending on the argV, we have different types to handle.
// Source: https://github.com/etcd-io/etcd/blob/44b8ae145b505811775f5af915dd19198d556d55/server/config/config.go#L36-L190 and https://etcd.io/docs/v3.5/op-guide/configuration/#configuration-file
if len(extraArg) == 2 {
key := strings.TrimLeft(extraArg[0], "-")
lowerKey := strings.ToLower(key)
var stringArr []string
if i, err := strconv.Atoi(extraArg[1]); err == nil {
s[key] = i
} else if time, err := time.ParseDuration(extraArg[1]); err == nil && (strings.Contains(lowerKey, "time") || strings.Contains(lowerKey, "duration") || strings.Contains(lowerKey, "interval") || strings.Contains(lowerKey, "retention")) {
// auto-compaction-retention is either a time.Duration or int, depending on version. If it is an int, it will be caught above.
s[key] = time
} else if err := yaml.Unmarshal([]byte(extraArg[1]), &stringArr); err == nil {
s[key] = stringArr
} else {
switch strings.ToLower(extraArg[1]) {
case "true":
s[key] = true
case "false":
s[key] = false
default:
s[key] = extraArg[1]
}
}
}
}
bytes, err = yaml2.Marshal(&s)
if err != nil {
return "", err
}
}
if err := os.MkdirAll(e.DataDir, 0700); err != nil { if err := os.MkdirAll(e.DataDir, 0700); err != nil {
return "", err return "", err
} }
@ -118,8 +161,8 @@ func CurrentETCDOptions() (InitialOptions, error) {
return executor.CurrentETCDOptions() return executor.CurrentETCDOptions()
} }
func ETCD(ctx context.Context, args ETCDConfig) error { func ETCD(ctx context.Context, args ETCDConfig, extraArgs []string) error {
return executor.ETCD(ctx, args) return executor.ETCD(ctx, args, extraArgs)
} }
func CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error { func CloudControllerManager(ctx context.Context, ccmRBACReady <-chan struct{}, args []string) error {

View File

@ -605,7 +605,7 @@ func (e *ETCD) cluster(ctx context.Context, forceNew bool, options executor.Init
HeartbeatInterval: 500, HeartbeatInterval: 500,
Logger: "zap", Logger: "zap",
LogOutputs: []string{"stderr"}, LogOutputs: []string{"stderr"},
}) }, e.config.ExtraEtcdArgs)
} }
// RemovePeer removes a peer from the cluster. The peer name and IP address must both match. // RemovePeer removes a peer from the cluster. The peer name and IP address must both match.

View File

@ -12,13 +12,21 @@ import (
) )
var server *testutil.K3sServer var server *testutil.K3sServer
var serverArgs = []string{"--cluster-init"}
var _ = BeforeSuite(func() { var _ = BeforeSuite(func() {
if !testutil.IsExistingServer() {
var err error var err error
server, err = testutil.K3sStartServer("--cluster-init") server, err = testutil.K3sStartServer(serverArgs...)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}
}) })
var _ = Describe("etcd snapshots", func() { var _ = Describe("etcd snapshots", func() {
BeforeEach(func() {
if testutil.IsExistingServer() && !testutil.ServerArgsPresent(serverArgs) {
Skip("Test needs k3s server with: " + strings.Join(serverArgs, " "))
}
})
When("a new etcd is created", func() { When("a new etcd is created", func() {
It("starts up with no problems", func() { It("starts up with no problems", func() {
Eventually(func() (string, error) { Eventually(func() (string, error) {
@ -44,9 +52,6 @@ var _ = Describe("etcd snapshots", func() {
}) })
}) })
When("saving a custom name", func() { When("saving a custom name", func() {
It("starts with no snapshots", func() {
Expect(testutil.K3sCmd("etcd-snapshot", "ls")).To(BeEmpty())
})
It("saves an etcd snapshot with a custom name", func() { It("saves an etcd snapshot with a custom name", func() {
Expect(testutil.K3sCmd("etcd-snapshot", "save", "--name", "ALIVEBEEF")). Expect(testutil.K3sCmd("etcd-snapshot", "save", "--name", "ALIVEBEEF")).
To(ContainSubstring("Saving etcd snapshot to /var/lib/rancher/k3s/server/db/snapshots/ALIVEBEEF")) To(ContainSubstring("Saving etcd snapshot to /var/lib/rancher/k3s/server/db/snapshots/ALIVEBEEF"))
@ -62,9 +67,6 @@ var _ = Describe("etcd snapshots", func() {
}) })
}) })
When("using etcd snapshot prune", func() { When("using etcd snapshot prune", func() {
It("starts with no snapshots", func() {
Expect(testutil.K3sCmd("etcd-snapshot", "ls")).To(BeEmpty())
})
It("saves 3 different snapshots", func() { It("saves 3 different snapshots", func() {
Expect(testutil.K3sCmd("etcd-snapshot", "save", "-name", "PRUNE_TEST")). Expect(testutil.K3sCmd("etcd-snapshot", "save", "-name", "PRUNE_TEST")).
To(ContainSubstring("Saving current etcd snapshot set to k3s-etcd-snapshots")) To(ContainSubstring("Saving current etcd snapshot set to k3s-etcd-snapshots"))
@ -79,10 +81,9 @@ var _ = Describe("etcd snapshots", func() {
It("lists all 3 snapshots", func() { It("lists all 3 snapshots", func() {
lsResult, err := testutil.K3sCmd("etcd-snapshot", "ls") lsResult, err := testutil.K3sCmd("etcd-snapshot", "ls")
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
sepLines := strings.FieldsFunc(lsResult, func(c rune) bool { reg, err := regexp.Compile(`:///var/lib/rancher/k3s/server/db/snapshots/PRUNE_TEST`)
return c == '\n' Expect(err).ToNot(HaveOccurred())
}) sepLines := reg.FindAllString(lsResult, -1)
Expect(lsResult).To(MatchRegexp(`:///var/lib/rancher/k3s/server/db/snapshots/PRUNE_TEST`))
Expect(sepLines).To(HaveLen(3)) Expect(sepLines).To(HaveLen(3))
}) })
It("prunes snapshots down to 2", func() { It("prunes snapshots down to 2", func() {
@ -90,10 +91,9 @@ var _ = Describe("etcd snapshots", func() {
To(BeEmpty()) To(BeEmpty())
lsResult, err := testutil.K3sCmd("etcd-snapshot", "ls") lsResult, err := testutil.K3sCmd("etcd-snapshot", "ls")
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
sepLines := strings.FieldsFunc(lsResult, func(c rune) bool { reg, err := regexp.Compile(`:///var/lib/rancher/k3s/server/db/snapshots/PRUNE_TEST`)
return c == '\n' Expect(err).ToNot(HaveOccurred())
}) sepLines := reg.FindAllString(lsResult, -1)
Expect(lsResult).To(MatchRegexp(`:///var/lib/rancher/k3s/server/db/snapshots/PRUNE_TEST`))
Expect(sepLines).To(HaveLen(2)) Expect(sepLines).To(HaveLen(2))
}) })
It("cleans up remaining snapshots", func() { It("cleans up remaining snapshots", func() {
@ -110,7 +110,9 @@ var _ = Describe("etcd snapshots", func() {
}) })
var _ = AfterSuite(func() { var _ = AfterSuite(func() {
if !testutil.IsExistingServer() {
Expect(testutil.K3sKillServer(server)).To(Succeed()) Expect(testutil.K3sKillServer(server)).To(Succeed())
}
}) })
func Test_IntegrationEtcd(t *testing.T) { func Test_IntegrationEtcd(t *testing.T) {

View File

@ -71,10 +71,16 @@ See the [local storage test](https://github.com/k3s-io/k3s/blob/master/tests/int
### Running ### Running
Integration tests can be run with no k3s cluster present, each test will spin up and kill the appropriate k3s server it needs.
```bash ```bash
go test ./pkg/... ./tests/... -run Integration go test ./pkg/... ./tests/... -run Integration
``` ```
Integration tests can also be run on an existing single-node cluster via compile time flag, tests will skip if the server is not configured correctly.
```
go test -ldflags "-X 'github.com/rancher/k3s/tests/util.existingServer=True'" ./pkg/... ./tests/... -run Integration
```
___ ___
## End-to-End (E2E) Tests ## End-to-End (E2E) Tests

View File

@ -0,0 +1,63 @@
package integration
import (
"os/exec"
"strings"
"testing"
. "github.com/onsi/ginkgo"
"github.com/onsi/ginkgo/reporters"
. "github.com/onsi/gomega"
testutil "github.com/rancher/k3s/tests/util"
)
var customEtcdArgsServer *testutil.K3sServer
var customEtcdArgsServerArgs = []string{
"--cluster-init",
"--etcd-arg quota-backend-bytes=858993459",
}
var _ = BeforeSuite(func() {
if !testutil.IsExistingServer() {
var err error
customEtcdArgsServer, err = testutil.K3sStartServer(customEtcdArgsServerArgs...)
Expect(err).ToNot(HaveOccurred())
}
})
var _ = Describe("custom etcd args", func() {
BeforeEach(func() {
if testutil.IsExistingServer() && !testutil.ServerArgsPresent(customEtcdArgsServerArgs) {
Skip("Test needs k3s server with: " + strings.Join(customEtcdArgsServerArgs, " "))
}
})
When("a custom quota backend bytes is specified", func() {
It("renders a config file with the correct entry", func() {
Eventually(func() (string, error) {
var cmd *exec.Cmd
grepCmd := "grep"
grepCmdArgs := []string{"quota-backend-bytes", "/var/lib/rancher/k3s/server/db/etcd/config"}
if testutil.IsRoot() {
cmd = exec.Command(grepCmd, grepCmdArgs...)
} else {
fullGrepCmd := append([]string{grepCmd}, grepCmdArgs...)
cmd = exec.Command("sudo", fullGrepCmd...)
}
byteOut, err := cmd.CombinedOutput()
return string(byteOut), err
}, "45s", "5s").Should(MatchRegexp(".*quota-backend-bytes: 858993459.*"))
})
})
})
var _ = AfterSuite(func() {
if !testutil.IsExistingServer() {
Expect(testutil.K3sKillServer(customEtcdArgsServer)).To(Succeed())
}
})
func Test_IntegrationCustomEtcdArgs(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecsWithDefaultAndCustomReporters(t, "Custom etcd Arguments", []Reporter{
reporters.NewJUnitReporter("/tmp/results/junit-ls.xml"),
})
}

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"os" "os"
"regexp" "regexp"
"strings"
"testing" "testing"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
@ -12,13 +13,21 @@ import (
) )
var server *testutil.K3sServer var server *testutil.K3sServer
var serverArgs = []string{"--cluster-init"}
var _ = BeforeSuite(func() { var _ = BeforeSuite(func() {
if !testutil.IsExistingServer() {
var err error var err error
server, err = testutil.K3sStartServer("--cluster-init") server, err = testutil.K3sStartServer(serverArgs...)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
}
}) })
var _ = Describe("local storage", func() { var _ = Describe("local storage", func() {
BeforeEach(func() {
if testutil.IsExistingServer() && !testutil.ServerArgsPresent(serverArgs) {
Skip("Test needs k3s server with: " + strings.Join(serverArgs, " "))
}
})
When("a new local storage is created", func() { When("a new local storage is created", func() {
It("starts up with no problems", func() { It("starts up with no problems", func() {
Eventually(func() (string, error) { Eventually(func() (string, error) {
@ -66,11 +75,12 @@ var _ = Describe("local storage", func() {
}) })
var _ = AfterSuite(func() { var _ = AfterSuite(func() {
if !testutil.IsExistingServer() {
Expect(testutil.K3sKillServer(server)).To(Succeed()) Expect(testutil.K3sKillServer(server)).To(Succeed())
}
}) })
func Test_IntegrationLocalStorage(t *testing.T) { func Test_IntegrationLocalStorage(t *testing.T) {
RegisterFailHandler(Fail) RegisterFailHandler(Fail)
RunSpecs(t, "Local Storage Suite") RunSpecs(t, "Local Storage Suite")
} }

View File

@ -2,6 +2,7 @@ package util
import ( import (
"bufio" "bufio"
"encoding/json"
"os" "os"
"os/exec" "os/exec"
"os/user" "os/user"
@ -11,7 +12,18 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// Compile-time variable
var existingServer = "False"
func findK3sExecutable() string { func findK3sExecutable() string {
// if running on an existing cluster, it maybe installed via k3s.service
// or run manually from dist/artifacts/k3s
if IsExistingServer() {
k3sBin, err := exec.LookPath("k3s")
if err == nil {
return k3sBin
}
}
k3sBin := "dist/artifacts/k3s" k3sBin := "dist/artifacts/k3s"
for { for {
_, err := os.Stat(k3sBin) _, err := os.Stat(k3sBin)
@ -33,6 +45,10 @@ func IsRoot() bool {
return currentUser.Uid == "0" return currentUser.Uid == "0"
} }
func IsExistingServer() bool {
return existingServer == "True"
}
// K3sCmd launches the provided K3s command via exec. Command blocks until finished. // K3sCmd launches the provided K3s command via exec. Command blocks until finished.
// Command output from both Stderr and Stdout is provided via string. // Command output from both Stderr and Stdout is provided via string.
// cmdEx1, err := K3sCmd("etcd-snapshot", "ls") // cmdEx1, err := K3sCmd("etcd-snapshot", "ls")
@ -52,6 +68,41 @@ func K3sCmd(cmdName string, cmdArgs ...string) (string, error) {
return string(byteOut), err return string(byteOut), err
} }
func contains(source []string, target string) bool {
for _, s := range source {
if s == target {
return true
}
}
return false
}
// ServerArgsPresent checks if the given arguments are found in the running k3s server
func ServerArgsPresent(neededArgs []string) bool {
currentArgs := K3sServerArgs()
for _, arg := range neededArgs {
if !contains(currentArgs, arg) {
return false
}
}
return true
}
// K3sServerArgs returns the list of arguments that the k3s server launched with
func K3sServerArgs() []string {
results, err := K3sCmd("kubectl", "get", "nodes", "-o", `jsonpath='{.items[0].metadata.annotations.k3s\.io/node-args}'`)
if err != nil {
return nil
}
res := strings.ReplaceAll(results, "'", "")
var args []string
if err := json.Unmarshal([]byte(res), &args); err != nil {
logrus.Error(err)
return nil
}
return args
}
func FindStringInCmdAsync(scanner *bufio.Scanner, target string) bool { func FindStringInCmdAsync(scanner *bufio.Scanner, target string) bool {
for scanner.Scan() { for scanner.Scan() {
if strings.Contains(scanner.Text(), target) { if strings.Contains(scanner.Text(), target) {