mirror of https://github.com/k3s-io/k3s
Merge pull request #6292 from jszczepkowski/scale-pool
Updating target pools on cloud nodes change.
commit 59ab41c8f7
@@ -221,7 +221,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
     api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
     }}
 
-    nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute, util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second)
+    nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute, util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, "")
     nodeController.Run(5*time.Second, true)
     cadvisorInterface := new(cadvisor.Fake)
 
@@ -70,6 +70,7 @@ type CMServer struct {
     NodeMemory resource.Quantity
 
     KubeletConfig client.KubeletConfig
+    ClusterName string
     EnableProfiling bool
 }
 
@@ -91,6 +92,7 @@ func NewCMServer() *CMServer {
         EnableHttps: true,
         HTTPTimeout: time.Duration(5) * time.Second,
         },
+        ClusterName: "kubernetes",
     }
     return &s
 }
@@ -132,7 +134,8 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
     fs.Int64Var(&s.NodeMilliCPU, "node_milli_cpu", s.NodeMilliCPU, "The amount of MilliCPU provisioned on each node")
     fs.Var(resource.NewQuantityFlagValue(&s.NodeMemory), "node_memory", "The amount of memory (in bytes) provisioned on each node")
     client.BindKubeletClientConfigFlags(fs, &s.KubeletConfig)
-    fs.BoolVar(&s.EnableProfiling, "profiling", true, "Enable profiling via web interface host:port/debug/pprof/")
+    fs.StringVar(&s.ClusterName, "cluster_name", s.ClusterName, "The instance prefix for the cluster")
+    fs.BoolVar(&s.EnableProfiling, "profiling", false, "Enable profiling via web interface host:port/debug/pprof/")
 }
 
 func (s *CMServer) verifyMinionFlags() {
@@ -198,7 +201,7 @@ func (s *CMServer) Run(_ []string) error {
 
     nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources,
         kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
-        s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod)
+        s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, s.ClusterName)
     nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList)
 
     resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)
@@ -131,7 +131,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
     kubeClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort}
 
     nodeController := nodeControllerPkg.NewNodeController(
-        nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), 40*time.Second, 60*time.Second, 5*time.Second)
+        nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst), 40*time.Second, 60*time.Second, 5*time.Second, "")
     nodeController.Run(10*time.Second, true)
 
     endpoints := service.NewEndpointController(cl)
@@ -42,6 +42,10 @@ type Clusters interface {
     Master(clusterName string) (string, error)
 }
 
+func GetLoadBalancerName(clusterName, serviceNamespace, serviceName string) string {
+    return clusterName + "-" + serviceNamespace + "-" + serviceName
+}
+
 // TCPLoadBalancer is an abstract, pluggable interface for TCP load balancers.
 type TCPLoadBalancer interface {
     // TCPLoadBalancerExists returns whether the specified load balancer exists.
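The new helper gives every external balancer a name of the form <cluster>-<namespace>-<service>, which is what lets the node controller find and update balancers it did not itself create. A runnable illustration (the function is inlined so the snippet is self-contained):

package main

import "fmt"

// Same composition as the new cloudprovider.GetLoadBalancerName helper,
// inlined here so the example runs standalone.
func loadBalancerName(clusterName, serviceNamespace, serviceName string) string {
    return clusterName + "-" + serviceNamespace + "-" + serviceName
}

func main() {
    // With the default cluster_name of "kubernetes", a service "service0" in
    // namespace "namespace" yields the pool name the tests below assert on.
    fmt.Println(loadBalancerName("kubernetes", "namespace", "service0"))
    // Output: kubernetes-namespace-service0
}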
@@ -89,6 +89,9 @@ type NodeController struct {
     // check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod.
     // TODO: Change node status monitor to watch based.
     nodeMonitorPeriod time.Duration
+    clusterName string
+    // Whether external services should be reconciled while syncing cloud nodes, even though the nodes were not changed.
+    reconcileServices bool
     // Method for easy mocking in unittest.
     lookupIP func(host string) ([]net.IP, error)
     now func() util.Time
@@ -107,7 +110,8 @@ func NewNodeController(
     deletingPodsRateLimiter util.RateLimiter,
     nodeMonitorGracePeriod time.Duration,
     nodeStartupGracePeriod time.Duration,
-    nodeMonitorPeriod time.Duration) *NodeController {
+    nodeMonitorPeriod time.Duration,
+    clusterName string) *NodeController {
     eventBroadcaster := record.NewBroadcaster()
     recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
     if kubeClient != nil {
@@ -133,6 +137,7 @@ func NewNodeController(
     nodeStartupGracePeriod: nodeStartupGracePeriod,
     lookupIP: net.LookupIP,
     now: util.Now,
+    clusterName: clusterName,
 }
 }
 
@@ -220,6 +225,62 @@ func (nc *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, ret
     }
 }
 
+// reconcileExternalServices updates balancers for external services, so that they will match the nodes given.
+// Returns true if something went wrong and we should call reconcile again.
+func (nc *NodeController) reconcileExternalServices(nodes *api.NodeList) (shouldRetry bool) {
+    balancer, ok := nc.cloud.TCPLoadBalancer()
+    if !ok {
+        glog.Error("The cloud provider does not support external TCP load balancers.")
+        return false
+    }
+
+    zones, ok := nc.cloud.Zones()
+    if !ok {
+        glog.Error("The cloud provider does not support zone enumeration.")
+        return false
+    }
+    zone, err := zones.GetZone()
+    if err != nil {
+        glog.Errorf("Error while getting zone: %v", err)
+        return false
+    }
+
+    hosts := []string{}
+    for _, node := range nodes.Items {
+        hosts = append(hosts, node.Name)
+    }
+
+    services, err := nc.kubeClient.Services(api.NamespaceAll).List(labels.Everything())
+    if err != nil {
+        glog.Errorf("Error while listing services: %v", err)
+        return true
+    }
+    shouldRetry = false
+    for _, service := range services.Items {
+        if service.Spec.CreateExternalLoadBalancer {
+            nonTCPPort := false
+            for i := range service.Spec.Ports {
+                if service.Spec.Ports[i].Protocol != api.ProtocolTCP {
+                    nonTCPPort = true
+                    break
+                }
+            }
+            if nonTCPPort {
+                // TODO: Support UDP here.
+                glog.Errorf("External load balancers for non TCP services are not currently supported: %v.", service)
+                continue
+            }
+            name := cloudprovider.GetLoadBalancerName(nc.clusterName, service.Namespace, service.Name)
+            err := balancer.UpdateTCPLoadBalancer(name, zone.Region, hosts)
+            if err != nil {
+                glog.Errorf("External error while updating TCP load balancer: %v.", err)
+                shouldRetry = true
+            }
+        }
+    }
+    return shouldRetry
+}
+
 // SyncCloudNodes synchronizes the list of instances from cloudprovider to master server.
 func (nc *NodeController) SyncCloudNodes() error {
     matches, err := nc.GetCloudNodesWithSpec()
@@ -238,6 +299,7 @@ func (nc *NodeController) SyncCloudNodes() error {
 
     // Create nodes which have been created in cloud, but not in kubernetes cluster
     // Skip nodes if we hit an error while trying to get their addresses.
+    nodesChanged := false
     for _, node := range matches.Items {
         if _, ok := nodeMap[node.Name]; !ok {
             glog.V(3).Infof("Querying addresses for new node: %s", node.Name)
@@ -255,6 +317,7 @@ func (nc *NodeController) SyncCloudNodes() error {
         if err != nil {
             glog.Errorf("Create node %s error: %v", node.Name, err)
         }
+        nodesChanged = true
         }
         delete(nodeMap, node.Name)
     }
@@ -267,6 +330,15 @@ func (nc *NodeController) SyncCloudNodes() error {
         glog.Errorf("Delete node %s error: %v", nodeID, err)
         }
         nc.deletePods(nodeID)
+        nodesChanged = true
+    }
+
+    // Make external services aware of nodes currently present in the cluster.
+    if nodesChanged || nc.reconcileServices {
+        nc.reconcileServices = nc.reconcileExternalServices(matches)
+        if nc.reconcileServices {
+            glog.Error("Reconciliation of external services failed and will be retried during the next sync.")
+        }
     }
 
     return nil
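The shouldRetry value returned by reconcileExternalServices is stored in nc.reconcileServices, making a failed reconcile sticky: the next SyncCloudNodes pass re-runs it even if the node set is unchanged. A minimal sketch of that contract, with illustrative names not taken from the patch:

package main

import "fmt"

type controller struct {
    reconcilePending bool
}

func (c *controller) sync(nodesChanged bool, reconcile func() (shouldRetry bool)) {
    // Reconcile when the node set changed, or when a previous attempt failed.
    if nodesChanged || c.reconcilePending {
        c.reconcilePending = reconcile()
    }
}

func main() {
    c := &controller{}
    fails := true
    reconcile := func() bool { return fails } // true means "retry me"

    c.sync(true, reconcile)         // nodes changed; reconcile fails
    fmt.Println(c.reconcilePending) // true: will retry next sync

    fails = false
    c.sync(false, reconcile)        // nodes unchanged, but the flag forces a retry
    fmt.Println(c.reconcilePending) // false: reconciled successfully
}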
@@ -258,7 +258,7 @@ func TestRegisterNodes(t *testing.T) {
         nodes.Items = append(nodes.Items, *newNode(machine))
     }
     nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute,
-        util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod)
+        util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
     err := nodeController.RegisterNodes(&nodes, item.retryCount, time.Millisecond)
     if !item.expectedFail && err != nil {
         t.Errorf("unexpected error: %v", err)
@@ -344,7 +344,7 @@ func TestCreateGetStaticNodesWithSpec(t *testing.T) {
     }
     for _, item := range table {
     nodeController := NewNodeController(nil, "", item.machines, &resources, nil, nil, 10, time.Minute,
-        util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod)
+        util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
     nodes, err := nodeController.GetStaticNodesWithSpec()
     if err != nil {
         t.Errorf("unexpected error: %v", err)
@@ -406,7 +406,7 @@ func TestCreateGetCloudNodesWithSpec(t *testing.T) {
 
     for _, item := range table {
     nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil, 10, time.Minute,
-        util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod)
+        util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
     nodes, err := nodeController.GetCloudNodesWithSpec()
     if err != nil {
         t.Errorf("unexpected error: %v", err)
@@ -516,7 +516,7 @@ func TestSyncCloudNodes(t *testing.T) {
         item.fakeNodeHandler.Fake = testclient.NewSimpleFake()
     }
     nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute,
-        util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod)
+        util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
     if err := nodeController.SyncCloudNodes(); err != nil {
         t.Errorf("unexpected error: %v", err)
     }
@@ -577,7 +577,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) {
     matchRE: ".*",
     expectedRequestCount: 2, // List + Delete
     expectedDeleted: []string{"node1"},
-    expectedActions: []testclient.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}},
+    expectedActions: []testclient.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}, {Action: "list-services"}},
     },
     {
     // Delete node1, but pod0 is running on node0.
@@ -591,7 +591,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) {
     matchRE: ".*",
     expectedRequestCount: 2, // List + Delete
     expectedDeleted: []string{"node1"},
-    expectedActions: []testclient.FakeAction{{Action: "list-pods"}},
+    expectedActions: []testclient.FakeAction{{Action: "list-pods"}, {Action: "list-services"}},
     },
 }
 
@@ -600,7 +600,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) {
         item.fakeNodeHandler.Fake = testclient.NewSimpleFake()
     }
     nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute,
-        util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod)
+        util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
     if err := nodeController.SyncCloudNodes(); err != nil {
         t.Errorf("unexpected error: %v", err)
     }
@@ -617,6 +617,71 @@ func TestSyncCloudNodesEvictPods(t *testing.T) {
         }
     }
 }
 
+func TestSyncCloudNodesReconcilesExternalService(t *testing.T) {
+    table := []struct {
+        fakeNodeHandler       *FakeNodeHandler
+        fakeCloud             *fake_cloud.FakeCloud
+        matchRE               string
+        expectedClientActions []testclient.FakeAction
+        expectedUpdateCalls   []fake_cloud.FakeUpdateBalancerCall
+    }{
+        {
+            // Set of nodes does not change: do nothing.
+            fakeNodeHandler: &FakeNodeHandler{
+                Existing: []*api.Node{newNode("node0"), newNode("node1")},
+                Fake:     testclient.NewSimpleFake(&api.ServiceList{Items: []api.Service{*newService("service0", true), *newService("service1", false)}})},
+            fakeCloud: &fake_cloud.FakeCloud{
+                Machines: []string{"node0", "node1"},
+            },
+            matchRE:               ".*",
+            expectedClientActions: nil,
+            expectedUpdateCalls:   nil,
+        },
+        {
+            // Delete "node1", target pool for "service0" should shrink.
+            fakeNodeHandler: &FakeNodeHandler{
+                Existing: []*api.Node{newNode("node0"), newNode("node1")},
+                Fake:     testclient.NewSimpleFake(&api.ServiceList{Items: []api.Service{*newService("service0", true), *newService("service1", false)}})},
+            fakeCloud: &fake_cloud.FakeCloud{
+                Machines: []string{"node0"},
+            },
+            matchRE:               ".*",
+            expectedClientActions: []testclient.FakeAction{{Action: "list-pods"}, {Action: "list-services"}},
+            expectedUpdateCalls: []fake_cloud.FakeUpdateBalancerCall{
+                {Name: "kubernetes-namespace-service0", Hosts: []string{"node0"}},
+            },
+        },
+        {
+            // Add "node1", target pool for "service0" should grow.
+            fakeNodeHandler: &FakeNodeHandler{
+                Existing: []*api.Node{newNode("node0")},
+                Fake:     testclient.NewSimpleFake(&api.ServiceList{Items: []api.Service{*newService("service0", true), *newService("service1", false)}})},
+            fakeCloud: &fake_cloud.FakeCloud{
+                Machines: []string{"node0", "node1"},
+            },
+            matchRE:               ".*",
+            expectedClientActions: []testclient.FakeAction{{Action: "list-services"}},
+            expectedUpdateCalls: []fake_cloud.FakeUpdateBalancerCall{
+                {Name: "kubernetes-namespace-service0", Hosts: []string{"node0", "node1"}},
+            },
+        },
+    }
+
+    for _, item := range table {
+        nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil,
+            10, time.Minute, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "kubernetes")
+        if err := nodeController.SyncCloudNodes(); err != nil {
+            t.Errorf("unexpected error: %v", err)
+        }
+        if !reflect.DeepEqual(item.expectedClientActions, item.fakeNodeHandler.Actions) {
+            t.Errorf("expected client actions mismatch, expected %+v, got %+v", item.expectedClientActions, item.fakeNodeHandler.Actions)
+        }
+        if !reflect.DeepEqual(item.expectedUpdateCalls, item.fakeCloud.UpdateCalls) {
+            t.Errorf("expected update calls mismatch, expected %+v, got %+v", item.expectedUpdateCalls, item.fakeCloud.UpdateCalls)
+        }
+    }
+}
+
 func TestPopulateNodeAddresses(t *testing.T) {
     table := []struct {
         nodes *api.NodeList
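A subtlety the table above relies on (an observation, not part of the patch): FakeCloud.UpdateCalls stays nil until the first append, and reflect.DeepEqual distinguishes a nil slice from an empty one, so the "no calls" cases must use nil rather than an empty slice literal:

package main

import (
    "fmt"
    "reflect"
)

func main() {
    var nilSlice []string
    // A nil slice and an empty slice are not DeepEqual.
    fmt.Println(reflect.DeepEqual(nilSlice, []string{})) // false
    var other []string
    // Two nil slices of the same type are DeepEqual.
    fmt.Println(reflect.DeepEqual(nilSlice, other)) // true
}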
@@ -640,7 +705,7 @@ func TestPopulateNodeAddresses(t *testing.T) {
 
     for _, item := range table {
     nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil, 10, time.Minute,
-        util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod)
+        util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
     result, err := nodeController.PopulateAddresses(item.nodes)
     // In case of IP querying error, we should continue.
     if err != nil {
@@ -840,7 +905,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
     for _, item := range table {
     nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10,
         evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod,
-        testNodeStartupGracePeriod, testNodeMonitorPeriod)
+        testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
     nodeController.now = func() util.Time { return fakeNow }
     if err := nodeController.MonitorNodeStatus(); err != nil {
         t.Errorf("unexpected error: %v", err)
@@ -1042,7 +1107,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 
     for _, item := range table {
     nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute, util.NewFakeRateLimiter(),
-        testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod)
+        testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
     nodeController.now = func() util.Time { return fakeNow }
     if err := nodeController.MonitorNodeStatus(); err != nil {
         t.Errorf("unexpected error: %v", err)
@@ -1083,6 +1148,10 @@ func newPod(name, host string) *api.Pod {
     return &api.Pod{ObjectMeta: api.ObjectMeta{Name: name}, Spec: api.PodSpec{Host: host}}
 }
 
+func newService(name string, external bool) *api.Service {
+    return &api.Service{ObjectMeta: api.ObjectMeta{Name: name, Namespace: "namespace"}, Spec: api.ServiceSpec{CreateExternalLoadBalancer: external}}
+}
+
 func sortedNodeNames(nodes []*api.Node) []string {
     nodeNames := []string{}
     for _, node := range nodes {
@@ -33,6 +33,12 @@ type FakeBalancer struct {
     Hosts []string
 }
 
+type FakeUpdateBalancerCall struct {
+    Name   string
+    Region string
+    Hosts  []string
+}
+
 // FakeCloud is a test-double implementation of Interface, TCPLoadBalancer and Instances. It is useful for testing.
 type FakeCloud struct {
     Exists bool
@@ -46,7 +52,7 @@ type FakeCloud struct {
     MasterName string
     ExternalIP net.IP
     Balancers []FakeBalancer
+    UpdateCalls []FakeUpdateBalancerCall
     cloudprovider.Zone
 }
 
@@ -105,6 +111,7 @@ func (f *FakeCloud) CreateTCPLoadBalancer(name, region string, externalIP net.IP
 // It adds an entry "update" into the internal method call record.
 func (f *FakeCloud) UpdateTCPLoadBalancer(name, region string, hosts []string) error {
     f.addCall("update")
+    f.UpdateCalls = append(f.UpdateCalls, FakeUpdateBalancerCall{name, region, hosts})
     return f.Err
 }
 
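Recording each call's arguments, not just its name, is what lets the node controller tests assert exact balancer updates with reflect.DeepEqual. A generic, self-contained sketch of the same test-double pattern (names here are illustrative, not from the patch):

package main

import "fmt"

type updateCall struct {
    Name, Region string
    Hosts        []string
}

type recordingBalancer struct {
    calls []updateCall
    err   error // injected error returned by every call
}

// UpdateTCPLoadBalancer appends the call's arguments so a test can later
// compare the full call history against an expected slice.
func (b *recordingBalancer) UpdateTCPLoadBalancer(name, region string, hosts []string) error {
    b.calls = append(b.calls, updateCall{name, region, hosts})
    return b.err
}

func main() {
    b := &recordingBalancer{}
    _ = b.UpdateTCPLoadBalancer("kubernetes-namespace-service0", "region0", []string{"node0"})
    fmt.Printf("%+v\n", b.calls)
}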
@@ -32,6 +32,7 @@ import (
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
+    "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 
     "code.google.com/p/gcfg"
     compute "code.google.com/p/google-api-go-client/compute/v1"
|
||||||
if len(parts) != 4 {
|
if len(parts) != 4 {
|
||||||
return "", "", fmt.Errorf("unexpected response: %s", result)
|
return "", "", fmt.Errorf("unexpected response: %s", result)
|
||||||
}
|
}
|
||||||
return parts[1], parts[3], nil
|
zone := parts[3]
|
||||||
|
projectID, err := metadata.ProjectID()
|
||||||
|
if err != nil {
|
||||||
|
return "", "", err
|
||||||
|
}
|
||||||
|
return projectID, zone, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getInstanceID() (string, error) {
|
func getInstanceID() (string, error) {
|
||||||
|
@@ -292,15 +298,41 @@ func (gce *GCECloud) CreateTCPLoadBalancer(name, region string, externalIP net.I
 
 // UpdateTCPLoadBalancer is an implementation of TCPLoadBalancer.UpdateTCPLoadBalancer.
 func (gce *GCECloud) UpdateTCPLoadBalancer(name, region string, hosts []string) error {
-    var refs []*compute.InstanceReference
-    for _, host := range hosts {
-        refs = append(refs, &compute.InstanceReference{host})
+    pool, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do()
+    if err != nil {
+        return err
     }
-    req := &compute.TargetPoolsAddInstanceRequest{
-        Instances: refs,
+    existing := util.NewStringSet(pool.Instances...)
+
+    var toAdd []*compute.InstanceReference
+    var toRemove []*compute.InstanceReference
+    for _, host := range hosts {
+        link := makeHostLink(gce.projectID, gce.zone, host)
+        if !existing.Has(link) {
+            toAdd = append(toAdd, &compute.InstanceReference{link})
+        }
+        existing.Delete(link)
+    }
+    for link := range existing {
+        toRemove = append(toRemove, &compute.InstanceReference{link})
     }
 
-    op, err := gce.service.TargetPools.AddInstance(gce.projectID, region, name, req).Do()
+    add := &compute.TargetPoolsAddInstanceRequest{
+        Instances: toAdd,
+    }
+    op, err := gce.service.TargetPools.AddInstance(gce.projectID, region, name, add).Do()
+    if err != nil {
+        return err
+    }
+    err = gce.waitForRegionOp(op, region)
+    if err != nil {
+        return err
+    }
+
+    rm := &compute.TargetPoolsRemoveInstanceRequest{
+        Instances: toRemove,
+    }
+    op, err = gce.service.TargetPools.RemoveInstance(gce.projectID, region, name, rm).Do()
     if err != nil {
         return err
     }
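The rewritten method now diffs the target pool's current membership against the desired host list instead of unconditionally re-adding every host. A standalone sketch of that set difference, using a plain map in place of the util.StringSet helper:

package main

import "fmt"

// diffHosts computes which hosts must be added to and removed from a pool,
// mirroring the logic in UpdateTCPLoadBalancer above.
func diffHosts(existing, desired []string) (toAdd, toRemove []string) {
    remaining := make(map[string]bool, len(existing))
    for _, h := range existing {
        remaining[h] = true
    }
    for _, h := range desired {
        if !remaining[h] {
            toAdd = append(toAdd, h) // desired but not yet in the pool
        }
        delete(remaining, h) // whatever is left afterwards must be removed
    }
    for h := range remaining {
        toRemove = append(toRemove, h)
    }
    return toAdd, toRemove
}

func main() {
    add, remove := diffHosts([]string{"node0", "node1"}, []string{"node1", "node2"})
    fmt.Println(add, remove) // [node2] [node0]
}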
@@ -283,10 +283,6 @@ func (rs *REST) ResourceLocation(ctx api.Context, id string) (*url.URL, http.Rou
     return nil, nil, fmt.Errorf("no endpoints available for %q", id)
 }
 
-func (rs *REST) getLoadbalancerName(ctx api.Context, service *api.Service) string {
-    return rs.clusterName + "-" + api.NamespaceValue(ctx) + "-" + service.Name
-}
-
 func (rs *REST) createExternalLoadBalancer(ctx api.Context, service *api.Service) error {
     if rs.cloud == nil {
         return fmt.Errorf("requested an external service, but no cloud provider supplied.")
@@ -313,7 +309,7 @@ func (rs *REST) createExternalLoadBalancer(ctx api.Context, service *api.Service
     if err != nil {
         return err
     }
-    name := rs.getLoadbalancerName(ctx, service)
+    name := cloudprovider.GetLoadBalancerName(rs.clusterName, api.NamespaceValue(ctx), service.Name)
     var affinityType api.AffinityType = service.Spec.SessionAffinity
     if len(service.Spec.PublicIPs) > 0 {
         for _, publicIP := range service.Spec.PublicIPs {
@@ -386,7 +382,8 @@ func (rs *REST) deleteExternalLoadBalancer(ctx api.Context, service *api.Service
     if err != nil {
         return err
     }
-    if err := balancer.DeleteTCPLoadBalancer(rs.getLoadbalancerName(ctx, service), zone.Region); err != nil {
+    name := cloudprovider.GetLoadBalancerName(rs.clusterName, api.NamespaceValue(ctx), service.Name)
+    if err := balancer.DeleteTCPLoadBalancer(name, zone.Region); err != nil {
         return err
     }
     return nil