diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go index a763bbfaf3..e4df638c3d 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go @@ -32,7 +32,7 @@ import ( ) type mockState struct { - assignments map[string]cpuset.CPUSet + assignments state.ContainerCPUAssignments defaultCPUSet cpuset.CPUSet } @@ -64,6 +64,19 @@ func (s *mockState) Delete(containerID string) { delete(s.assignments, containerID) } +func (s *mockState) ClearState() { + s.defaultCPUSet = cpuset.CPUSet{} + s.assignments = make(state.ContainerCPUAssignments) +} + +func (s *mockState) SetCPUAssignments(a state.ContainerCPUAssignments) { + s.assignments = a.Clone() +} + +func (s *mockState) GetCPUAssignments() state.ContainerCPUAssignments { + return s.assignments.Clone() +} + type mockPolicy struct { err error } @@ -190,7 +203,7 @@ func TestCPUManagerAdd(t *testing.T) { err: testCase.regErr, }, state: &mockState{ - assignments: map[string]cpuset.CPUSet{}, + assignments: state.ContainerCPUAssignments{}, defaultCPUSet: cpuset.NewCPUSet(), }, containerRuntime: mockRuntimeService{ @@ -216,7 +229,7 @@ func TestCPUManagerRemove(t *testing.T) { err: nil, }, state: &mockState{ - assignments: map[string]cpuset.CPUSet{}, + assignments: state.ContainerCPUAssignments{}, defaultCPUSet: cpuset.NewCPUSet(), }, containerRuntime: mockRuntimeService{}, @@ -251,7 +264,7 @@ func TestReconcileState(t *testing.T) { activePods []*v1.Pod pspPS v1.PodStatus pspFound bool - stAssignments map[string]cpuset.CPUSet + stAssignments state.ContainerCPUAssignments stDefaultCPUSet cpuset.CPUSet updateErr error expectFailedContainerName string @@ -282,7 +295,7 @@ func TestReconcileState(t *testing.T) { }, }, pspFound: true, - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID": cpuset.NewCPUSet(1, 2), }, stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7), @@ -308,7 +321,7 @@ func TestReconcileState(t *testing.T) { }, pspPS: v1.PodStatus{}, pspFound: false, - stAssignments: map[string]cpuset.CPUSet{}, + stAssignments: state.ContainerCPUAssignments{}, stDefaultCPUSet: cpuset.NewCPUSet(), updateErr: nil, expectFailedContainerName: "fakeName", @@ -339,7 +352,7 @@ func TestReconcileState(t *testing.T) { }, }, pspFound: true, - stAssignments: map[string]cpuset.CPUSet{}, + stAssignments: state.ContainerCPUAssignments{}, stDefaultCPUSet: cpuset.NewCPUSet(), updateErr: nil, expectFailedContainerName: "fakeName", @@ -370,7 +383,7 @@ func TestReconcileState(t *testing.T) { }, }, pspFound: true, - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID": cpuset.NewCPUSet(), }, stDefaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7), @@ -403,7 +416,7 @@ func TestReconcileState(t *testing.T) { }, }, pspFound: true, - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID": cpuset.NewCPUSet(1, 2), }, stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7), diff --git a/pkg/kubelet/cm/cpumanager/policy_none_test.go b/pkg/kubelet/cm/cpumanager/policy_none_test.go index e20f9bdbbc..99e61fdbbe 100644 --- a/pkg/kubelet/cm/cpumanager/policy_none_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_none_test.go @@ -19,6 +19,7 @@ package cpumanager import ( "testing" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" ) @@ -36,7 +37,7 @@ func TestNonePolicyAdd(t *testing.T) { policy := &nonePolicy{} st := &mockState{ - assignments: map[string]cpuset.CPUSet{}, + assignments: state.ContainerCPUAssignments{}, defaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7), } @@ -53,7 +54,7 @@ func TestNonePolicyRemove(t *testing.T) { policy := &nonePolicy{} st := &mockState{ - assignments: map[string]cpuset.CPUSet{}, + assignments: state.ContainerCPUAssignments{}, defaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7), } diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go index bf6b53f326..e6626f4467 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go @@ -22,6 +22,7 @@ import ( "testing" "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" ) @@ -31,7 +32,7 @@ type staticPolicyTest struct { topo *topology.CPUTopology numReservedCPUs int containerID string - stAssignments map[string]cpuset.CPUSet + stAssignments state.ContainerCPUAssignments stDefaultCPUSet cpuset.CPUSet pod *v1.Pod expErr error @@ -53,7 +54,7 @@ func TestStaticPolicyStart(t *testing.T) { policy := NewStaticPolicy(topoSingleSocketHT, 1).(*staticPolicy) st := &mockState{ - assignments: map[string]cpuset.CPUSet{}, + assignments: state.ContainerCPUAssignments{}, defaultCPUSet: cpuset.NewCPUSet(), } @@ -88,7 +89,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoSingleSocketHT, numReservedCPUs: 1, containerID: "fakeID2", - stAssignments: map[string]cpuset.CPUSet{}, + stAssignments: state.ContainerCPUAssignments{}, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), pod: makePod("8000m", "8000m"), expErr: fmt.Errorf("not enough cpus available to satisfy request"), @@ -100,7 +101,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoSingleSocketHT, numReservedCPUs: 1, containerID: "fakeID2", - stAssignments: map[string]cpuset.CPUSet{}, + stAssignments: state.ContainerCPUAssignments{}, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), pod: makePod("1000m", "1000m"), expErr: nil, @@ -112,7 +113,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoSingleSocketHT, numReservedCPUs: 1, containerID: "fakeID3", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": cpuset.NewCPUSet(2, 3, 6, 7), }, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 4, 5), @@ -126,7 +127,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoDualSocketHT, numReservedCPUs: 1, containerID: "fakeID3", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": cpuset.NewCPUSet(2), }, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 11), @@ -140,7 +141,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoDualSocketHT, numReservedCPUs: 1, containerID: "fakeID3", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": cpuset.NewCPUSet(1, 5), }, stDefaultCPUSet: cpuset.NewCPUSet(0, 2, 3, 4, 6, 7, 8, 9, 10, 11), @@ -154,7 +155,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoDualSocketNoHT, numReservedCPUs: 1, containerID: "fakeID1", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": cpuset.NewCPUSet(), }, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 4, 5, 6, 7), @@ -168,7 +169,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoDualSocketNoHT, numReservedCPUs: 1, containerID: "fakeID1", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": cpuset.NewCPUSet(4, 5), }, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 6, 7), @@ -182,7 +183,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoDualSocketHT, numReservedCPUs: 1, containerID: "fakeID3", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": cpuset.NewCPUSet(2), }, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 11), @@ -196,7 +197,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoSingleSocketHT, numReservedCPUs: 1, containerID: "fakeID1", - stAssignments: map[string]cpuset.CPUSet{}, + stAssignments: state.ContainerCPUAssignments{}, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), pod: makePod("1000m", "2000m"), expErr: nil, @@ -208,7 +209,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoSingleSocketHT, numReservedCPUs: 1, containerID: "fakeID4", - stAssignments: map[string]cpuset.CPUSet{}, + stAssignments: state.ContainerCPUAssignments{}, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), pod: makePod("977m", "977m"), expErr: nil, @@ -220,7 +221,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoSingleSocketHT, numReservedCPUs: 1, containerID: "fakeID5", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": cpuset.NewCPUSet(1, 2, 3, 4, 5, 6), }, stDefaultCPUSet: cpuset.NewCPUSet(0, 7), @@ -234,7 +235,7 @@ func TestStaticPolicyAdd(t *testing.T) { topo: topoDualSocketHT, numReservedCPUs: 1, containerID: "fakeID5", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": cpuset.NewCPUSet(1, 2, 3), }, stDefaultCPUSet: cpuset.NewCPUSet(0, 4, 5, 6, 7, 8, 9, 10, 11), @@ -250,7 +251,7 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocSock0", topo: topoQuadSocketFourWayHT, containerID: "fakeID5", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": cpuset.NewCPUSet(3, 11, 4, 5, 6, 7), }, stDefaultCPUSet: largeTopoCPUSet.Difference(cpuset.NewCPUSet(3, 11, 4, 5, 6, 7)), @@ -265,7 +266,7 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocAllFullCoresFromThreeSockets", topo: topoQuadSocketFourWayHT, containerID: "fakeID5", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(1, 25, 13, 38, 2, 9, 11, 35, 23, 48, 12, 51, 53, 173, 113, 233, 54, 61)), }, @@ -281,7 +282,7 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocAllSock1+FullCore", topo: topoQuadSocketFourWayHT, containerID: "fakeID5", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": largeTopoCPUSet.Difference(largeTopoSock1CPUSet.Union(cpuset.NewCPUSet(10, 34, 22, 47, 53, 173, 61, 181, 108, 228, 115, 235))), }, @@ -298,7 +299,7 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocCPUs", topo: topoQuadSocketFourWayHT, containerID: "fakeID5", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52)), }, stDefaultCPUSet: cpuset.NewCPUSet(10, 11, 53, 67, 52), @@ -314,7 +315,7 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodMultipleCores, topoQuadSocketFourWayHT, NoAlloc", topo: topoQuadSocketFourWayHT, containerID: "fakeID5", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52)), }, stDefaultCPUSet: cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52), @@ -374,7 +375,7 @@ func TestStaticPolicyRemove(t *testing.T) { description: "SingleSocketHT, DeAllocOneContainer", topo: topoSingleSocketHT, containerID: "fakeID1", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID1": cpuset.NewCPUSet(1, 2, 3), }, stDefaultCPUSet: cpuset.NewCPUSet(4, 5, 6, 7), @@ -384,7 +385,7 @@ func TestStaticPolicyRemove(t *testing.T) { description: "SingleSocketHT, DeAllocOneContainer, BeginEmpty", topo: topoSingleSocketHT, containerID: "fakeID1", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID1": cpuset.NewCPUSet(1, 2, 3), "fakeID2": cpuset.NewCPUSet(4, 5, 6, 7), }, @@ -395,7 +396,7 @@ func TestStaticPolicyRemove(t *testing.T) { description: "SingleSocketHT, DeAllocTwoContainer", topo: topoSingleSocketHT, containerID: "fakeID1", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID1": cpuset.NewCPUSet(1, 3, 5), "fakeID2": cpuset.NewCPUSet(2, 4), }, @@ -406,7 +407,7 @@ func TestStaticPolicyRemove(t *testing.T) { description: "SingleSocketHT, NoDeAlloc", topo: topoSingleSocketHT, containerID: "fakeID2", - stAssignments: map[string]cpuset.CPUSet{ + stAssignments: state.ContainerCPUAssignments{ "fakeID1": cpuset.NewCPUSet(1, 3, 5), }, stDefaultCPUSet: cpuset.NewCPUSet(2, 4, 6, 7), diff --git a/pkg/kubelet/cm/cpumanager/state/BUILD b/pkg/kubelet/cm/cpumanager/state/BUILD index 6393bdec4d..8af631c9ad 100644 --- a/pkg/kubelet/cm/cpumanager/state/BUILD +++ b/pkg/kubelet/cm/cpumanager/state/BUILD @@ -1,9 +1,10 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ "state.go", + "state_file.go", "state_mem.go", ], importpath = "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state", @@ -14,6 +15,14 @@ go_library( ], ) +go_test( + name = "go_default_test", + srcs = ["state_file_test.go"], + importpath = "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state", + library = ":go_default_library", + deps = ["//pkg/kubelet/cm/cpuset:go_default_library"], +) + filegroup( name = "package-srcs", srcs = glob(["**"]), diff --git a/pkg/kubelet/cm/cpumanager/state/state.go b/pkg/kubelet/cm/cpumanager/state/state.go index 98f7f7dc24..0550b644d5 100644 --- a/pkg/kubelet/cm/cpumanager/state/state.go +++ b/pkg/kubelet/cm/cpumanager/state/state.go @@ -20,17 +20,32 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" ) +// ContainerCPUAssignments type used in cpu manger state +type ContainerCPUAssignments map[string]cpuset.CPUSet + +// Clone returns a copy of ContainerCPUAssignments +func (as ContainerCPUAssignments) Clone() ContainerCPUAssignments { + ret := make(ContainerCPUAssignments) + for key, val := range as { + ret[key] = val + } + return ret +} + // Reader interface used to read current cpu/pod assignment state type Reader interface { GetCPUSet(containerID string) (cpuset.CPUSet, bool) GetDefaultCPUSet() cpuset.CPUSet GetCPUSetOrDefault(containerID string) cpuset.CPUSet + GetCPUAssignments() ContainerCPUAssignments } type writer interface { SetCPUSet(containerID string, cpuset cpuset.CPUSet) SetDefaultCPUSet(cpuset cpuset.CPUSet) + SetCPUAssignments(ContainerCPUAssignments) Delete(containerID string) + ClearState() } // State interface provides methods for tracking and setting cpu/pod assignment diff --git a/pkg/kubelet/cm/cpumanager/state/state_file.go b/pkg/kubelet/cm/cpumanager/state/state_file.go new file mode 100644 index 0000000000..b800613419 --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/state/state_file.go @@ -0,0 +1,195 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "encoding/json" + "github.com/golang/glog" + "io/ioutil" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "os" + "sync" +) + +type stateFileData struct { + DefaultCPUSet string `json:"defaultCpuSet"` + Entries map[string]string `json:"entries,omitempty"` +} + +var _ State = &stateFile{} + +type stateFile struct { + sync.RWMutex + stateFilePath string + cache State +} + +// NewFileState creates new State for keeping track of cpu/pod assignment with file backend +func NewFileState(filePath string) State { + stateFile := &stateFile{ + stateFilePath: filePath, + cache: NewMemoryState(), + } + + if err := stateFile.tryRestoreState(); err != nil { + // could not restore state, init new state file + glog.Infof("[cpumanager] state file: initializing empty state file") + stateFile.cache.ClearState() + stateFile.storeState() + } + + return stateFile +} + +// tryRestoreState tries to read state file, upon any error, +// err message is logged and state is left clean. un-initialized +func (sf *stateFile) tryRestoreState() error { + sf.Lock() + defer sf.Unlock() + var err error + + // used when all parsing is ok + tmpAssignments := make(ContainerCPUAssignments) + tmpDefaultCPUSet := cpuset.NewCPUSet() + tmpContainerCPUSet := cpuset.NewCPUSet() + + var content []byte + + if content, err = ioutil.ReadFile(sf.stateFilePath); os.IsNotExist(err) { + // Create file + if _, err = os.Create(sf.stateFilePath); err != nil { + glog.Errorf("[cpumanager] state file: unable to create state file \"%s\":%s", sf.stateFilePath, err.Error()) + panic("[cpumanager] state file not created") + } + glog.Infof("[cpumanager] state file: created empty state file \"%s\"", sf.stateFilePath) + } else { + // File exists - try to read + var readState stateFileData + + if err = json.Unmarshal(content, &readState); err != nil { + glog.Warningf("[cpumanager] state file: could not unmarshal, corrupted state file - \"%s\"", sf.stateFilePath) + return err + } + + if tmpDefaultCPUSet, err = cpuset.Parse(readState.DefaultCPUSet); err != nil { + glog.Warningf("[cpumanager] state file: could not parse state file - [defaultCpuSet:\"%s\"]", readState.DefaultCPUSet) + return err + } + + for containerID, cpuString := range readState.Entries { + if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil { + glog.Warningf("[cpumanager] state file: could not parse state file - container id: %s, cpuset: \"%s\"", containerID, cpuString) + return err + } + tmpAssignments[containerID] = tmpContainerCPUSet + } + + sf.cache.SetDefaultCPUSet(tmpDefaultCPUSet) + sf.cache.SetCPUAssignments(tmpAssignments) + + glog.V(2).Infof("[cpumanager] state file: restored state from state file \"%s\"", sf.stateFilePath) + glog.V(2).Infof("[cpumanager] state file: defaultCPUSet: %s", tmpDefaultCPUSet.String()) + } + return nil +} + +// saves state to a file, caller is responsible for locking +func (sf *stateFile) storeState() { + var content []byte + var err error + + data := stateFileData{ + DefaultCPUSet: sf.cache.GetDefaultCPUSet().String(), + Entries: map[string]string{}, + } + + for containerID, cset := range sf.cache.GetCPUAssignments() { + data.Entries[containerID] = cset.String() + } + + if content, err = json.Marshal(data); err != nil { + panic("[cpumanager] state file: could not serialize state to json") + } + + if err = ioutil.WriteFile(sf.stateFilePath, content, 0644); err != nil { + panic("[cpumanager] state file not written") + } + return +} + +func (sf *stateFile) GetCPUSet(containerID string) (cpuset.CPUSet, bool) { + sf.RLock() + defer sf.RUnlock() + + res, ok := sf.cache.GetCPUSet(containerID) + return res, ok +} + +func (sf *stateFile) GetDefaultCPUSet() cpuset.CPUSet { + sf.RLock() + defer sf.RUnlock() + + return sf.cache.GetDefaultCPUSet() +} + +func (sf *stateFile) GetCPUSetOrDefault(containerID string) cpuset.CPUSet { + sf.RLock() + defer sf.RUnlock() + + return sf.cache.GetCPUSetOrDefault(containerID) +} + +func (sf *stateFile) GetCPUAssignments() ContainerCPUAssignments { + sf.RLock() + defer sf.RUnlock() + return sf.cache.GetCPUAssignments() +} + +func (sf *stateFile) SetCPUSet(containerID string, cset cpuset.CPUSet) { + sf.Lock() + defer sf.Unlock() + sf.cache.SetCPUSet(containerID, cset) + sf.storeState() +} + +func (sf *stateFile) SetDefaultCPUSet(cset cpuset.CPUSet) { + sf.Lock() + defer sf.Unlock() + sf.cache.SetDefaultCPUSet(cset) + sf.storeState() +} + +func (sf *stateFile) SetCPUAssignments(a ContainerCPUAssignments) { + sf.Lock() + defer sf.Unlock() + sf.cache.SetCPUAssignments(a) + sf.storeState() +} + +func (sf *stateFile) Delete(containerID string) { + sf.Lock() + defer sf.Unlock() + sf.cache.Delete(containerID) + sf.storeState() +} + +func (sf *stateFile) ClearState() { + sf.Lock() + defer sf.Unlock() + sf.cache.ClearState() + sf.storeState() +} diff --git a/pkg/kubelet/cm/cpumanager/state/state_file_test.go b/pkg/kubelet/cm/cpumanager/state/state_file_test.go new file mode 100644 index 0000000000..c04860ac3a --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/state/state_file_test.go @@ -0,0 +1,446 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "bytes" + "flag" + "fmt" + "io" + "io/ioutil" + "os" + "path" + "reflect" + "strings" + "testing" + + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" +) + +func writeToStateFile(statefile string, content string) { + ioutil.WriteFile(statefile, []byte(content), 0644) +} + +func stateEqual(t *testing.T, sf State, sm State) { + cpusetSf := sf.GetDefaultCPUSet() + cpusetSm := sm.GetDefaultCPUSet() + if !cpusetSf.Equals(cpusetSm) { + t.Errorf("State CPUSet mismatch. Have %v, want %v", cpusetSf, cpusetSm) + } + + cpuassignmentSf := sf.GetCPUAssignments() + cpuassignmentSm := sm.GetCPUAssignments() + if !reflect.DeepEqual(cpuassignmentSf, cpuassignmentSm) { + t.Errorf("State CPU assigments mismatch. Have %s, want %s", cpuassignmentSf, cpuassignmentSm) + } +} + +func stderrCapture(t *testing.T, f func() State) (bytes.Buffer, State) { + stderr := os.Stderr + + readBuffer, writeBuffer, err := os.Pipe() + if err != nil { + t.Errorf("cannot create pipe: %v", err.Error()) + } + + os.Stderr = writeBuffer + var outputBuffer bytes.Buffer + + state := f() + writeBuffer.Close() + io.Copy(&outputBuffer, readBuffer) + os.Stderr = stderr + + return outputBuffer, state +} + +func TestFileStateTryRestore(t *testing.T) { + flag.Set("alsologtostderr", "true") + flag.Parse() + + testCases := []struct { + description string + stateFileContent string + expErr string + expectedState *stateMemory + }{ + { + "Invalid JSON - empty file", + "\n", + "state file: could not unmarshal, corrupted state file", + &stateMemory{ + assignments: ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(), + }, + }, + { + "Invalid JSON - invalid content", + "{", + "state file: could not unmarshal, corrupted state file", + &stateMemory{ + assignments: ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(), + }, + }, + { + "Try restore defaultCPUSet only", + "{ \"defaultCpuSet\": \"4-6\"}", + "", + &stateMemory{ + assignments: ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(4, 5, 6), + }, + }, + { + "Try restore defaultCPUSet only - invalid name", + "{ \"defCPUSet\": \"4-6\"}", + "", + &stateMemory{ + assignments: ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(), + }, + }, + { + "Try restore assignments only", + "{" + + "\"entries\": { " + + "\"container1\": \"4-6\"," + + "\"container2\": \"1-3\"" + + "} }", + "", + &stateMemory{ + assignments: ContainerCPUAssignments{ + "container1": cpuset.NewCPUSet(4, 5, 6), + "container2": cpuset.NewCPUSet(1, 2, 3), + }, + defaultCPUSet: cpuset.NewCPUSet(), + }, + }, + { + "Try restore invalid assignments", + "{ \"entries\": }", + "state file: could not unmarshal, corrupted state file", + &stateMemory{ + assignments: ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(), + }, + }, + { + "Try restore valid file", + "{ " + + "\"defaultCpuSet\": \"23-24\", " + + "\"entries\": { " + + "\"container1\": \"4-6\", " + + "\"container2\": \"1-3\"" + + " } }", + "", + &stateMemory{ + assignments: ContainerCPUAssignments{ + "container1": cpuset.NewCPUSet(4, 5, 6), + "container2": cpuset.NewCPUSet(1, 2, 3), + }, + defaultCPUSet: cpuset.NewCPUSet(23, 24), + }, + }, + { + "Try restore un-parsable defaultCPUSet ", + "{ \"defaultCpuSet\": \"2-sd\" }", + "state file: could not parse state file", + &stateMemory{ + assignments: ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(), + }, + }, + { + "Try restore un-parsable assignments", + "{ " + + "\"defaultCpuSet\": \"23-24\", " + + "\"entries\": { " + + "\"container1\": \"p-6\", " + + "\"container2\": \"1-3\"" + + " } }", + "state file: could not parse state file", + &stateMemory{ + assignments: ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(), + }, + }, + { + "TryRestoreState creates empty state file", + "", + "", + &stateMemory{ + assignments: ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(), + }, + }, + } + + for idx, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + sfilePath, err := ioutil.TempFile("/tmp", fmt.Sprintf("cpumanager_state_file_test_%d", idx)) + if err != nil { + t.Errorf("cannot create temporary file: %q", err.Error()) + } + // Don't create state file, let TryRestoreState figure out that is should create + if tc.stateFileContent != "" { + writeToStateFile(sfilePath.Name(), tc.stateFileContent) + } + + // Always remove file - regardless of who created + defer os.Remove(sfilePath.Name()) + + logData, fileState := stderrCapture(t, func() State { + return NewFileState(sfilePath.Name()) + }) + + if tc.expErr != "" { + if logData.String() != "" { + if !strings.Contains(logData.String(), tc.expErr) { + t.Errorf("TryRestoreState() error = %v, wantErr %v", logData.String(), tc.expErr) + return + } + } else { + t.Errorf("TryRestoreState() error = nil, wantErr %v", tc.expErr) + return + } + } + + stateEqual(t, fileState, tc.expectedState) + }) + } +} + +func TestFileStateTryRestorePanic(t *testing.T) { + + testCase := struct { + description string + wantPanic bool + panicMessage string + }{ + "Panic creating file", + true, + "[cpumanager] state file not created", + } + + t.Run(testCase.description, func(t *testing.T) { + sfilePath := path.Join("/invalid_path/to_some_dir", "cpumanager_state_file_test") + defer func() { + if err := recover(); err != nil { + if testCase.wantPanic { + if testCase.panicMessage == err { + t.Logf("TryRestoreState() got expected panic = %v", err) + return + } + t.Errorf("TryRestoreState() unexpected panic = %v, wantErr %v", err, testCase.panicMessage) + } + } + }() + NewFileState(sfilePath) + }) +} + +func TestUpdateStateFile(t *testing.T) { + flag.Set("alsologtostderr", "true") + flag.Parse() + + testCases := []struct { + description string + expErr string + expectedState *stateMemory + }{ + { + "Save empty state", + "", + &stateMemory{ + assignments: ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(), + }, + }, + { + "Save defaultCPUSet only", + "", + &stateMemory{ + assignments: ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(1, 6), + }, + }, + { + "Save assignments only", + "", + &stateMemory{ + assignments: ContainerCPUAssignments{ + "container1": cpuset.NewCPUSet(4, 5, 6), + "container2": cpuset.NewCPUSet(1, 2, 3), + }, + defaultCPUSet: cpuset.NewCPUSet(), + }, + }, + } + + for idx, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + + sfilePath, err := ioutil.TempFile("/tmp", fmt.Sprintf("cpumanager_state_file_test_%d", idx)) + defer os.Remove(sfilePath.Name()) + if err != nil { + t.Errorf("cannot create temporary file: %q", err.Error()) + } + fileState := stateFile{ + stateFilePath: sfilePath.Name(), + cache: NewMemoryState(), + } + + fileState.SetDefaultCPUSet(tc.expectedState.defaultCPUSet) + fileState.SetCPUAssignments(tc.expectedState.assignments) + + logData, _ := stderrCapture(t, func() State { + fileState.storeState() + return &stateFile{} + }) + + errMsg := logData.String() + + if tc.expErr != "" { + if errMsg != "" { + if errMsg != tc.expErr { + t.Errorf("UpdateStateFile() error = %v, wantErr %v", errMsg, tc.expErr) + return + } + } else { + t.Errorf("UpdateStateFile() error = nil, wantErr %v", tc.expErr) + return + } + } else { + if errMsg != "" { + t.Errorf("UpdateStateFile() error = %v, wantErr nil", errMsg) + return + } + } + newFileState := NewFileState(sfilePath.Name()) + stateEqual(t, newFileState, tc.expectedState) + }) + } +} + +func TestHelpersStateFile(t *testing.T) { + testCases := []struct { + description string + defaultCPUset cpuset.CPUSet + containers map[string]cpuset.CPUSet + }{ + { + description: "one container", + defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), + containers: map[string]cpuset.CPUSet{ + "c1": cpuset.NewCPUSet(0, 1), + }, + }, + { + description: "two containers", + defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), + containers: map[string]cpuset.CPUSet{ + "c1": cpuset.NewCPUSet(0, 1), + "c2": cpuset.NewCPUSet(2, 3, 4, 5), + }, + }, + { + description: "container with more cpus than is possible", + defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), + containers: map[string]cpuset.CPUSet{ + "c1": cpuset.NewCPUSet(0, 10), + }, + }, + { + description: "container without assigned cpus", + defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), + containers: map[string]cpuset.CPUSet{ + "c1": cpuset.NewCPUSet(), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + sfFile, err := ioutil.TempFile("/tmp", "testHelpersStateFile") + defer os.Remove(sfFile.Name()) + if err != nil { + t.Errorf("cannot create temporary test file: %q", err.Error()) + } + + state := NewFileState(sfFile.Name()) + state.SetDefaultCPUSet(tc.defaultCPUset) + + for containerName, containerCPUs := range tc.containers { + state.SetCPUSet(containerName, containerCPUs) + if cpus, _ := state.GetCPUSet(containerName); !cpus.Equals(containerCPUs) { + t.Errorf("state is inconsistant. Wants = %q Have = %q", containerCPUs, cpus) + } + state.Delete(containerName) + if cpus := state.GetCPUSetOrDefault(containerName); !cpus.Equals(tc.defaultCPUset) { + t.Error("deleted container still existing in state") + } + + } + + }) + } +} + +func TestClearStateStateFile(t *testing.T) { + testCases := []struct { + description string + defaultCPUset cpuset.CPUSet + containers map[string]cpuset.CPUSet + }{ + { + description: "valid file", + defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), + containers: map[string]cpuset.CPUSet{ + "c1": cpuset.NewCPUSet(0, 1), + "c2": cpuset.NewCPUSet(2, 3), + "c3": cpuset.NewCPUSet(4, 5), + }, + }, + } + for _, testCase := range testCases { + t.Run(testCase.description, func(t *testing.T) { + sfFile, err := ioutil.TempFile("/tmp", "testHelpersStateFile") + defer os.Remove(sfFile.Name()) + if err != nil { + t.Errorf("cannot create temporary test file: %q", err.Error()) + } + + state := NewFileState(sfFile.Name()) + state.SetDefaultCPUSet(testCase.defaultCPUset) + for containerName, containerCPUs := range testCase.containers { + state.SetCPUSet(containerName, containerCPUs) + } + + state.ClearState() + if !cpuset.NewCPUSet().Equals(state.GetDefaultCPUSet()) { + t.Error("cleared state shoudn't has got information about available cpuset") + } + for containerName := range testCase.containers { + if !cpuset.NewCPUSet().Equals(state.GetCPUSetOrDefault(containerName)) { + t.Error("cleared state shoudn't has got information about containers") + } + } + + }) + } +} diff --git a/pkg/kubelet/cm/cpumanager/state/state_mem.go b/pkg/kubelet/cm/cpumanager/state/state_mem.go index 751b1726aa..797cdb15b2 100644 --- a/pkg/kubelet/cm/cpumanager/state/state_mem.go +++ b/pkg/kubelet/cm/cpumanager/state/state_mem.go @@ -25,7 +25,7 @@ import ( type stateMemory struct { sync.RWMutex - assignments map[string]cpuset.CPUSet + assignments ContainerCPUAssignments defaultCPUSet cpuset.CPUSet } @@ -35,7 +35,7 @@ var _ State = &stateMemory{} func NewMemoryState() State { glog.Infof("[cpumanager] initializing new in-memory state store") return &stateMemory{ - assignments: map[string]cpuset.CPUSet{}, + assignments: ContainerCPUAssignments{}, defaultCPUSet: cpuset.NewCPUSet(), } } @@ -65,6 +65,12 @@ func (s *stateMemory) GetCPUSetOrDefault(containerID string) cpuset.CPUSet { return s.GetDefaultCPUSet() } +func (s *stateMemory) GetCPUAssignments() ContainerCPUAssignments { + s.RLock() + defer s.RUnlock() + return s.assignments.Clone() +} + func (s *stateMemory) SetCPUSet(containerID string, cset cpuset.CPUSet) { s.Lock() defer s.Unlock() @@ -81,6 +87,14 @@ func (s *stateMemory) SetDefaultCPUSet(cset cpuset.CPUSet) { glog.Infof("[cpumanager] updated default cpuset: \"%s\"", cset) } +func (s *stateMemory) SetCPUAssignments(a ContainerCPUAssignments) { + s.Lock() + defer s.Unlock() + + s.assignments = a.Clone() + glog.Infof("[cpumanager] updated cpuset assignments: \"%v\"", a) +} + func (s *stateMemory) Delete(containerID string) { s.Lock() defer s.Unlock() @@ -88,3 +102,12 @@ func (s *stateMemory) Delete(containerID string) { delete(s.assignments, containerID) glog.V(2).Infof("[cpumanager] deleted cpuset assignment (container id: %s)", containerID) } + +func (s *stateMemory) ClearState() { + s.Lock() + defer s.Unlock() + + s.defaultCPUSet = cpuset.CPUSet{} + s.assignments = make(ContainerCPUAssignments) + glog.V(2).Infof("[cpumanager] cleared state") +}