From 45ad69765e8a6c11aa704a64dffd8b97b7b4f24f Mon Sep 17 00:00:00 2001 From: m1093782566 Date: Sun, 5 Nov 2017 19:23:29 +0800 Subject: [PATCH 1/4] wrapper ipset util --- pkg/util/BUILD | 1 + pkg/util/ipset/BUILD | 41 +++ pkg/util/ipset/ipset.go | 325 ++++++++++++++++++++++ pkg/util/ipset/ipset_test.go | 483 +++++++++++++++++++++++++++++++++ pkg/util/ipset/testing/BUILD | 23 ++ pkg/util/ipset/testing/fake.go | 83 ++++++ pkg/util/ipset/types.go | 70 +++++ 7 files changed, 1026 insertions(+) create mode 100644 pkg/util/ipset/BUILD create mode 100644 pkg/util/ipset/ipset.go create mode 100644 pkg/util/ipset/ipset_test.go create mode 100644 pkg/util/ipset/testing/BUILD create mode 100644 pkg/util/ipset/testing/fake.go create mode 100644 pkg/util/ipset/types.go diff --git a/pkg/util/BUILD b/pkg/util/BUILD index a206466463..d724c38842 100644 --- a/pkg/util/BUILD +++ b/pkg/util/BUILD @@ -27,6 +27,7 @@ filegroup( "//pkg/util/interrupt:all-srcs", "//pkg/util/io:all-srcs", "//pkg/util/ipconfig:all-srcs", + "//pkg/util/ipset:all-srcs", "//pkg/util/iptables:all-srcs", "//pkg/util/ipvs:all-srcs", "//pkg/util/keymutex:all-srcs", diff --git a/pkg/util/ipset/BUILD b/pkg/util/ipset/BUILD new file mode 100644 index 0000000000..20f172c010 --- /dev/null +++ b/pkg/util/ipset/BUILD @@ -0,0 +1,41 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "ipset.go", + "types.go", + ], + importpath = "k8s.io/kubernetes/pkg/util/ipset", + visibility = ["//visibility:public"], + deps = ["//vendor/k8s.io/utils/exec:go_default_library"], +) + +go_test( + name = "go_default_test", + srcs = ["ipset_test.go"], + importpath = "k8s.io/kubernetes/pkg/util/ipset", + library = ":go_default_library", + deps = [ + "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//vendor/k8s.io/utils/exec:go_default_library", + "//vendor/k8s.io/utils/exec/testing:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + 
srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [ + ":package-srcs", + "//pkg/util/ipset/testing:all-srcs", + ], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/util/ipset/ipset.go b/pkg/util/ipset/ipset.go new file mode 100644 index 0000000000..ba92bff74b --- /dev/null +++ b/pkg/util/ipset/ipset.go @@ -0,0 +1,325 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipset + +import ( + "bytes" + "fmt" + "regexp" + "strconv" + "strings" + + utilexec "k8s.io/utils/exec" +) + +// Interface is an injectable interface for running ipset commands. Implementations must be goroutine-safe. +type Interface interface { + // FlushSet deletes all entries from a named set. + FlushSet(set string) error + // DestroySet deletes a named set. + DestroySet(set string) error + // DestroyAllSets deletes all sets. + DestroyAllSets() error + // CreateSet creates a new set, it will ignore error when the set already exists if ignoreExistErr=true. + CreateSet(set *IPSet, ignoreExistErr bool) error + // AddEntry adds a new entry to the named set. 
+ AddEntry(entry string, set string, ignoreExistErr bool) error + // DelEntry deletes one entry from the named set + DelEntry(entry string, set string) error + // Test test if an entry exists in the named set + TestEntry(entry string, set string) (bool, error) + // ListEntries lists all the entries from a named set + ListEntries(set string) ([]string, error) + // ListSets list all set names from kernel + ListSets() ([]string, error) + // GetVersion returns the "X.Y" version string for ipset. + GetVersion() (string, error) +} + +// IPSetCmd represents the ipset util. We use ipset command for ipset execute. +const IPSetCmd = "ipset" + +// EntryMemberPattern is the regular expression pattern of ipset member list. +// The raw output of ipset command `ipset list {set}` is similar to, +//Name: foobar +//Type: hash:ip,port +//Revision: 2 +//Header: family inet hashsize 1024 maxelem 65536 +//Size in memory: 16592 +//References: 0 +//Members: +//192.168.1.2,tcp:8080 +//192.168.1.1,udp:53 +var EntryMemberPattern = "(?m)^(.*\n)*Members:\n" + +// VersionPattern is the regular expression pattern of ipset version string. +// ipset version output is similar to "v6.10". +var VersionPattern = "v[0-9]+\\.[0-9]+" + +// IPSet implements an Interface to an set. +type IPSet struct { + // Name is the set name. + Name string + // SetType specifies the ipset type. + SetType Type + // HashFamily specifies the protocol family of the IP addresses to be stored in the set. + // The default is inet, i.e IPv4. If users want to use IPv6, they should specify inet6. + HashFamily string + // HashSize specifies the hash table size of ipset. + HashSize int + // MaxElem specifies the max element number of ipset. + MaxElem int + // PortRange specifies the port range of bitmap:port type ipset. + PortRange string +} + +// Entry represents a ipset entry. +type Entry struct { + // IP is the entry's IP. The IP address protocol corresponds to the HashFamily of IPSet. 
+ // All entries' IP addresses in the same ip set has same the protocol, IPv4 or IPv6. + IP string + // Port is the entry's Port. + Port int + // Protocol is the entry's Protocol. The protocols of entries in the same ip set are all + // the same. The accepted protocols are TCP and UDP. + Protocol string + // Net is the entry's IP network address. Network address with zero prefix size can NOT + // be stored. + Net string + // IP2 is the entry's second IP. IP2 may not be empty for `hash:ip,port,ip` type ip set. + IP2 string + // SetType specifies the type of ip set where the entry exists. + SetType Type +} + +func (e *Entry) String() string { + switch e.SetType { + case HashIPPort: + // Entry{192.168.1.1, udp, 53} -> 192.168.1.1,udp:53 + // Entry{192.168.1.2, tcp, 8080} -> 192.168.1.2,tcp:8080 + return fmt.Sprintf("%s,%s:%s", e.IP, e.Protocol, strconv.Itoa(e.Port)) + case HashIPPortIP: + // Entry{192.168.1.1, udp, 53, 10.0.0.1} -> 192.168.1.1,udp:53,10.0.0.1 + // Entry{192.168.1.2, tcp, 8080, 192.168.1.2} -> 192.168.1.2,tcp:8080,192.168.1.2 + return fmt.Sprintf("%s,%s:%s,%s", e.IP, e.Protocol, strconv.Itoa(e.Port), e.IP2) + case HashIPPortNet: + // Entry{192.168.1.2, udp, 80, 10.0.1.0/24} -> 192.168.1.2,udp:80,10.0.1.0/24 + // Entry{192.168.2,25, tcp, 8080, 10.1.0.0/16} -> 192.168.2,25,tcp:8080,10.1.0.0/16 + return fmt.Sprintf("%s,%s:%s,%s", e.IP, e.Protocol, strconv.Itoa(e.Port), e.Net) + case BitmapPort: + // Entry{53} -> 53 + // Entry{8080} -> 8080 + return strconv.Itoa(e.Port) + } + return "" +} + +type runner struct { + exec utilexec.Interface +} + +// New returns a new Interface which will exec ipset. +func New(exec utilexec.Interface) Interface { + return &runner{ + exec: exec, + } +} + +// CreateSet creates a new set, it will ignore error when the set already exists if ignoreExistErr=true. +func (runner *runner) CreateSet(set *IPSet, ignoreExistErr bool) error { + // Using default values. 
+ if set.HashSize == 0 { + set.HashSize = 1024 + } + if set.MaxElem == 0 { + set.MaxElem = 65536 + } + if set.HashFamily == "" { + set.HashFamily = ProtocolFamilyIPV4 + } + if len(set.HashFamily) != 0 && set.HashFamily != ProtocolFamilyIPV4 && set.HashFamily != ProtocolFamilyIPV6 { + return fmt.Errorf("Currently supported protocol families are: %s and %s, %s is not supported", ProtocolFamilyIPV4, ProtocolFamilyIPV6, set.HashFamily) + } + // Default ipset type is "hash:ip,port" + if len(set.SetType) == 0 { + set.SetType = HashIPPort + } + // Check if setType is supported + if !IsValidIPSetType(set.SetType) { + return fmt.Errorf("Currently supported ipset types are: %v, %s is not supported", ValidIPSetTypes, set.SetType) + } + + return runner.createSet(set, ignoreExistErr) +} + +// If ignoreExistErr is set to true, then the -exist option of ipset will be specified, ipset ignores the error +// otherwise raised when the same set (setname and create parameters are identical) already exists. +func (runner *runner) createSet(set *IPSet, ignoreExistErr bool) error { + args := []string{"create", set.Name, string(set.SetType)} + if set.SetType == HashIPPortIP || set.SetType == HashIPPort { + args = append(args, + "family", set.HashFamily, + "hashsize", strconv.Itoa(set.HashSize), + "maxelem", strconv.Itoa(set.MaxElem), + ) + } + if set.SetType == BitmapPort { + if len(set.PortRange) == 0 { + set.PortRange = DefaultPortRange + } + if !validatePortRange(set.PortRange) { + return fmt.Errorf("invalid port range for %s type ip set: %s, expect: a-b", BitmapPort, set.PortRange) + } + args = append(args, "range", set.PortRange) + } + if ignoreExistErr { + args = append(args, "-exist") + } + if _, err := runner.exec.Command(IPSetCmd, args...).CombinedOutput(); err != nil { + return fmt.Errorf("error creating ipset %s, error: %v", set.Name, err) + } + return nil +} + +// AddEntry adds a new entry to the named set. 
+// If the -exist option is specified, ipset ignores the error otherwise raised when +// the same set (setname and create parameters are identical) already exists. +func (runner *runner) AddEntry(entry string, set string, ignoreExistErr bool) error { + args := []string{"add", set, entry} + if ignoreExistErr { + args = append(args, "-exist") + } + if _, err := runner.exec.Command(IPSetCmd, args...).CombinedOutput(); err != nil { + return fmt.Errorf("error adding entry %s, error: %v", entry, err) + } + return nil +} + +// DelEntry is used to delete the specified entry from the set. +func (runner *runner) DelEntry(entry string, set string) error { + if _, err := runner.exec.Command(IPSetCmd, "del", set, entry).CombinedOutput(); err != nil { + return fmt.Errorf("error deleting entry %s: from set: %s, error: %v", entry, set, err) + } + return nil +} + +// TestEntry is used to check whether the specified entry is in the set or not. +func (runner *runner) TestEntry(entry string, set string) (bool, error) { + if out, err := runner.exec.Command(IPSetCmd, "test", set, entry).CombinedOutput(); err == nil { + reg, e := regexp.Compile("NOT") + if e == nil && reg.MatchString(string(out)) { + return false, nil + } else if e == nil { + return true, nil + } else { + return false, fmt.Errorf("error testing entry: %s, error: %v", entry, e) + } + } else { + return false, fmt.Errorf("error testing entry %s: %v (%s)", entry, err, out) + } +} + +// FlushSet deletes all entries from a named set. +func (runner *runner) FlushSet(set string) error { + if _, err := runner.exec.Command(IPSetCmd, "flush", set).CombinedOutput(); err != nil { + return fmt.Errorf("error flushing set: %s, error: %v", set, err) + } + return nil +} + +// DestroySet is used to destroy a named set. 
+func (runner *runner) DestroySet(set string) error { + if _, err := runner.exec.Command(IPSetCmd, "destroy", set).CombinedOutput(); err != nil { + return fmt.Errorf("error destroying set %s:, error: %v", set, err) + } + return nil +} + +// DestroyAllSets is used to destroy all sets. +func (runner *runner) DestroyAllSets() error { + if _, err := runner.exec.Command(IPSetCmd, "destroy").CombinedOutput(); err != nil { + return fmt.Errorf("error destroying all sets, error: %v", err) + } + return nil +} + +// ListSets list all set names from kernel +func (runner *runner) ListSets() ([]string, error) { + out, err := runner.exec.Command(IPSetCmd, "list", "-n").CombinedOutput() + if err != nil { + return nil, fmt.Errorf("error listing all sets, error: %v", err) + } + return strings.Split(string(out), "\n"), nil +} + +// ListEntries lists all the entries from a named set. +func (runner *runner) ListEntries(set string) ([]string, error) { + if len(set) == 0 { + return nil, fmt.Errorf("set name can't be nil") + } + out, err := runner.exec.Command(IPSetCmd, "list", set).CombinedOutput() + if err != nil { + return nil, fmt.Errorf("error listing set: %s, error: %v", set, err) + } + memberMatcher := regexp.MustCompile(EntryMemberPattern) + list := memberMatcher.ReplaceAllString(string(out[:]), "") + strs := strings.Split(list, "\n") + results := make([]string, 0) + for i := range strs { + if len(strs[i]) > 0 { + results = append(results, strs[i]) + } + } + return results, nil +} + +// GetVersion returns the version string. 
+func (runner *runner) GetVersion() (string, error) { + return getIPSetVersionString(runner.exec) +} + +// getIPSetVersionString runs "ipset --version" to get the version string +// in the form of "X.Y", i.e "6.19" +func getIPSetVersionString(exec utilexec.Interface) (string, error) { + cmd := exec.Command(IPSetCmd, "--version") + cmd.SetStdin(bytes.NewReader([]byte{})) + bytes, err := cmd.CombinedOutput() + if err != nil { + return "", err + } + versionMatcher := regexp.MustCompile(VersionPattern) + match := versionMatcher.FindStringSubmatch(string(bytes)) + if match == nil { + return "", fmt.Errorf("no ipset version found in string: %s", bytes) + } + return match[0], nil +} + +func validatePortRange(portRange string) bool { + strs := strings.Split(portRange, "-") + if len(strs) != 2 { + return false + } + for i := range strs { + if _, err := strconv.Atoi(strs[i]); err != nil { + return false + } + } + return true +} + +var _ = Interface(&runner{}) diff --git a/pkg/util/ipset/ipset_test.go b/pkg/util/ipset/ipset_test.go new file mode 100644 index 0000000000..71d4ce26d5 --- /dev/null +++ b/pkg/util/ipset/ipset_test.go @@ -0,0 +1,483 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package ipset + +import ( + "reflect" + "testing" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/utils/exec" + fakeexec "k8s.io/utils/exec/testing" +) + +func TestCheckIPSetVersion(t *testing.T) { + testCases := []struct { + vstring string + Expect string + Err bool + }{ + {"ipset v4.0, protocol version: 4", "v4.0", false}, + {"ipset v5.1, protocol version: 5", "v5.1", false}, + {"ipset v6.0, protocol version: 6", "v6.0", false}, + {"ipset v6.1, protocol version: 6", "v6.1", false}, + {"ipset v6.19, protocol version: 6", "v6.19", false}, + {"total junk", "", true}, + } + + for i := range testCases { + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + // ipset version response + func() ([]byte, error) { return []byte(testCases[i].vstring), nil }, + }, + } + + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + + gotVersion, err := getIPSetVersionString(&fexec) + if (err != nil) != testCases[i].Err { + t.Errorf("Expected error: %v, Got error: %v", testCases[i].Err, err) + } + if err == nil { + if testCases[i].Expect != gotVersion { + t.Errorf("Expected result: %v, Got result: %v", testCases[i].Expect, gotVersion) + } + } + } +} + +func TestFlushSet(t *testing.T) { + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + // Success + func() ([]byte, error) { return []byte{}, nil }, + // Success + func() ([]byte, error) { return []byte{}, nil }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + runner := New(&fexec) + // Success. 
+ err := runner.FlushSet("FOOBAR") + if err != nil { + t.Errorf("expected success, got %v", err) + } + if fcmd.CombinedOutputCalls != 1 { + t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls) + } + if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ipset", "flush", "FOOBAR") { + t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0]) + } + // Flush again + err = runner.FlushSet("FOOBAR") + if err != nil { + t.Errorf("expected success, got %v", err) + } +} + +func TestDestroySet(t *testing.T) { + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + // Success + func() ([]byte, error) { return []byte{}, nil }, + // Failure + func() ([]byte, error) { + return []byte("ipset v6.19: The set with the given name does not exist"), &fakeexec.FakeExitError{Status: 1} + }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) 
}, + }, + } + runner := New(&fexec) + // Success + err := runner.DestroySet("FOOBAR") + if err != nil { + t.Errorf("expected success, got %v", err) + } + if fcmd.CombinedOutputCalls != 1 { + t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls) + } + if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ipset", "destroy", "FOOBAR") { + t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0]) + } + // Failure + err = runner.DestroySet("FOOBAR") + if err == nil { + t.Errorf("expected failure, got nil") + } +} + +func TestDestroyAllSets(t *testing.T) { + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + // Success + func() ([]byte, error) { return []byte{}, nil }, + // Success + func() ([]byte, error) { return []byte{}, nil }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) 
}, + }, + } + runner := New(&fexec) + // Success + err := runner.DestroyAllSets() + if err != nil { + t.Errorf("expected success, got %v", err) + } + if fcmd.CombinedOutputCalls != 1 { + t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls) + } + if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ipset", "destroy") { + t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0]) + } + // Success + err = runner.DestroyAllSets() + if err != nil { + t.Errorf("Unexpected failure: %v", err) + } +} + +func TestCreateSet(t *testing.T) { + testSet := IPSet{ + Name: "FOOBAR", + SetType: HashIPPort, + HashFamily: ProtocolFamilyIPV4, + } + + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + // Success + func() ([]byte, error) { return []byte{}, nil }, + // Success + func() ([]byte, error) { return []byte{}, nil }, + // Failure + func() ([]byte, error) { + return []byte("ipset v6.19: Set cannot be created: set with the same name already exists"), &fakeexec.FakeExitError{Status: 1} + }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) 
}, + }, + } + runner := New(&fexec) + // Create with ignoreExistErr = false, expect success + err := runner.CreateSet(&testSet, false) + if err != nil { + t.Errorf("expected success, got %v", err) + } + if fcmd.CombinedOutputCalls != 1 { + t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls) + } + if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ipset", "create", "FOOBAR", "hash:ip,port", "family", "inet", "hashsize", "1024", "maxelem", "65536") { + t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0]) + } + // Create with ignoreExistErr = true, expect success + err = runner.CreateSet(&testSet, true) + if err != nil { + t.Errorf("expected success, got %v", err) + } + if fcmd.CombinedOutputCalls != 2 { + t.Errorf("expected 2 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls) + } + if !sets.NewString(fcmd.CombinedOutputLog[1]...).HasAll("ipset", "create", "FOOBAR", "hash:ip,port", "family", "inet", "hashsize", "1024", "maxelem", "65536", "-exist") { + t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[1]) + } + // Create with ignoreExistErr = false, expect failure + err = runner.CreateSet(&testSet, false) + if err == nil { + t.Errorf("expected failure, got nil") + } +} + +func TestAddEntry(t *testing.T) { + testEntry := &Entry{ + IP: "192.168.1.1", + Port: 53, + Protocol: ProtocolUDP, + SetType: HashIPPort, + } + + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + // Success + func() ([]byte, error) { return []byte{}, nil }, + // Success + func() ([]byte, error) { return []byte{}, nil }, + // Failure + func() ([]byte, error) { + return []byte("ipset v6.19: Set cannot be created: set with the same name already exists"), &fakeexec.FakeExitError{Status: 1} + }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) 
}, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + runner := New(&fexec) + // Create with ignoreExistErr = false, expect success + err := runner.AddEntry(testEntry.String(), "FOOBAR", false) + if err != nil { + t.Errorf("expected success, got %v", err) + } + if fcmd.CombinedOutputCalls != 1 { + t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls) + } + if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ipset", "add", "FOOBAR", "192.168.1.1,udp:53") { + t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0]) + } + // Create with ignoreExistErr = true, expect success + err = runner.AddEntry(testEntry.String(), "FOOBAR", true) + if err != nil { + t.Errorf("expected success, got %v", err) + } + if fcmd.CombinedOutputCalls != 2 { + t.Errorf("expected 3 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls) + } + if !sets.NewString(fcmd.CombinedOutputLog[1]...).HasAll("ipset", "add", "FOOBAR", "192.168.1.1,udp:53", "-exist") { + t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[1]) + } + // Create with ignoreExistErr = false, expect failure + err = runner.AddEntry(testEntry.String(), "FOOBAR", false) + if err == nil { + t.Errorf("expected failure, got nil") + } +} + +func TestDelEntry(t *testing.T) { + // TODO: Test more set type + testEntry := &Entry{ + IP: "192.168.1.1", + Port: 53, + Protocol: ProtocolUDP, + SetType: HashIPPort, + } + + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + // Success + func() ([]byte, error) { return []byte{}, nil }, + // Failure + func() ([]byte, error) { + return []byte("ipset v6.19: Element cannot be deleted from the set: it's not added"), &fakeexec.FakeExitError{Status: 1} + }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + 
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + runner := New(&fexec) + err := runner.DelEntry(testEntry.String(), "FOOBAR") + if err != nil { + t.Errorf("expected success, got %v", err) + } + if fcmd.CombinedOutputCalls != 1 { + t.Errorf("expected 1 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls) + } + if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ipset", "del", "FOOBAR", "192.168.1.1,udp:53") { + t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0]) + } + err = runner.DelEntry(testEntry.String(), "FOOBAR") + if err == nil { + t.Errorf("expected failure, got nil") + } +} + +func TestTestEntry(t *testing.T) { + // TODO: IPv6? + testEntry := &Entry{ + IP: "10.120.7.100", + Port: 8080, + Protocol: ProtocolTCP, + SetType: HashIPPort, + } + + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + // Success + func() ([]byte, error) { return []byte("10.120.7.100,tcp:8080 is in set FOOBAR."), nil }, + // Failure + func() ([]byte, error) { + return []byte("192.168.1.3,tcp:8080 is NOT in set FOOBAR."), &fakeexec.FakeExitError{Status: 1} + }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) 
}, + }, + } + runner := New(&fexec) + // Success + ok, err := runner.TestEntry(testEntry.String(), "FOOBAR") + if err != nil { + t.Errorf("expected success, got %v", err) + } + if fcmd.CombinedOutputCalls != 1 { + t.Errorf("expected 2 CombinedOutput() calls, got %d", fcmd.CombinedOutputCalls) + } + if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ipset", "test", "FOOBAR", "10.120.7.100,tcp:8080") { + t.Errorf("wrong CombinedOutput() log, got %s", fcmd.CombinedOutputLog[0]) + } + if !ok { + t.Errorf("expect entry exists in test set, got not") + } + // Failure + ok, err = runner.TestEntry(testEntry.String(), "FOOBAR") + if err == nil || ok { + t.Errorf("expect entry doesn't exist in test set") + } +} + +func TestListEntries(t *testing.T) { + + output := `Name: foobar +Type: hash:ip,port +Revision: 2 +Header: family inet hashsize 1024 maxelem 65536 +Size in memory: 16592 +References: 0 +Members: +192.168.1.2,tcp:8080 +192.168.1.1,udp:53` + + emptyOutput := `Name: KUBE-NODE-PORT +Type: bitmap:port +Revision: 1 +Header: range 0-65535 +Size in memory: 524432 +References: 1 +Members: + +` + + testCases := []struct { + output string + expected []string + }{ + { + output: output, + expected: []string{"192.168.1.2,tcp:8080", "192.168.1.1,udp:53"}, + }, + { + output: emptyOutput, + expected: []string{}, + }, + } + + for i := range testCases { + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + // Success + func() ([]byte, error) { + return []byte(testCases[i].output), nil + }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { + return fakeexec.InitFakeCmd(&fcmd, cmd, args...) 
+ }, + }, + } + runner := New(&fexec) + // Success + entries, err := runner.ListEntries("foobar") + if err != nil { + t.Errorf("expected success, got: %v", err) + } + if fcmd.CombinedOutputCalls != 1 { + t.Errorf("expected 1 CombinedOutput() calls, got: %d", fcmd.CombinedOutputCalls) + } + if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ipset", "list", "foobar") { + t.Errorf("wrong CombinedOutput() log, got: %s", fcmd.CombinedOutputLog[0]) + } + if len(entries) != len(testCases[i].expected) { + t.Errorf("expected %d ipset entries, got: %d", len(testCases[i].expected), len(entries)) + } + if !reflect.DeepEqual(entries, testCases[i].expected) { + t.Errorf("expected entries: %v, got: %v", testCases[i].expected, entries) + } + } +} + +func TestListSets(t *testing.T) { + output := `foo +bar +baz` + + expected := []string{"foo", "bar", "baz"} + + fcmd := fakeexec.FakeCmd{ + CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ + // Success + func() ([]byte, error) { return []byte(output), nil }, + }, + } + fexec := fakeexec.FakeExec{ + CommandScript: []fakeexec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) 
}, + }, + } + runner := New(&fexec) + // Success + list, err := runner.ListSets() + if err != nil { + t.Errorf("expected success, got: %v", err) + } + if fcmd.CombinedOutputCalls != 1 { + t.Errorf("expected 1 CombinedOutput() calls, got: %d", fcmd.CombinedOutputCalls) + } + if !sets.NewString(fcmd.CombinedOutputLog[0]...).HasAll("ipset", "list", "-n") { + t.Errorf("wrong CombinedOutput() log, got: %s", fcmd.CombinedOutputLog[0]) + } + if len(list) != len(expected) { + t.Errorf("expected %d sets, got: %d", len(expected), len(list)) + } + if !reflect.DeepEqual(list, expected) { + t.Errorf("expected sets: %v, got: %v", expected, list) + } +} diff --git a/pkg/util/ipset/testing/BUILD b/pkg/util/ipset/testing/BUILD new file mode 100644 index 0000000000..593b04157c --- /dev/null +++ b/pkg/util/ipset/testing/BUILD @@ -0,0 +1,23 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["fake.go"], + importpath = "k8s.io/kubernetes/pkg/util/ipset/testing", + visibility = ["//visibility:public"], + deps = ["//pkg/util/ipset:go_default_library"], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/util/ipset/testing/fake.go b/pkg/util/ipset/testing/fake.go new file mode 100644 index 0000000000..aedf3d21b3 --- /dev/null +++ b/pkg/util/ipset/testing/fake.go @@ -0,0 +1,83 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "k8s.io/kubernetes/pkg/util/ipset" +) + +// FakeIPSet is a no-op implementation of ipset Interface +type FakeIPSet struct { + Lines []byte +} + +// NewFake create a new fake ipset interface. +func NewFake() *FakeIPSet { + return &FakeIPSet{} +} + +// GetVersion is part of interface. +func (*FakeIPSet) GetVersion() (string, error) { + return "0.0", nil +} + +// FlushSet is part of interface. +func (*FakeIPSet) FlushSet(set string) error { + return nil +} + +// DestroySet is part of interface. +func (*FakeIPSet) DestroySet(set string) error { + return nil +} + +// DestroyAllSets is part of interface. +func (*FakeIPSet) DestroyAllSets() error { + return nil +} + +// CreateSet is part of interface. +func (*FakeIPSet) CreateSet(set *ipset.IPSet, ignoreExistErr bool) error { + return nil +} + +// AddEntry is part of interface. +func (*FakeIPSet) AddEntry(entry string, set string, ignoreExistErr bool) error { + return nil +} + +// DelEntry is part of interface. +func (*FakeIPSet) DelEntry(entry string, set string) error { + return nil +} + +// TestEntry is part of interface. +func (*FakeIPSet) TestEntry(entry string, set string) (bool, error) { + return true, nil +} + +// ListEntries is part of interface. +func (*FakeIPSet) ListEntries(set string) ([]string, error) { + return nil, nil +} + +// ListSets is part of interface. 
+func (*FakeIPSet) ListSets() ([]string, error) { + return nil, nil +} + +var _ = ipset.Interface(&FakeIPSet{}) diff --git a/pkg/util/ipset/types.go b/pkg/util/ipset/types.go new file mode 100644 index 0000000000..d2406c0087 --- /dev/null +++ b/pkg/util/ipset/types.go @@ -0,0 +1,70 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipset + +// Type represents the ipset type +type Type string + +const ( + // HashIPPort represents the `hash:ip,port` type ipset. The hash:ip,port is similar to hash:ip but + // you can store IP address and protocol-port pairs in it. TCP, SCTP, UDP, UDPLITE, ICMP and ICMPv6 are supported + // with port numbers/ICMP(v6) types and other protocol numbers without port information. + HashIPPort Type = "hash:ip,port" + // HashIPPortIP represents the `hash:ip,port,ip` type ipset. The hash:ip,port,ip set type uses a hash to store + // IP address, port number and a second IP address triples. The port number is interpreted together with a + // protocol (default TCP) and zero protocol number cannot be used. + HashIPPortIP Type = "hash:ip,port,ip" + // HashIPPortNet represents the `hash:ip,port,net` type ipset. The hash:ip,port,net set type uses a hash to store IP address, port number and IP network address triples. The port + // number is interpreted together with a protocol (default TCP) and zero protocol number cannot be used. Network address + // with zero prefix size cannot be stored either. 
+ HashIPPortNet Type = "hash:ip,port,net" + // BitmapPort represents the `bitmap:port` type ipset. The bitmap:port set type uses a memory range, where each bit + // represents one TCP/UDP port. A bitmap:port type of set can store up to 65536 ports. + BitmapPort Type = "bitmap:port" +) + +// DefaultPortRange defines the default bitmap:port valid port range. +const DefaultPortRange string = "0-65535" + +const ( + // ProtocolFamilyIPV4 represents IPv4 protocol. + ProtocolFamilyIPV4 = "inet" + // ProtocolFamilyIPV6 represents IPv6 protocol. + ProtocolFamilyIPV6 = "inet6" + // ProtocolTCP represents TCP protocol. + ProtocolTCP = "tcp" + // ProtocolUDP represents UDP protocol. + ProtocolUDP = "udp" +) + +// ValidIPSetTypes defines the supported ip set types. +var ValidIPSetTypes = []Type{ + HashIPPort, + HashIPPortIP, + BitmapPort, + HashIPPortNet, +} + +// IsValidIPSetType checks if the given ipset type is valid. +func IsValidIPSetType(set Type) bool { + for _, valid := range ValidIPSetTypes { + if set == valid { + return true + } + } + return false +} From c124fcf7d7caf273dc245cd81d0a1eff50209ea6 Mon Sep 17 00:00:00 2001 From: m1093782566 Date: Sun, 5 Nov 2017 19:24:17 +0800 Subject: [PATCH 2/4] wrap ipset in proxy ipvs --- pkg/proxy/ipvs/ipset.go | 157 +++++++++++++++++++++++++++++++++++ pkg/proxy/ipvs/ipset_test.go | 49 +++++++++++ 2 files changed, 206 insertions(+) create mode 100644 pkg/proxy/ipvs/ipset.go create mode 100644 pkg/proxy/ipvs/ipset_test.go diff --git a/pkg/proxy/ipvs/ipset.go b/pkg/proxy/ipvs/ipset.go new file mode 100644 index 0000000000..0a01f0bacc --- /dev/null +++ b/pkg/proxy/ipvs/ipset.go @@ -0,0 +1,157 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipvs + +import ( + "k8s.io/apimachinery/pkg/util/sets" + utilipset "k8s.io/kubernetes/pkg/util/ipset" + utilversion "k8s.io/kubernetes/pkg/util/version" + + "github.com/golang/glog" +) + +const ( + // MinIPSetCheckVersion is the min ipset version we need. IPv6 is supported in ipset 6.x + MinIPSetCheckVersion = "6.0" + + // KubeLoopBackIPSet is used to store endpoints dst ip:port, source ip for solving hairpin purpose. + KubeLoopBackIPSet = "KUBE-LOOP-BACK" + + // KubeClusterIPSet is used to store service cluster ip + port for masquerade purpose. + KubeClusterIPSet = "KUBE-CLUSTER-IP" + + // KubeExternalIPSet is used to store service external ip + port for masquerade and filter purpose. + KubeExternalIPSet = "KUBE-EXTERNAL-IP" + + // KubeLoadBalancerSet is used to store service load balancer ingress ip + port, it is the service lb portal. + KubeLoadBalancerSet = "KUBE-LOAD-BALANCER" + + // KubeLoadBalancerMasqSet is used to store service load balancer ingress ip + port for masquerade purpose. + KubeLoadBalancerMasqSet = "KUBE-LOAD-BALANCER-MASQ" + + // KubeLoadBalancerSourceIPSet is used to store service load balancer ingress ip + port + source IP for packet filter purpose. + KubeLoadBalancerSourceIPSet = "KUBE-LOAD-BALANCER-SOURCE-IP" + + // KubeLoadBalancerSourceCIDRSet is used to store service load balancer ingress ip + port + source cidr for packet filter purpose. + KubeLoadBalancerSourceCIDRSet = "KUBE-LOAD-BALANCER-SOURCE-CIDR" + + // KubeNodePortSetTCP is used to store nodeport TCP port for masquerade purpose. 
+ KubeNodePortSetTCP = "KUBE-NODE-PORT-TCP" + + // KubeNodePortSetUDP is used to store nodeport UDP port for masquerade purpose. + KubeNodePortSetUDP = "KUBE-NODE-PORT-UDP" +) + +// IPSetVersioner can query the current ipset version. +type IPSetVersioner interface { + // returns "X.Y" + GetVersion() (string, error) +} + +// IPSet wraps util/ipset which is used by IPVS proxier. +type IPSet struct { + utilipset.IPSet + // activeEntries is the current active entries of the ipset. + activeEntries sets.String + // handle is the util ipset interface handle. + handle utilipset.Interface +} + +// NewIPSet initializes a new IPSet struct. +func NewIPSet(handle utilipset.Interface, name string, setType utilipset.Type, isIPv6 bool) *IPSet { + hashFamily := utilipset.ProtocolFamilyIPV4 + if isIPv6 { + hashFamily = utilipset.ProtocolFamilyIPV6 + } + set := &IPSet{ + IPSet: utilipset.IPSet{ + Name: name, + SetType: setType, + HashFamily: hashFamily, + }, + activeEntries: sets.NewString(), + handle: handle, + } + return set +} + +func (set *IPSet) isEmpty() bool { + return len(set.activeEntries.UnsortedList()) == 0 +} + +func (set *IPSet) resetEntries() { + set.activeEntries = sets.NewString() +} + +func (set *IPSet) syncIPSetEntries() { + appliedEntries, err := set.handle.ListEntries(set.Name) + if err != nil { + glog.Errorf("Failed to list ip set entries, error: %v", err) + return + } + + // currentIPSetEntries holds the entries that ListEntries reported as currently present in the ipset.
+ currentIPSetEntries := sets.NewString() + for _, appliedEntry := range appliedEntries { + currentIPSetEntries.Insert(appliedEntry) + } + + if !set.activeEntries.Equal(currentIPSetEntries) { + // Clean legacy entries + for _, entry := range currentIPSetEntries.Difference(set.activeEntries).List() { + if err := set.handle.DelEntry(entry, set.Name); err != nil { + glog.Errorf("Failed to delete ip set entry: %s from ip set: %s, error: %v", entry, set.Name, err) + } else { + glog.V(3).Infof("Successfully delete legacy ip set entry: %s from ip set: %s", entry, set.Name) + } + } + // Create active entries + for _, entry := range set.activeEntries.Difference(currentIPSetEntries).List() { + if err := set.handle.AddEntry(entry, set.Name, true); err != nil { + glog.Errorf("Failed to add entry: %v to ip set: %s, error: %v", entry, set.Name, err) + } else { + glog.V(3).Infof("Successfully add entry: %v to ip set: %s", entry, set.Name) + } + } + } +} + +func ensureIPSets(ipSets ...*IPSet) error { + for _, set := range ipSets { + if err := set.handle.CreateSet(&set.IPSet, true); err != nil { + glog.Errorf("Failed to make sure ip set: %v exist, error: %v", set, err) + return err + } + } + return nil +} + +// checkMinVersion checks if ipset current version satisfies required min version +func checkMinVersion(vstring string) bool { + version, err := utilversion.ParseGeneric(vstring) + if err != nil { + glog.Errorf("vstring (%s) is not a valid version string: %v", vstring, err) + return false + } + + minVersion, err := utilversion.ParseGeneric(MinIPSetCheckVersion) + if err != nil { + glog.Errorf("MinCheckVersion (%s) is not a valid version string: %v", MinIPSetCheckVersion, err) + return false + } + return !version.LessThan(minVersion) +} diff --git a/pkg/proxy/ipvs/ipset_test.go b/pkg/proxy/ipvs/ipset_test.go new file mode 100644 index 0000000000..f1e1975dd2 --- /dev/null +++ b/pkg/proxy/ipvs/ipset_test.go @@ -0,0 +1,49 @@ +/* +Copyright 2017 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ipvs + +import ( + "testing" +) + +func TestCheckIPSetVersion(t *testing.T) { + testCases := []struct { + vstring string + valid bool + }{ + // version less than "6.0" is not valid. + {"4.0", false}, + {"5.1", false}, + {"5.1.2", false}, + // "7" is not a valid version string. + {"7", false}, + {"6.0", true}, + {"6.1", true}, + {"6.19", true}, + {"7.0", true}, + {"8.1.2", true}, + {"9.3.4.0", true}, + {"total junk", false}, + } + + for i := range testCases { + valid := checkMinVersion(testCases[i].vstring) + if testCases[i].valid != valid { + t.Errorf("Expected result: %v, Got result: %v", testCases[i].valid, valid) + } + } +} From fbf8a13376eccece758bdeee211f8e26305af201 Mon Sep 17 00:00:00 2001 From: m1093782566 Date: Wed, 15 Nov 2017 17:20:49 +0800 Subject: [PATCH 3/4] use ipset doing snat and packet filter in ipvs proxy --- cmd/kube-proxy/app/BUILD | 1 + cmd/kube-proxy/app/server.go | 4 +- cmd/kube-proxy/app/server_others.go | 24 +- pkg/proxy/ipvs/BUILD | 6 + pkg/proxy/ipvs/proxier.go | 458 +++++++++++++++++++++++----- pkg/proxy/ipvs/proxier_test.go | 95 +++--- pkg/proxy/util/endpoints.go | 16 + 7 files changed, 484 insertions(+), 120 deletions(-) diff --git a/cmd/kube-proxy/app/BUILD b/cmd/kube-proxy/app/BUILD index cb74ef887a..4a4cb06111 100644 --- a/cmd/kube-proxy/app/BUILD +++ b/cmd/kube-proxy/app/BUILD @@ -41,6 +41,7 @@ go_library( "//pkg/proxy/userspace:go_default_library", "//pkg/util/configz:go_default_library", 
"//pkg/util/dbus:go_default_library", + "//pkg/util/ipset:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/ipvs:go_default_library", "//pkg/util/mount:go_default_library", diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index 8db7af9d57..ec05012370 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -63,6 +63,7 @@ import ( "k8s.io/kubernetes/pkg/proxy/ipvs" "k8s.io/kubernetes/pkg/proxy/userspace" "k8s.io/kubernetes/pkg/util/configz" + utilipset "k8s.io/kubernetes/pkg/util/ipset" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilipvs "k8s.io/kubernetes/pkg/util/ipvs" utilnode "k8s.io/kubernetes/pkg/util/node" @@ -356,6 +357,7 @@ type ProxyServer struct { EventClient v1core.EventsGetter IptInterface utiliptables.Interface IpvsInterface utilipvs.Interface + IpsetInterface utilipset.Interface execer exec.Interface Proxier proxy.ProxyProvider Broadcaster record.EventBroadcaster @@ -422,7 +424,7 @@ func (s *ProxyServer) Run() error { if s.CleanupAndExit { encounteredError := userspace.CleanupLeftovers(s.IptInterface) encounteredError = iptables.CleanupLeftovers(s.IptInterface) || encounteredError - encounteredError = ipvs.CleanupLeftovers(s.IpvsInterface, s.IptInterface) || encounteredError + encounteredError = ipvs.CleanupLeftovers(s.IpvsInterface, s.IptInterface, s.IpsetInterface) || encounteredError if encounteredError { return errors.New("encountered an error while tearing down rules.") } diff --git a/cmd/kube-proxy/app/server_others.go b/cmd/kube-proxy/app/server_others.go index 035d0593fa..ee68a48540 100644 --- a/cmd/kube-proxy/app/server_others.go +++ b/cmd/kube-proxy/app/server_others.go @@ -43,6 +43,7 @@ import ( "k8s.io/kubernetes/pkg/proxy/userspace" "k8s.io/kubernetes/pkg/util/configz" utildbus "k8s.io/kubernetes/pkg/util/dbus" + utilipset "k8s.io/kubernetes/pkg/util/ipset" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilipvs "k8s.io/kubernetes/pkg/util/ipvs" utilnode 
"k8s.io/kubernetes/pkg/util/node" @@ -72,6 +73,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi var iptInterface utiliptables.Interface var ipvsInterface utilipvs.Interface + var ipsetInterface utilipset.Interface var dbus utildbus.Interface // Create a iptables utils. @@ -80,6 +82,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi dbus = utildbus.New() iptInterface = utiliptables.New(execer, dbus, protocol) ipvsInterface = utilipvs.New(execer) + ipsetInterface = utilipset.New(execer) // We omit creation of pretty much everything if we run in cleanup mode if cleanupAndExit { @@ -87,6 +90,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi execer: execer, IptInterface: iptInterface, IpvsInterface: ipvsInterface, + IpsetInterface: ipsetInterface, CleanupAndExit: cleanupAndExit, }, nil } @@ -119,7 +123,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi var serviceEventHandler proxyconfig.ServiceHandler var endpointsEventHandler proxyconfig.EndpointsHandler - proxyMode := getProxyMode(string(config.Mode), iptInterface, iptables.LinuxKernelCompatTester{}) + proxyMode := getProxyMode(string(config.Mode), iptInterface, ipsetInterface, iptables.LinuxKernelCompatTester{}) if proxyMode == proxyModeIPTables { glog.V(0).Info("Using iptables Proxier.") nodeIP := net.ParseIP(config.BindAddress) @@ -159,12 +163,13 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi userspace.CleanupLeftovers(iptInterface) // IPVS Proxier will generate some iptables rules, // need to clean them before switching to other proxy mode. 
- ipvs.CleanupLeftovers(ipvsInterface, iptInterface) + ipvs.CleanupLeftovers(ipvsInterface, iptInterface, ipsetInterface) } else if proxyMode == proxyModeIPVS { glog.V(0).Info("Using ipvs Proxier.") proxierIPVS, err := ipvs.NewProxier( iptInterface, ipvsInterface, + ipsetInterface, utilsysctl.New(), execer, config.IPVS.SyncPeriod.Duration, @@ -220,7 +225,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi iptables.CleanupLeftovers(iptInterface) // IPVS Proxier will generate some iptables rules, // need to clean them before switching to other proxy mode. - ipvs.CleanupLeftovers(ipvsInterface, iptInterface) + ipvs.CleanupLeftovers(ipvsInterface, iptInterface, ipsetInterface) } iptInterface.AddReloadFunc(proxier.Sync) @@ -230,6 +235,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi EventClient: eventClient, IptInterface: iptInterface, IpvsInterface: ipvsInterface, + IpsetInterface: ipsetInterface, execer: execer, Proxier: proxier, Broadcaster: eventBroadcaster, @@ -249,7 +255,7 @@ func NewProxyServer(config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExi }, nil } -func getProxyMode(proxyMode string, iptver iptables.IPTablesVersioner, kcompat iptables.KernelCompatTester) string { +func getProxyMode(proxyMode string, iptver iptables.IPTablesVersioner, ipsetver ipvs.IPSetVersioner, kcompat iptables.KernelCompatTester) string { if proxyMode == proxyModeUserspace { return proxyModeUserspace } @@ -260,7 +266,7 @@ func getProxyMode(proxyMode string, iptver iptables.IPTablesVersioner, kcompat i if utilfeature.DefaultFeatureGate.Enabled(features.SupportIPVSProxyMode) { if proxyMode == proxyModeIPVS { - return tryIPVSProxy(iptver, kcompat) + return tryIPVSProxy(iptver, ipsetver, kcompat) } else { glog.Warningf("Can't use ipvs proxier, trying iptables proxier") return tryIPTablesProxy(iptver, kcompat) @@ -270,10 +276,10 @@ func getProxyMode(proxyMode string, iptver iptables.IPTablesVersioner, kcompat i 
return tryIPTablesProxy(iptver, kcompat) } -func tryIPVSProxy(iptver iptables.IPTablesVersioner, kcompat iptables.KernelCompatTester) string { +func tryIPVSProxy(iptver iptables.IPTablesVersioner, ipsetver ipvs.IPSetVersioner, kcompat iptables.KernelCompatTester) string { // guaranteed false on error, error only necessary for debugging - // IPVS Proxier relies on iptables - useIPVSProxy, err := ipvs.CanUseIPVSProxier() + // IPVS Proxier relies on ipset + useIPVSProxy, err := ipvs.CanUseIPVSProxier(ipsetver) if err != nil { // Try to fallback to iptables before falling back to userspace utilruntime.HandleError(fmt.Errorf("can't determine whether to use ipvs proxy, error: %v", err)) @@ -282,8 +288,6 @@ func tryIPVSProxy(iptver iptables.IPTablesVersioner, kcompat iptables.KernelComp return proxyModeIPVS } - // TODO: Check ipvs version - // Try to fallback to iptables before falling back to userspace glog.V(1).Infof("Can't use ipvs proxier, trying iptables proxier") return tryIPTablesProxy(iptver, kcompat) diff --git a/pkg/proxy/ipvs/BUILD b/pkg/proxy/ipvs/BUILD index 232c71ee3e..30945e4433 100644 --- a/pkg/proxy/ipvs/BUILD +++ b/pkg/proxy/ipvs/BUILD @@ -9,6 +9,7 @@ load( go_test( name = "go_default_test", srcs = [ + "ipset_test.go", "proxier_test.go", ], importpath = "k8s.io/kubernetes/pkg/proxy/ipvs", @@ -18,6 +19,8 @@ go_test( "//pkg/proxy:go_default_library", "//pkg/proxy/ipvs/testing:go_default_library", "//pkg/proxy/util:go_default_library", + "//pkg/util/ipset:go_default_library", + "//pkg/util/ipset/testing:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/iptables/testing:go_default_library", "//pkg/util/ipvs:go_default_library", @@ -35,6 +38,7 @@ go_test( go_library( name = "go_default_library", srcs = [ + "ipset.go", "netlink.go", "netlink_unsupported.go", "proxier.go", @@ -55,9 +59,11 @@ go_library( "//pkg/proxy/metrics:go_default_library", "//pkg/proxy/util:go_default_library", "//pkg/util/async:go_default_library", + 
"//pkg/util/ipset:go_default_library", "//pkg/util/iptables:go_default_library", "//pkg/util/ipvs:go_default_library", "//pkg/util/sysctl:go_default_library", + "//pkg/util/version:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index 8066f61003..64fa6945f0 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -49,6 +49,7 @@ import ( "k8s.io/kubernetes/pkg/proxy/metrics" utilproxy "k8s.io/kubernetes/pkg/proxy/util" "k8s.io/kubernetes/pkg/util/async" + utilipset "k8s.io/kubernetes/pkg/util/ipset" utiliptables "k8s.io/kubernetes/pkg/util/iptables" utilipvs "k8s.io/kubernetes/pkg/util/ipvs" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" @@ -59,6 +60,12 @@ const ( // kubeServicesChain is the services portal chain kubeServicesChain utiliptables.Chain = "KUBE-SERVICES" + // KubeServiceIPSetsChain is the services access IP chain + KubeServiceIPSetsChain utiliptables.Chain = "KUBE-SVC-IPSETS" + + // KubeFireWallChain is the kubernetes firewall chain. + KubeFireWallChain utiliptables.Chain = "KUBE-FIRE-WALL" + // kubePostroutingChain is the kubernetes postrouting chain kubePostroutingChain utiliptables.Chain = "KUBE-POSTROUTING" @@ -67,11 +74,10 @@ const ( // KubeMarkDropChain is the mark-for-drop chain KubeMarkDropChain utiliptables.Chain = "KUBE-MARK-DROP" -) -const ( // DefaultScheduler is the default ipvs scheduler algorithm - round robin. DefaultScheduler = "rr" + // DefaultDummyDevice is the default dummy interface where ipvs service address will bind to it. 
DefaultDummyDevice = "kube-ipvs0" ) @@ -117,6 +123,7 @@ type Proxier struct { minSyncPeriod time.Duration iptables utiliptables.Interface ipvs utilipvs.Interface + ipset utilipset.Interface exec utilexec.Interface masqueradeAll bool masqueradeMark string @@ -137,6 +144,26 @@ type Proxier struct { natRules *bytes.Buffer // Added as a member to the struct to allow injection for testing. netlinkHandle NetLinkHandle + // loopbackSet is the ipset that stores all endpoints IP:Port,IP for solving hairpin mode purpose. + loopbackSet *IPSet + // clusterIPSet is the ipset that stores all service ClusterIP:Port. + clusterIPSet *IPSet + // nodePortSetTCP is the bitmap:port type ipset that stores all TCP node ports. + nodePortSetTCP *IPSet + // nodePortSetUDP is the bitmap:port type ipset that stores all UDP node ports. + nodePortSetUDP *IPSet + // externalIPSet is the hash:ip,port type ipset that stores all service ExternalIP:Port. + externalIPSet *IPSet + // lbIngressSet is the hash:ip,port type ipset that stores all service load balancer ingress IP:Port. + lbIngressSet *IPSet + // lbMasqSet is the hash:ip,port type ipset that stores all service load balancer ingress IP:Port which needs masquerade. + lbMasqSet *IPSet + // lbWhiteListIPSet is the hash:ip,port,ip type ipset that stores all service load balancer ingress IP:Port,sourceIP pairs; any packets + // with the source IP visiting ingress IP:Port can pass through. + lbWhiteListIPSet *IPSet + // lbWhiteListCIDRSet is the hash:ip,port,net type ipset that stores all service load balancer ingress IP:Port,sourceCIDR pairs; any packets + // from the source CIDR visiting ingress IP:Port can pass through. + lbWhiteListCIDRSet *IPSet } // IPGetter helps get node network interface IP @@ -184,7 +211,9 @@ var _ proxy.ProxyProvider = &Proxier{} // An error will be returned if it fails to update or acquire the initial lock.
// Once a proxier is created, it will keep iptables and ipvs rules up to date in the background and // will not terminate if a particular iptables or ipvs call fails. -func NewProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, +func NewProxier(ipt utiliptables.Interface, + ipvs utilipvs.Interface, + ipset utilipset.Interface, sysctl utilsysctl.Interface, exec utilexec.Interface, syncPeriod time.Duration, @@ -248,32 +277,46 @@ func NewProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps + isIPv6 := utilproxy.IsIPv6(nodeIP) + + glog.V(2).Infof("nodeIP: %v, isIPv6: %v", nodeIP, isIPv6) + proxier := &Proxier{ - portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), - serviceMap: make(proxyServiceMap), - serviceChanges: newServiceChangeMap(), - endpointsMap: make(proxyEndpointsMap), - endpointsChanges: newEndpointsChangeMap(hostname), - syncPeriod: syncPeriod, - minSyncPeriod: minSyncPeriod, - iptables: ipt, - masqueradeAll: masqueradeAll, - masqueradeMark: masqueradeMark, - exec: exec, - clusterCIDR: clusterCIDR, - hostname: hostname, - nodeIP: nodeIP, - portMapper: &listenPortOpener{}, - recorder: recorder, - healthChecker: healthChecker, - healthzServer: healthzServer, - ipvs: ipvs, - ipvsScheduler: scheduler, - ipGetter: &realIPGetter{}, - iptablesData: bytes.NewBuffer(nil), - natChains: bytes.NewBuffer(nil), - natRules: bytes.NewBuffer(nil), - netlinkHandle: NewNetLinkHandle(), + portsMap: make(map[utilproxy.LocalPort]utilproxy.Closeable), + serviceMap: make(proxyServiceMap), + serviceChanges: newServiceChangeMap(), + endpointsMap: make(proxyEndpointsMap), + endpointsChanges: newEndpointsChangeMap(hostname), + syncPeriod: syncPeriod, + minSyncPeriod: minSyncPeriod, + iptables: ipt, + masqueradeAll: masqueradeAll, + masqueradeMark: masqueradeMark, + exec: exec, + clusterCIDR: clusterCIDR, + hostname: hostname, + nodeIP: nodeIP, + 
portMapper: &listenPortOpener{}, + recorder: recorder, + healthChecker: healthChecker, + healthzServer: healthzServer, + ipvs: ipvs, + ipvsScheduler: scheduler, + ipGetter: &realIPGetter{}, + iptablesData: bytes.NewBuffer(nil), + natChains: bytes.NewBuffer(nil), + natRules: bytes.NewBuffer(nil), + netlinkHandle: NewNetLinkHandle(), + ipset: ipset, + loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, isIPv6), + clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, isIPv6), + externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, isIPv6), + lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, isIPv6), + lbMasqSet: NewIPSet(ipset, KubeLoadBalancerMasqSet, utilipset.HashIPPort, isIPv6), + lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, isIPv6), + lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, isIPv6), + nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false), + nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false), } burstSyncs := 2 glog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs) @@ -485,6 +528,11 @@ func (e *endpointsInfo) IPPart() string { return utilproxy.IPPart(e.endpoint) } +// PortPart returns just the Port part of the endpoint. +func (e *endpointsInfo) PortPart() (int, error) { + return utilproxy.PortPart(e.endpoint) +} + type endpointServicePair struct { endpoint string servicePortName proxy.ServicePortName @@ -652,7 +700,7 @@ func (em proxyEndpointsMap) unmerge(other proxyEndpointsMap) { // This is determined by checking if all the required kernel modules can be loaded. It may // return an error if it fails to get the kernel modules information without error, in which // case it will also return false. 
-func CanUseIPVSProxier() (bool, error) { +func CanUseIPVSProxier(ipsetver IPSetVersioner) (bool, error) { // Try to load IPVS required kernel modules using modprobe for _, kmod := range ipvsModules { err := utilexec.New().Command("modprobe", "--", kmod).Run() @@ -677,6 +725,15 @@ func CanUseIPVSProxier() (bool, error) { if len(modules) != 0 { return false, fmt.Errorf("IPVS proxier will not be used because the following required kernel modules are not loaded: %v", modules) } + + // Check ipset version + versionString, err := ipsetver.GetVersion() + if err != nil { + return false, fmt.Errorf("error getting ipset version, error: %v", err) + } + if !checkMinVersion(versionString) { + return false, fmt.Errorf("ipset version: %s is less than min required version: %s", versionString, MinIPSetCheckVersion) + } return true, nil } @@ -728,7 +785,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool natRules := bytes.NewBuffer(nil) writeLine(natChains, "*nat") // Start with chains we know we need to remove. - for _, chain := range []utiliptables.Chain{kubeServicesChain, kubePostroutingChain, KubeMarkMasqChain} { + for _, chain := range []utiliptables.Chain{kubeServicesChain, kubePostroutingChain, KubeMarkMasqChain, KubeServiceIPSetsChain} { if _, found := existingNATChains[chain]; found { chainString := string(chain) writeLine(natChains, existingNATChains[chain]) // flush @@ -748,7 +805,7 @@ func cleanupIptablesLeftovers(ipt utiliptables.Interface) (encounteredError bool } // CleanupLeftovers clean up all ipvs and iptables rules created by ipvs Proxier. -func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface) (encounteredError bool) { +func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset utilipset.Interface) (encounteredError bool) { // Return immediately when ipvs interface is nil - Probably initialization failed in somewhere. 
if ipvs == nil { return true @@ -768,6 +825,16 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface) (enco } // Clear iptables created by ipvs Proxier. encounteredError = cleanupIptablesLeftovers(ipt) || encounteredError + // Destroy ip sets created by ipvs Proxier. We should call it after cleaning up + // iptables since we can NOT delete ip set which is still referenced by iptables. + ipSetsToDestroy := []string{KubeLoopBackIPSet, KubeClusterIPSet, KubeLoadBalancerSet, KubeNodePortSetTCP, KubeNodePortSetUDP, + KubeExternalIPSet, KubeLoadBalancerSourceIPSet, KubeLoadBalancerSourceCIDRSet, KubeLoadBalancerMasqSet} + for _, set := range ipSetsToDestroy { + err = ipset.DestroySet(set) + if err != nil { + encounteredError = true + } + } return encounteredError } @@ -957,6 +1024,16 @@ func (proxier *Proxier) syncProxyRules() { return } + // make sure ip sets exists in the system. + ipSets := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.externalIPSet, proxier.nodePortSetUDP, proxier.nodePortSetTCP, + proxier.lbIngressSet, proxier.lbMasqSet, proxier.lbWhiteListCIDRSet, proxier.lbWhiteListIPSet} + if err := ensureIPSets(ipSets...); err != nil { + return + } + for i := range ipSets { + ipSets[i].resetEntries() + } + // Accumulate the set of local ports that we will be holding open once this update is complete replacementPortsMap := map[utilproxy.LocalPort]utilproxy.Closeable{} // activeIPVSServices represents IPVS service successfully created in this round of sync @@ -976,6 +1053,17 @@ func (proxier *Proxier) syncProxyRules() { // is just for efficiency, not correctness. 
args := make([]string, 64) + // Kube service portal + if err := proxier.linkKubeServiceChain(existingNATChains, proxier.natChains); err != nil { + glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err) + return + } + // Kube firewall chain + if err := proxier.createKubeFireWallChain(existingNATChains, proxier.natChains); err != nil { + glog.Errorf("Failed to create KUBE-FIRE-WALL chain: %v", err) + return + } + // Build IPVS rules for each service. for svcName, svcInfo := range proxier.serviceMap { protocol := strings.ToLower(string(svcInfo.protocol)) @@ -983,7 +1071,41 @@ func (proxier *Proxier) syncProxyRules() { // to ServicePortName.String() show up in CPU profiles. svcNameString := svcName.String() + // Handle traffic that loops back to the originator with SNAT. + for _, ep := range proxier.endpointsMap[svcName] { + epIP := ep.IPPart() + epPort, err := ep.PortPart() + // Error parsing this endpoint has been logged. Skip to next endpoint. + if epIP == "" || err != nil { + continue + } + entry := &utilipset.Entry{ + IP: epIP, + Port: epPort, + Protocol: protocol, + IP2: epIP, + SetType: utilipset.HashIPPortIP, + } + proxier.loopbackSet.activeEntries.Insert(entry.String()) + } + // Capture the clusterIP. + // ipset call + entry := &utilipset.Entry{ + IP: svcInfo.clusterIP.String(), + Port: svcInfo.port, + Protocol: protocol, + SetType: utilipset.HashIPPort, + } + // add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin. + // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String()) + // Add the entry to the cluster IP set (used for masquerade) if 'masqueradeAll' or 'clusterCIDR' is specified.
+ if proxier.masqueradeAll { + proxier.clusterIPSet.activeEntries.Insert(entry.String()) + } else if len(proxier.clusterCIDR) > 0 { + proxier.clusterIPSet.activeEntries.Insert(entry.String()) + } + // ipvs call serv := &utilipvs.VirtualServer{ Address: svcInfo.clusterIP, Port: uint16(svcInfo.port), @@ -1004,32 +1126,6 @@ func (proxier *Proxier) syncProxyRules() { } else { glog.Errorf("Failed to sync service: %v, err: %v", serv, err) } - // Install masquerade rules if 'masqueradeAll' or 'clusterCIDR' is specified. - args = append(args[:0], - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString), - "-m", protocol, "-p", protocol, - "-d", utilproxy.ToCIDR(svcInfo.clusterIP), - "--dport", strconv.Itoa(svcInfo.port), - ) - if proxier.masqueradeAll { - err = proxier.linkKubeServiceChain(existingNATChains, proxier.natChains) - if err != nil { - glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err) - } - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) - } else if len(proxier.clusterCIDR) > 0 { - // This masquerades off-cluster traffic to a service VIP. The idea - // is that you can establish a static route for your Service range, - // routing to any node, and that node will bridge into the Service - // for you. Since that might bounce off-node, we masquerade here. - // If/when we support "Local" policy for VIPs, we should update this. - err = proxier.linkKubeServiceChain(existingNATChains, proxier.natChains) - if err != nil { - glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err) - } - writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...) - } // Capture externalIPs. for _, externalIP := range svcInfo.externalIPs { @@ -1064,6 +1160,17 @@ func (proxier *Proxier) syncProxyRules() { } } // We're holding the port, so it's OK to install IPVS rules. 
+ // ipset call + entry := &utilipset.Entry{ + IP: externalIP, + Port: svcInfo.port, + Protocol: protocol, + SetType: utilipset.HashIPPort, + } + // We have to SNAT packets to external IPs. + proxier.externalIPSet.activeEntries.Insert(entry.String()) + + // ipvs call serv := &utilipvs.VirtualServer{ Address: net.ParseIP(externalIP), Port: uint16(svcInfo.port), @@ -1088,25 +1195,39 @@ func (proxier *Proxier) syncProxyRules() { // Capture load-balancer ingress. for _, ingress := range svcInfo.loadBalancerStatus.Ingress { if ingress.IP != "" { + // ipset call + entry = &utilipset.Entry{ + IP: ingress.IP, + Port: svcInfo.port, + Protocol: protocol, + SetType: utilipset.HashIPPort, + } + // add service load balancer ingressIP:Port to kubeServiceAccess ip set for the purpose of solving hairpin. + // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String()) + // If we are proxying globally, we need to masquerade in case we cross nodes. + // If we are proxying only locally, we can retain the source IP. + if !svcInfo.onlyNodeLocalEndpoints { + proxier.lbMasqSet.activeEntries.Insert(entry.String()) + } if len(svcInfo.loadBalancerSourceRanges) != 0 { - err = proxier.linkKubeServiceChain(existingNATChains, proxier.natChains) - if err != nil { - glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err) - } // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. // This currently works for loadbalancers that preserves source ips. // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply. 
- args = append(args[:0], - "-A", string(kubeServicesChain), - "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcNameString), - "-m", string(svcInfo.protocol), "-p", string(svcInfo.protocol), - "-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), - "--dport", fmt.Sprintf("%d", svcInfo.port), - ) + proxier.lbIngressSet.activeEntries.Insert(entry.String()) allowFromNode := false for _, src := range svcInfo.loadBalancerSourceRanges { - writeLine(proxier.natRules, append(args, "-s", src, "-j", "ACCEPT")...) + // ipset call + entry = &utilipset.Entry{ + IP: ingress.IP, + Port: svcInfo.port, + Protocol: protocol, + Net: src, + SetType: utilipset.HashIPPortNet, + } + // enumerate all white list source cidr + proxier.lbWhiteListCIDRSet.activeEntries.Insert(entry.String()) + // ignore error because it has been validated _, cidr, _ := net.ParseCIDR(src) if cidr.Contains(proxier.nodeIP) { @@ -1117,14 +1238,19 @@ func (proxier *Proxier) syncProxyRules() { // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly. // Need to add the following rule to allow request on host. if allowFromNode { - writeLine(proxier.natRules, append(args, "-s", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), "-j", "ACCEPT")...) + entry = &utilipset.Entry{ + IP: ingress.IP, + Port: svcInfo.port, + Protocol: protocol, + IP2: ingress.IP, + SetType: utilipset.HashIPPortIP, + } + // enumerate all white list source ip + proxier.lbWhiteListIPSet.activeEntries.Insert(entry.String()) } - - // If the packet was able to reach the end of firewall chain, then it did not get DNATed. - // It means the packet cannot go through the firewall, then DROP it. - writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) 
} + // ipvs call serv := &utilipvs.VirtualServer{ Address: net.ParseIP(ingress.IP), Port: uint16(svcInfo.port), @@ -1170,12 +1296,33 @@ func (proxier *Proxier) syncProxyRules() { replacementPortsMap[lp] = socket } // We're holding the port, so it's OK to install ipvs rules. + // Nodeports need SNAT, unless they're local. + // ipset call + if !svcInfo.onlyNodeLocalEndpoints { + entry = &utilipset.Entry{ + // No need to provide ip info + Port: svcInfo.nodePort, + Protocol: protocol, + SetType: utilipset.BitmapPort, + } + switch protocol { + case "tcp": + proxier.nodePortSetTCP.activeEntries.Insert(entry.String()) + case "udp": + proxier.nodePortSetUDP.activeEntries.Insert(entry.String()) + default: + // It should never hit + glog.Errorf("Unsupported protocol type: %s", protocol) + } + } + // Build ipvs kernel routes for each node ip address nodeIPs, err := proxier.ipGetter.NodeIPs() if err != nil { glog.Errorf("Failed to get node IP, err: %v", err) } else { for _, nodeIP := range nodeIPs { + // ipvs call serv := &utilipvs.VirtualServer{ Address: nodeIP, Port: uint16(svcInfo.nodePort), @@ -1200,6 +1347,119 @@ func (proxier *Proxier) syncProxyRules() { } } + // sync ipset entries + ipsetsToSync := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.lbIngressSet, proxier.lbMasqSet, proxier.nodePortSetTCP, + proxier.nodePortSetUDP, proxier.externalIPSet, proxier.lbWhiteListIPSet, proxier.lbWhiteListCIDRSet} + for i := range ipsetsToSync { + ipsetsToSync[i].syncIPSetEntries() + } + + // Tail call iptables rules for ipset, make sure only call iptables once + // in a single loop per ip set. + if !proxier.loopbackSet.isEmpty() { + args = append(args[:0], + "-A", string(kubePostroutingChain), + "-m", "set", "--match-set", proxier.loopbackSet.Name, + "dst,dst,src", + ) + writeLine(proxier.natRules, append(args, "-j", "MASQUERADE")...) 
+ } + if !proxier.clusterIPSet.isEmpty() { + args = append(args[:0], + "-A", string(kubeServicesChain), + "-m", "set", "--match-set", proxier.clusterIPSet.Name, + "dst,dst", + ) + if proxier.masqueradeAll { + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + } else if len(proxier.clusterCIDR) > 0 { + // This masquerades off-cluster traffic to a service VIP. The idea + // is that you can establish a static route for your Service range, + // routing to any node, and that node will bridge into the Service + // for you. Since that might bounce off-node, we masquerade here. + // If/when we support "Local" policy for VIPs, we should update this. + writeLine(proxier.natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...) + } + } + if !proxier.externalIPSet.isEmpty() { + // Build masquerade rules for packets to external IPs. + args = append(args[:0], + "-A", string(kubeServicesChain), + "-m", "set", "--match-set", proxier.externalIPSet.Name, + "dst,dst", + ) + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container) + // nor from a local process to be forwarded to the service. + // This rule roughly translates to "all traffic from off-machine". + // This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later. + externalTrafficOnlyArgs := append(args, + "-m", "physdev", "!", "--physdev-is-in", + "-m", "addrtype", "!", "--src-type", "LOCAL") + writeLine(proxier.natRules, append(externalTrafficOnlyArgs, "-j", "ACCEPT")...) + dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL") + // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local. + // This covers cases like GCE load-balancers which get added to the local routing table. 
+ writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", "ACCEPT")...) + } + if !proxier.lbMasqSet.isEmpty() { + // Build masquerade rules for packets which cross node visit load balancer ingress IPs. + args = append(args[:0], + "-A", string(kubeServicesChain), + "-m", "set", "--match-set", proxier.lbMasqSet.Name, + "dst,dst", + ) + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + } + if !proxier.lbWhiteListCIDRSet.isEmpty() || !proxier.lbWhiteListIPSet.isEmpty() { + // link kube-services chain -> kube-fire-wall chain + args := []string{"-m", "set", "--match-set", proxier.lbIngressSet.Name, "dst,dst", "-j", string(KubeFireWallChain)} + if _, err := proxier.iptables.EnsureRule(utiliptables.Append, utiliptables.TableNAT, kubeServicesChain, args...); err != nil { + glog.Errorf("Failed to ensure that ipset %s chain %s jumps to %s: %v", proxier.lbIngressSet.Name, kubeServicesChain, KubeFireWallChain, err) + } + if !proxier.lbWhiteListCIDRSet.isEmpty() { + args = append(args[:0], + "-A", string(KubeFireWallChain), + "-m", "set", "--match-set", proxier.lbWhiteListCIDRSet.Name, + "dst,dst,src", + ) + writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) + } + if !proxier.lbWhiteListIPSet.isEmpty() { + args = append(args[:0], + "-A", string(KubeFireWallChain), + "-m", "set", "--match-set", proxier.lbWhiteListIPSet.Name, + "dst,dst,src", + ) + writeLine(proxier.natRules, append(args, "-j", "ACCEPT")...) + } + args = append(args[:0], + "-A", string(KubeFireWallChain), + ) + // If the packet was able to reach the end of firewall chain, then it did not get DNATed. + // It means the packet cannot go thru the firewall, then mark it for DROP + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) + } + if !proxier.nodePortSetTCP.isEmpty() { + // Build masquerade rules for packets which cross node visit nodeport. 
+ args = append(args[:0], + "-A", string(kubeServicesChain), + "-m", "tcp", "-p", "tcp", + "-m", "set", "--match-set", proxier.nodePortSetTCP.Name, + "dst", + ) + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + } + if !proxier.nodePortSetUDP.isEmpty() { + args = append(args[:0], + "-A", string(kubeServicesChain), + "-m", "udp", "-p", "udp", + "-m", "set", "--match-set", proxier.nodePortSetUDP.Name, + "dst", + ) + writeLine(proxier.natRules, append(args, "-j", string(KubeMarkMasqChain))...) + } + // Write the end-of-table markers. writeLine(proxier.natRules, "COMMIT") @@ -1411,7 +1671,6 @@ func (proxier *Proxier) cleanLegacyService(atciveServices map[string]bool, curre } // linkKubeServiceChain will Create chain KUBE-SERVICES and link the chin in PREROUTING and OUTPUT -// If not specify masqueradeAll or clusterCIDR or LB source range, won't create them. // Chain PREROUTING (policy ACCEPT) // target prot opt source destination @@ -1451,6 +1710,55 @@ func (proxier *Proxier) linkKubeServiceChain(existingNATChains map[utiliptables. return nil } +//// linkKubeIPSetsChain will Create chain KUBE-SVC-IPSETS and link the chin in KUBE-SERVICES +// +//// Chain KUBE-SERVICES (policy ACCEPT) +//// target prot opt source destination +//// KUBE-SVC-IPSETS all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-SERVICE-ACCESS dst,dst +// +//// Chain KUBE-SVC-IPSETS (1 references) +//// target prot opt source destination +//// KUBE-MARK-MASQ all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-EXTERNAL-IP dst,dst +//// ACCEPT all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-EXTERNAL-IP dst,dst PHYSDEV match ! --physdev-is-in ADDRTYPE match src-type !LOCAL +//// ACCEPT all -- 0.0.0.0/0 0.0.0.0/0 match-set KUBE-EXTERNAL-IP dst,dst ADDRTYPE match dst-type LOCAL +//// ... 
+//func (proxier *Proxier) linkKubeIPSetsChain(existingNATChains map[utiliptables.Chain]string, natChains *bytes.Buffer) error { +// if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, KubeServiceIPSetsChain); err != nil { +// return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, KubeServiceIPSetsChain, err) +// } +// +// // TODO: iptables comment message for ipset? +// // The hash:ip,port type of sets require two src/dst parameters of the set match and SET target kernel modules. +// args := []string{"-m", "set", "--match-set", proxier.kubeServiceAccessSet.Name, "dst,dst", "-j", string(KubeServiceIPSetsChain)} +// if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, kubeServicesChain, args...); err != nil { +// return fmt.Errorf("Failed to ensure that ipset %s chain %s jumps to %s: %v", proxier.kubeServiceAccessSet.Name, kubeServicesChain, KubeServiceIPSetsChain, err) +// } +// +// // equal to `iptables -t nat -N KUBE-SVC-IPSETS` +// // write `:KUBE-SERVICES - [0:0]` in nat table +// if chain, ok := existingNATChains[KubeServiceIPSetsChain]; ok { +// writeLine(natChains, chain) +// } else { +// writeLine(natChains, utiliptables.MakeChainLine(KubeServiceIPSetsChain)) +// } +// return nil +//} + +// createKubeFireWallChain ensures that chain KubeFireWallChain exists in the nat table and writes its +// chain declaration line into natChains, reusing the line recorded in existingNATChains when there is one. +// It returns an error only when the chain cannot be ensured. +func (proxier *Proxier) createKubeFireWallChain(existingNATChains map[utiliptables.Chain]string, natChains *bytes.Buffer) error { + // `iptables -t nat -N KUBE-FIRE-WALL` + if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, KubeFireWallChain); err != nil { + return fmt.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, KubeFireWallChain, err) + } + + // write `:KUBE-FIRE-WALL - [0:0]` in nat table + if chain, ok := existingNATChains[KubeFireWallChain]; ok { + writeLine(natChains, chain) + } else { + writeLine(natChains, utiliptables.MakeChainLine(KubeFireWallChain)) + } + return nil +} + // Join all words with spaces, terminate with newline and write to 
buff. func writeLine(buf *bytes.Buffer, words ...string) { // We avoid strings.Join for performance reasons. diff --git a/pkg/proxy/ipvs/proxier_test.go b/pkg/proxy/ipvs/proxier_test.go index b4147353d4..05701cff8f 100644 --- a/pkg/proxy/ipvs/proxier_test.go +++ b/pkg/proxy/ipvs/proxier_test.go @@ -33,6 +33,8 @@ import ( "k8s.io/apimachinery/pkg/util/sets" netlinktest "k8s.io/kubernetes/pkg/proxy/ipvs/testing" proxyutil "k8s.io/kubernetes/pkg/proxy/util" + utilipset "k8s.io/kubernetes/pkg/util/ipset" + ipsettest "k8s.io/kubernetes/pkg/util/ipset/testing" utiliptables "k8s.io/kubernetes/pkg/util/iptables" iptablestest "k8s.io/kubernetes/pkg/util/iptables/testing" utilipvs "k8s.io/kubernetes/pkg/util/ipvs" @@ -85,7 +87,7 @@ func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedNa return nil } -func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, nodeIPs []net.IP) *Proxier { +func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, ipset utilipset.Interface, nodeIPs []net.IP) *Proxier { fcmd := fakeexec.FakeCmd{ CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{ func() ([]byte, error) { return []byte("dummy device have been created"), nil }, @@ -98,24 +100,34 @@ func NewFakeProxier(ipt utiliptables.Interface, ipvs utilipvs.Interface, nodeIPs LookPathFunc: func(cmd string) (string, error) { return cmd, nil }, } return &Proxier{ - exec: fexec, - serviceMap: make(proxyServiceMap), - serviceChanges: newServiceChangeMap(), - endpointsMap: make(proxyEndpointsMap), - endpointsChanges: newEndpointsChangeMap(testHostname), - iptables: ipt, - ipvs: ipvs, - clusterCIDR: "10.0.0.0/24", - hostname: testHostname, - portsMap: make(map[proxyutil.LocalPort]proxyutil.Closeable), - portMapper: &fakePortOpener{[]*proxyutil.LocalPort{}}, - healthChecker: newFakeHealthChecker(), - ipvsScheduler: DefaultScheduler, - ipGetter: &fakeIPGetter{nodeIPs: nodeIPs}, - iptablesData: bytes.NewBuffer(nil), - natChains: 
bytes.NewBuffer(nil), - natRules: bytes.NewBuffer(nil), - netlinkHandle: netlinktest.NewFakeNetlinkHandle(), + exec: fexec, + serviceMap: make(proxyServiceMap), + serviceChanges: newServiceChangeMap(), + endpointsMap: make(proxyEndpointsMap), + endpointsChanges: newEndpointsChangeMap(testHostname), + iptables: ipt, + ipvs: ipvs, + ipset: ipset, + clusterCIDR: "10.0.0.0/24", + hostname: testHostname, + portsMap: make(map[proxyutil.LocalPort]proxyutil.Closeable), + portMapper: &fakePortOpener{[]*proxyutil.LocalPort{}}, + healthChecker: newFakeHealthChecker(), + ipvsScheduler: DefaultScheduler, + ipGetter: &fakeIPGetter{nodeIPs: nodeIPs}, + iptablesData: bytes.NewBuffer(nil), + natChains: bytes.NewBuffer(nil), + natRules: bytes.NewBuffer(nil), + netlinkHandle: netlinktest.NewFakeNetlinkHandle(), + loopbackSet: NewIPSet(ipset, KubeLoopBackIPSet, utilipset.HashIPPortIP, false), + clusterIPSet: NewIPSet(ipset, KubeClusterIPSet, utilipset.HashIPPort, false), + externalIPSet: NewIPSet(ipset, KubeExternalIPSet, utilipset.HashIPPort, false), + lbIngressSet: NewIPSet(ipset, KubeLoadBalancerSet, utilipset.HashIPPort, false), + lbMasqSet: NewIPSet(ipset, KubeLoadBalancerMasqSet, utilipset.HashIPPort, false), + lbWhiteListIPSet: NewIPSet(ipset, KubeLoadBalancerSourceIPSet, utilipset.HashIPPortIP, false), + lbWhiteListCIDRSet: NewIPSet(ipset, KubeLoadBalancerSourceCIDRSet, utilipset.HashIPPortNet, false), + nodePortSetTCP: NewIPSet(ipset, KubeNodePortSetTCP, utilipset.BitmapPort, false), + nodePortSetUDP: NewIPSet(ipset, KubeNodePortSetUDP, utilipset.BitmapPort, false), } } @@ -171,10 +183,11 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) *ap func TestNodePort(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake() nodeIPv4 := net.ParseIP("100.101.102.103") nodeIPv6 := net.ParseIP("2001:db8::1:1") nodeIPs := sets.NewString(nodeIPv4.String(), nodeIPv6.String()) - fp := NewFakeProxier(ipt, ipvs, 
[]net.IP{nodeIPv4, nodeIPv6}) + fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIPv4, nodeIPv6}) svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -248,8 +261,9 @@ func TestNodePort(t *testing.T) { func TestNodePortNoEndpoint(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake() nodeIP := net.ParseIP("100.101.102.103") - fp := NewFakeProxier(ipt, ipvs, []net.IP{nodeIP}) + fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP}) svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -301,7 +315,8 @@ func TestNodePortNoEndpoint(t *testing.T) { func TestClusterIPNoEndpoint(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) svcIP := "10.20.30.41" svcPort := 80 svcPortName := proxy.ServicePortName{ @@ -344,7 +359,8 @@ func TestClusterIPNoEndpoint(t *testing.T) { func TestClusterIP(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) svcIPv4 := "10.20.30.41" svcPortV4 := 80 @@ -450,7 +466,8 @@ func TestClusterIP(t *testing.T) { func TestExternalIPsNoEndpoint(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) svcIP := "10.20.30.41" svcPort := 80 svcExternalIPs := "50.60.70.81" @@ -504,7 +521,8 @@ func TestExternalIPsNoEndpoint(t *testing.T) { func TestExternalIPs(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) svcIP := "10.20.30.41" svcPort := 80 svcExternalIPs := sets.NewString("50.60.70.81", "2012::51") @@ -573,7 +591,8 @@ func TestExternalIPs(t *testing.T) { 
func TestLoadBalancer(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -624,8 +643,9 @@ func strPtr(s string) *string { func TestOnlyLocalNodePorts(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake() nodeIP := net.ParseIP("100.101.102.103") - fp := NewFakeProxier(ipt, ipvs, []net.IP{nodeIP}) + fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP}) svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -705,7 +725,8 @@ func TestOnlyLocalNodePorts(t *testing.T) { func TestOnlyLocalLoadBalancing(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -769,7 +790,8 @@ func addTestPort(array []api.ServicePort, name string, protocol api.Protocol, po func TestBuildServiceMapAddRemove(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) services := []*api.Service{ makeTestService("somewhere-else", "cluster-ip", func(svc *api.Service) { @@ -874,7 +896,8 @@ func TestBuildServiceMapAddRemove(t *testing.T) { func TestBuildServiceMapServiceHeadless(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) makeServiceMap(fp, makeTestService("somewhere-else", "headless", func(svc *api.Service) { @@ -907,7 +930,8 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { ipt := iptablestest.NewFake() ipvs := 
ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) makeServiceMap(fp, makeTestService("somewhere-else", "external-name", func(svc *api.Service) { @@ -934,7 +958,8 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { func TestBuildServiceMapServiceUpdate(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) servicev1 := makeTestService("somewhere", "some-service", func(svc *api.Service) { svc.Spec.Type = api.ServiceTypeClusterIP @@ -1016,8 +1041,9 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { func TestSessionAffinity(t *testing.T) { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() + ipset := ipsettest.NewFake() nodeIP := net.ParseIP("100.101.102.103") - fp := NewFakeProxier(ipt, ipvs, []net.IP{nodeIP}) + fp := NewFakeProxier(ipt, ipvs, ipset, []net.IP{nodeIP}) svcIP := "10.20.30.41" svcPort := 80 svcNodePort := 3001 @@ -1879,7 +1905,8 @@ func Test_updateEndpointsMap(t *testing.T) { for tci, tc := range testCases { ipt := iptablestest.NewFake() ipvs := ipvstest.NewFake() - fp := NewFakeProxier(ipt, ipvs, nil) + ipset := ipsettest.NewFake() + fp := NewFakeProxier(ipt, ipvs, ipset, nil) fp.hostname = nodeName // First check that after adding all previous versions of endpoints, diff --git a/pkg/proxy/util/endpoints.go b/pkg/proxy/util/endpoints.go index 32e770d4f9..a0ae306634 100644 --- a/pkg/proxy/util/endpoints.go +++ b/pkg/proxy/util/endpoints.go @@ -19,6 +19,7 @@ package util import ( "fmt" "net" + "strconv" "github.com/golang/glog" ) @@ -40,6 +41,21 @@ func IPPart(s string) string { return ip } +// PortPart returns the port number of an endpoint string of the form IP:port. +// If s cannot be split into host and port, or the port is not an integer, the error is logged and returned together with -1. +func PortPart(s string) (int, error) { + // Must be IP:port + _, port, err := net.SplitHostPort(s) + if err != nil { + glog.Errorf("Error parsing '%s': %v", s, err) + return -1, err + } + portNumber, err := strconv.Atoi(port) 
+ if err != nil { + glog.Errorf("Error parsing '%s': %v", port, err) + return -1, err + } + return portNumber, nil +} + // ToCIDR returns a host address of the form /32 for // IPv4 and /128 for IPv6 func ToCIDR(ip net.IP) string { From 268da8544505260a3a6f6c0f4a00f2110e587683 Mon Sep 17 00:00:00 2001 From: m1093782566 Date: Sun, 19 Nov 2017 12:39:43 +0800 Subject: [PATCH 4/4] fix fakeIPSet ut error --- cmd/kube-proxy/app/server_test.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/cmd/kube-proxy/app/server_test.go b/cmd/kube-proxy/app/server_test.go index b2fc19a036..f9c70a4dce 100644 --- a/cmd/kube-proxy/app/server_test.go +++ b/cmd/kube-proxy/app/server_test.go @@ -52,6 +52,15 @@ func (fake *fakeIPTablesVersioner) GetVersion() (string, error) { return fake.version, fake.err } +// fakeIPSetVersioner is a test fake whose GetVersion returns the canned version and err fields. +type fakeIPSetVersioner struct { + version string // what to return + err error // what to return +} + +// GetVersion returns the version string and error configured on the fake. +func (fake *fakeIPSetVersioner) GetVersion() (string, error) { + return fake.version, fake.err +} + type fakeKernelCompatTester struct { ok bool } @@ -72,8 +81,10 @@ func Test_getProxyMode(t *testing.T) { annotationKey string annotationVal string iptablesVersion string + ipsetVersion string kernelCompat bool iptablesError error + ipsetError error expected string }{ { // flag says userspace @@ -128,7 +139,8 @@ func Test_getProxyMode(t *testing.T) { for i, c := range cases { versioner := &fakeIPTablesVersioner{c.iptablesVersion, c.iptablesError} kcompater := &fakeKernelCompatTester{c.kernelCompat} - r := getProxyMode(c.flag, versioner, kcompater) + ipsetver := &fakeIPSetVersioner{c.ipsetVersion, c.ipsetError} + r := getProxyMode(c.flag, versioner, ipsetver, kcompater) if r != c.expected { t.Errorf("Case[%d] Expected %q, got %q", i, c.expected, r) }