Merge pull request #3988 from ddysher/sync-status-ip

Move hostip to sync status
pull/6/head
Alex Robinson 2015-02-02 11:01:57 -08:00
commit d8d0bd1642
2 changed files with 101 additions and 33 deletions

View File

@ -18,7 +18,6 @@ package controller
import (
"errors"
"fmt"
"net"
"reflect"
"sync"
@ -33,7 +32,9 @@ import (
)
var (
ErrRegistration = errors.New("unable to register all nodes.")
ErrRegistration = errors.New("unable to register all nodes.")
ErrQueryIPAddress = errors.New("unable to query IP address.")
ErrCloudInstance = errors.New("cloud provider doesn't support instances.")
)
type NodeController struct {
@ -79,12 +80,16 @@ func (s *NodeController) Run(period time.Duration, retryCount int) {
} else {
nodes, err = s.StaticNodes()
if err != nil {
glog.Errorf("Error loading initial static nodes")
glog.Errorf("Error loading initial static nodes: %v", err)
}
}
nodes = s.DoChecks(nodes)
if err := s.RegisterNodes(nodes, retryCount, period); err != nil {
glog.Errorf("Error registrying node list: %+v", nodes)
nodes, err = s.PopulateIPs(nodes)
if err != nil {
glog.Errorf("Error getting nodes ips: %v", err)
}
if err = s.RegisterNodes(nodes, retryCount, period); err != nil {
glog.Errorf("Error registrying node list %+v: %v", nodes, err)
}
// Start syncing node list from cloudprovider.
@ -183,6 +188,10 @@ func (s *NodeController) SyncNodeStatus() error {
oldNodes[node.Name] = node
}
nodes = s.DoChecks(nodes)
nodes, err = s.PopulateIPs(nodes)
if err != nil {
return err
}
for _, node := range nodes.Items {
if reflect.DeepEqual(node, oldNodes[node.Name]) {
glog.V(2).Infof("skip updating node %v", node.Name)
@ -197,6 +206,43 @@ func (s *NodeController) SyncNodeStatus() error {
return nil
}
// PopulateIPs queries IPs for given list of nodes.
func (s *NodeController) PopulateIPs(nodes *api.NodeList) (*api.NodeList, error) {
if s.isRunningCloudProvider() {
instances, ok := s.cloud.Instances()
if !ok {
return nodes, ErrCloudInstance
}
for i := range nodes.Items {
node := &nodes.Items[i]
hostIP, err := instances.IPAddress(node.Name)
if err != nil {
glog.Errorf("error getting instance ip address for %s: %v", node.Name, err)
} else {
node.Status.HostIP = hostIP.String()
}
}
} else {
for i := range nodes.Items {
node := &nodes.Items[i]
addr := net.ParseIP(node.Name)
if addr != nil {
node.Status.HostIP = node.Name
} else {
addrs, err := net.LookupIP(node.Name)
if err != nil {
glog.Errorf("Can't get ip address of node %s: %v", node.Name, err)
} else if len(addrs) == 0 {
glog.Errorf("No ip address for node %v", node.Name)
} else {
node.Status.HostIP = addrs[0].String()
}
}
}
}
return nodes, nil
}
// DoChecks performs health checking for given list of nodes.
func (s *NodeController) DoChecks(nodes *api.NodeList) *api.NodeList {
var wg sync.WaitGroup
@ -245,19 +291,6 @@ func (s *NodeController) StaticNodes() (*api.NodeList, error) {
ObjectMeta: api.ObjectMeta{Name: nodeID},
Spec: api.NodeSpec{Capacity: s.staticResources.Capacity},
}
addr := net.ParseIP(nodeID)
if addr != nil {
node.Status.HostIP = nodeID
} else {
addrs, err := net.LookupIP(nodeID)
if err != nil {
glog.Errorf("Can't get ip address of node %v", nodeID)
} else if len(addrs) == 0 {
glog.Errorf("No ip address for node %v", nodeID)
} else {
node.Status.HostIP = addrs[0].String()
}
}
result.Items = append(result.Items, node)
}
return result, nil
@ -269,7 +302,7 @@ func (s *NodeController) CloudNodes() (*api.NodeList, error) {
result := &api.NodeList{}
instances, ok := s.cloud.Instances()
if !ok {
return result, fmt.Errorf("cloud doesn't support instances")
return result, ErrCloudInstance
}
matches, err := instances.List(s.matchRE)
if err != nil {
@ -278,12 +311,6 @@ func (s *NodeController) CloudNodes() (*api.NodeList, error) {
for i := range matches {
node := api.Node{}
node.Name = matches[i]
hostIP, err := instances.IPAddress(matches[i])
if err != nil {
glog.Errorf("error getting instance ip address for %s: %v", matches[i], err)
} else {
node.Status.HostIP = hostIP.String()
}
resources, err := instances.GetNodeResources(matches[i])
if err != nil {
return nil, err

View File

@ -18,7 +18,6 @@ package controller
import (
"errors"
"fmt"
"net"
"reflect"
"sort"
@ -59,7 +58,7 @@ func (m *FakeNodeHandler) Create(node *api.Node) (*api.Node, error) {
m.CreatedNodes = append(m.CreatedNodes, &nodeCopy)
return node, nil
} else {
return nil, fmt.Errorf("Create error.")
return nil, errors.New("Create error.")
}
}
@ -255,7 +254,6 @@ func TestCreateCloudNodes(t *testing.T) {
{
fakeCloud: &fake_cloud.FakeCloud{
Machines: []string{"node0"},
IP: net.ParseIP("1.2.3.4"),
NodeResources: &api.NodeResources{Capacity: resourceList},
},
expectedNodes: &api.NodeList{
@ -263,7 +261,6 @@ func TestCreateCloudNodes(t *testing.T) {
{
ObjectMeta: api.ObjectMeta{Name: "node0"},
Spec: api.NodeSpec{Capacity: resourceList},
Status: api.NodeStatus{HostIP: "1.2.3.4"},
},
},
},
@ -404,10 +401,45 @@ func TestHealthCheckNode(t *testing.T) {
}
}
func TestPopulateNodeIPs(t *testing.T) {
table := []struct {
nodes *api.NodeList
fakeCloud *fake_cloud.FakeCloud
expectedFail bool
expectedIP string
}{
{
nodes: &api.NodeList{Items: []api.Node{*newNode("node0"), *newNode("node1")}},
fakeCloud: &fake_cloud.FakeCloud{IP: net.ParseIP("1.2.3.4")},
expectedIP: "1.2.3.4",
},
{
nodes: &api.NodeList{Items: []api.Node{*newNode("node0"), *newNode("node1")}},
fakeCloud: &fake_cloud.FakeCloud{Err: ErrQueryIPAddress},
expectedIP: "",
},
}
for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil)
result, err := nodeController.PopulateIPs(item.nodes)
// In case of IP querying error, we should continue.
if err != nil {
t.Errorf("unexpected error: %v", err)
}
for _, node := range result.Items {
if node.Status.HostIP != item.expectedIP {
t.Errorf("expect HostIP %s, got %s", item.expectedIP, node.Status.HostIP)
}
}
}
}
func TestSyncNodeStatus(t *testing.T) {
table := []struct {
fakeNodeHandler *FakeNodeHandler
fakeKubeletClient *FakeKubeletClient
fakeCloud *fake_cloud.FakeCloud
expectedNodes []*api.Node
expectedRequestCount int
}{
@ -419,14 +451,23 @@ func TestSyncNodeStatus(t *testing.T) {
Status: probe.Success,
Err: nil,
},
fakeCloud: &fake_cloud.FakeCloud{
IP: net.ParseIP("1.2.3.4"),
},
expectedNodes: []*api.Node{
{
ObjectMeta: api.ObjectMeta{Name: "node0"},
Status: api.NodeStatus{Conditions: []api.NodeCondition{{Kind: api.NodeReady, Status: api.ConditionFull}}},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{{Kind: api.NodeReady, Status: api.ConditionFull}},
HostIP: "1.2.3.4",
},
},
{
ObjectMeta: api.ObjectMeta{Name: "node1"},
Status: api.NodeStatus{Conditions: []api.NodeCondition{{Kind: api.NodeReady, Status: api.ConditionFull}}},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{{Kind: api.NodeReady, Status: api.ConditionFull}},
HostIP: "1.2.3.4",
},
},
},
expectedRequestCount: 3, // List + 2xUpdate
@ -434,7 +475,7 @@ func TestSyncNodeStatus(t *testing.T) {
}
for _, item := range table {
nodeController := NewNodeController(nil, "", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient)
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient)
if err := nodeController.SyncNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -442,7 +483,7 @@ func TestSyncNodeStatus(t *testing.T) {
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
}
if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) {
t.Errorf("expected nodes %+v, got %+v", item.expectedNodes, item.fakeNodeHandler.UpdatedNodes)
t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0])
}
item.fakeNodeHandler.RequestCount = 0
if err := nodeController.SyncNodeStatus(); err != nil {