mirror of https://github.com/k3s-io/k3s
Restrict functions and structs that aren't used outside of the
nodecontroller to have package-private scope. This makes reasoning
about the code in this package much simpler.

Branch: pull/6/head
Parent: 2823576194
Commit: f633ac0ab5
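The whole change is a Go visibility refactor: identifiers that start with an upper-case letter are exported outside the package, lower-case ones are package-private. A minimal standalone sketch of that rule, with illustrative names only (not code from this commit):

// A minimal sketch of Go's visibility rule: lower-case identifiers are visible
// only inside their package, upper-case identifiers are exported.
package nodedemo

// nodeStatusData starts with a lower-case letter, so only code in this package
// can reference it, which is the scoping this commit applies.
type nodeStatusData struct {
	ready bool
}

// Snapshot is exported and stays callable from other packages, while the
// helper it uses below remains package-private.
func Snapshot() bool {
	return defaultStatus().ready
}

// defaultStatus is package-private; callers outside the package cannot see it.
func defaultStatus() nodeStatusData {
	return nodeStatusData{ready: true}
}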
@@ -41,12 +41,10 @@ var (
 	ErrCloudInstance = errors.New("cloud provider doesn't support instances.")
 )
 
-const (
-	// Constant controlling number of retries of writing NodeStatus update.
-	nodeStatusUpdateRetry = 5
-)
+// nodeStatusUpdateRetry controls the number of retries of writing NodeStatus update.
+const nodeStatusUpdateRetry = 5
 
-type NodeStatusData struct {
+type nodeStatusData struct {
 	probeTimestamp           util.Time
 	readyTransitionTimestamp util.Time
 	status                   api.NodeStatus
@@ -62,10 +60,10 @@ type NodeController struct {
 	registerRetryCount      int
 	podEvictionTimeout      time.Duration
 	deletingPodsRateLimiter util.RateLimiter
-	// per Node map storing last observed Status togheter with a local time when it was observed.
+	// per Node map storing last observed Status together with a local time when it was observed.
 	// This timestamp is to be used instead of LastProbeTime stored in Condition. We do this
 	// to aviod the problem with time skew across the cluster.
-	nodeStatusMap map[string]NodeStatusData
+	nodeStatusMap map[string]nodeStatusData
 	// Value used if sync_nodes_status=False. NodeController will not proactively
 	// sync node status in this case, but will monitor node status updated from kubelet. If
 	// it doesn't receive update for this amount of time, it will start posting "NodeReady==
@@ -128,7 +126,7 @@ func NewNodeController(
 		registerRetryCount:      registerRetryCount,
 		podEvictionTimeout:      podEvictionTimeout,
 		deletingPodsRateLimiter: deletingPodsRateLimiter,
-		nodeStatusMap:           make(map[string]NodeStatusData),
+		nodeStatusMap:           make(map[string]nodeStatusData),
 		nodeMonitorGracePeriod:  nodeMonitorGracePeriod,
 		nodeMonitorPeriod:       nodeMonitorPeriod,
 		nodeStartupGracePeriod:  nodeStartupGracePeriod,
@@ -140,12 +138,12 @@ func NewNodeController(
 
 // Run creates initial node list and start syncing instances from cloudprovider, if any.
 // It also starts syncing or monitoring cluster node status.
-// 1. RegisterNodes() is called only once to register all initial nodes (from cloudprovider
+// 1. registerNodes() is called only once to register all initial nodes (from cloudprovider
 // or from command line flag). To make cluster bootstrap faster, node controller populates
 // node addresses.
-// 2. SyncCloudNodes() is called periodically (if enabled) to sync instances from cloudprovider.
+// 2. syncCloudNodes() is called periodically (if enabled) to sync instances from cloudprovider.
 // Node created here will only have specs.
-// 3. MonitorNodeStatus() is called periodically to incorporate the results of node status
+// 3. monitorNodeStatus() is called periodically to incorporate the results of node status
 // pushed from kubelet to master.
 func (nc *NodeController) Run(period time.Duration, syncNodeList bool) {
 	// Register intial set of nodes with their status set.
@@ -153,28 +151,29 @@ func (nc *NodeController) Run(period time.Duration, syncNodeList bool) {
 	var err error
 	if nc.isRunningCloudProvider() {
 		if syncNodeList {
-			if nodes, err = nc.GetCloudNodesWithSpec(); err != nil {
+			if nodes, err = nc.getCloudNodesWithSpec(); err != nil {
 				glog.Errorf("Error loading initial node from cloudprovider: %v", err)
 			}
 		} else {
 			nodes = &api.NodeList{}
 		}
 	} else {
-		if nodes, err = nc.GetStaticNodesWithSpec(); err != nil {
+		if nodes, err = nc.getStaticNodesWithSpec(); err != nil {
 			glog.Errorf("Error loading initial static nodes: %v", err)
 		}
 	}
-	if nodes, err = nc.PopulateAddresses(nodes); err != nil {
+
+	if nodes, err = nc.populateAddresses(nodes); err != nil {
 		glog.Errorf("Error getting nodes ips: %v", err)
 	}
-	if err = nc.RegisterNodes(nodes, nc.registerRetryCount, period); err != nil {
+	if err := nc.registerNodes(nodes, nc.registerRetryCount, period); err != nil {
 		glog.Errorf("Error registering node list %+v: %v", nodes, err)
 	}
 
 	// Start syncing node list from cloudprovider.
 	if syncNodeList && nc.isRunningCloudProvider() {
 		go util.Forever(func() {
-			if err := nc.SyncCloudNodes(); err != nil {
+			if err := nc.syncCloudNodes(); err != nil {
 				glog.Errorf("Error syncing cloud: %v", err)
 			}
 		}, period)
@@ -182,14 +181,14 @@ func (nc *NodeController) Run(period time.Duration, syncNodeList bool) {
 
 	// Start monitoring node status.
 	go util.Forever(func() {
-		if err = nc.MonitorNodeStatus(); err != nil {
+		if err := nc.monitorNodeStatus(); err != nil {
 			glog.Errorf("Error monitoring node status: %v", err)
 		}
 	}, nc.nodeMonitorPeriod)
 }
 
-// RegisterNodes registers the given list of nodes, it keeps retrying for `retryCount` times.
-func (nc *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, retryInterval time.Duration) error {
+// registerNodes registers the given list of nodes, it keeps retrying for `retryCount` times.
+func (nc *NodeController) registerNodes(nodes *api.NodeList, retryCount int, retryInterval time.Duration) error {
 	if len(nodes.Items) == 0 {
 		return nil
 	}
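For orientation, the Run flow in the hunks above boils down to one-time registration plus two long-lived periodic loops. A hedged, self-contained sketch of that pattern; runForever and the three closures are illustrative stand-ins, not the controller's API:

// One-time setup at startup, then periodic work in long-lived goroutines.
package main

import (
	"log"
	"time"
)

// runForever calls f, then keeps calling it every period, like a trivial
// version of the util.Forever loop used in Run.
func runForever(f func(), period time.Duration) {
	go func() {
		for {
			f()
			time.Sleep(period)
		}
	}()
}

func main() {
	register := func() error { log.Println("register initial nodes once"); return nil }
	sync := func() error { log.Println("sync nodes from cloud provider"); return nil }
	monitor := func() error { log.Println("monitor node status"); return nil }

	// Step 1: one-time registration at startup.
	if err := register(); err != nil {
		log.Printf("register: %v", err)
	}
	// Steps 2 and 3: periodic sync and periodic status monitoring.
	runForever(func() {
		if err := sync(); err != nil {
			log.Printf("sync: %v", err)
		}
	}, 10*time.Second)
	runForever(func() {
		if err := monitor(); err != nil {
			log.Printf("monitor: %v", err)
		}
	}, 5*time.Second)

	// A real controller blocks forever; this demo just runs for a short while.
	time.Sleep(30 * time.Second)
}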
@@ -278,9 +277,9 @@ func (nc *NodeController) reconcileExternalServices(nodes *api.NodeList) (should
 	return shouldRetry
 }
 
-// SyncCloudNodes synchronizes the list of instances from cloudprovider to master server.
-func (nc *NodeController) SyncCloudNodes() error {
-	matches, err := nc.GetCloudNodesWithSpec()
+// syncCloudNodes synchronizes the list of instances from cloudprovider to master server.
+func (nc *NodeController) syncCloudNodes() error {
+	matches, err := nc.getCloudNodesWithSpec()
 	if err != nil {
 		return err
 	}
@@ -302,7 +301,7 @@ func (nc *NodeController) SyncCloudNodes() error {
 			glog.V(3).Infof("Querying addresses for new node: %s", node.Name)
 			nodeList := &api.NodeList{}
 			nodeList.Items = []api.Node{node}
-			_, err = nc.PopulateAddresses(nodeList)
+			_, err = nc.populateAddresses(nodeList)
 			if err != nil {
 				glog.Errorf("Error fetching addresses for new node %s: %v", node.Name, err)
 				continue
@@ -341,8 +340,8 @@ func (nc *NodeController) SyncCloudNodes() error {
 	return nil
 }
 
-// PopulateAddresses queries Address for given list of nodes.
-func (nc *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList, error) {
+// populateAddresses queries Address for given list of nodes.
+func (nc *NodeController) populateAddresses(nodes *api.NodeList) (*api.NodeList, error) {
 	if nc.isRunningCloudProvider() {
 		instances, ok := nc.cloud.Instances()
 		if !ok {
@@ -411,7 +410,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 			LastTransitionTime: node.CreationTimestamp,
 		}
 		gracePeriod = nc.nodeStartupGracePeriod
-		nc.nodeStatusMap[node.Name] = NodeStatusData{
+		nc.nodeStatusMap[node.Name] = nodeStatusData{
 			status:                   node.Status,
 			probeTimestamp:           node.CreationTimestamp,
 			readyTransitionTimestamp: node.CreationTimestamp,
@@ -441,7 +440,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 	observedCondition := nc.getCondition(&node.Status, api.NodeReady)
 	if !found {
 		glog.Warningf("Missing timestamp for Node %s. Assuming now as a timestamp.", node.Name)
-		savedNodeStatus = NodeStatusData{
+		savedNodeStatus = nodeStatusData{
 			status:                   node.Status,
 			probeTimestamp:           nc.now(),
 			readyTransitionTimestamp: nc.now(),
@@ -449,7 +448,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 		nc.nodeStatusMap[node.Name] = savedNodeStatus
 	} else if savedCondition == nil && observedCondition != nil {
 		glog.V(1).Infof("Creating timestamp entry for newly observed Node %s", node.Name)
-		savedNodeStatus = NodeStatusData{
+		savedNodeStatus = nodeStatusData{
 			status:                   node.Status,
 			probeTimestamp:           nc.now(),
 			readyTransitionTimestamp: nc.now(),
@@ -458,7 +457,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 	} else if savedCondition != nil && observedCondition == nil {
 		glog.Errorf("ReadyCondition was removed from Status of Node %s", node.Name)
 		// TODO: figure out what to do in this case. For now we do the same thing as above.
-		savedNodeStatus = NodeStatusData{
+		savedNodeStatus = nodeStatusData{
 			status:                   node.Status,
 			probeTimestamp:           nc.now(),
 			readyTransitionTimestamp: nc.now(),
@@ -476,7 +475,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 			transitionTime = savedNodeStatus.readyTransitionTimestamp
 		}
 		glog.V(3).Infof("Nodes ReadyCondition updated. Updating timestamp: %+v\n vs %+v.", savedNodeStatus.status, node.Status)
-		savedNodeStatus = NodeStatusData{
+		savedNodeStatus = nodeStatusData{
 			status:                   node.Status,
 			probeTimestamp:           nc.now(),
 			readyTransitionTimestamp: transitionTime,
@@ -512,7 +511,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 			glog.Errorf("Error updating node %s: %v", node.Name, err)
 			return gracePeriod, lastReadyCondition, readyCondition, err
 		} else {
-			nc.nodeStatusMap[node.Name] = NodeStatusData{
+			nc.nodeStatusMap[node.Name] = nodeStatusData{
 				status:                   node.Status,
 				probeTimestamp:           nc.nodeStatusMap[node.Name].probeTimestamp,
 				readyTransitionTimestamp: nc.now(),
@@ -525,10 +524,10 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 	return gracePeriod, lastReadyCondition, readyCondition, err
 }
 
-// MonitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
+// monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
 // post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or
 // not reachable for a long period of time.
-func (nc *NodeController) MonitorNodeStatus() error {
+func (nc *NodeController) monitorNodeStatus() error {
 	nodes, err := nc.kubeClient.Nodes().List(labels.Everything(), fields.Everything())
 	if err != nil {
 		return err
@@ -592,10 +591,10 @@ func (nc *NodeController) MonitorNodeStatus() error {
 	return nil
 }
 
-// GetStaticNodesWithSpec constructs and returns api.NodeList for static nodes. If error
+// getStaticNodesWithSpec constructs and returns api.NodeList for static nodes. If error
 // occurs, an empty NodeList will be returned with a non-nil error info. The method only
 // constructs spec fields for nodes.
-func (nc *NodeController) GetStaticNodesWithSpec() (*api.NodeList, error) {
+func (nc *NodeController) getStaticNodesWithSpec() (*api.NodeList, error) {
 	result := &api.NodeList{}
 	for _, nodeID := range nc.nodes {
 		node := api.Node{
@@ -612,10 +611,10 @@ func (nc *NodeController) GetStaticNodesWithSpec() (*api.NodeList, error) {
 	return result, nil
 }
 
-// GetCloudNodesWithSpec constructs and returns api.NodeList from cloudprovider. If error
+// getCloudNodesWithSpec constructs and returns api.NodeList from cloudprovider. If error
 // occurs, an empty NodeList will be returned with a non-nil error info. The method only
 // constructs spec fields for nodes.
-func (nc *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) {
+func (nc *NodeController) getCloudNodesWithSpec() (*api.NodeList, error) {
 	result := &api.NodeList{}
 	instances, ok := nc.cloud.Instances()
 	if !ok {
@@ -640,7 +639,7 @@ func (nc *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) {
 		}
 		instanceID, err := instances.ExternalID(node.Name)
 		if err != nil {
-			glog.Errorf("error getting instance id for %s: %v", node.Name, err)
+			glog.Errorf("Error getting instance id for %s: %v", node.Name, err)
 		} else {
 			node.Spec.ExternalID = instanceID
 		}
@@ -239,7 +239,7 @@ func TestRegisterNodes(t *testing.T) {
 		}
 		nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute,
 			util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
-		err := nodeController.RegisterNodes(&nodes, item.retryCount, time.Millisecond)
+		err := nodeController.registerNodes(&nodes, item.retryCount, time.Millisecond)
 		if !item.expectedFail && err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -325,7 +325,7 @@ func TestCreateGetStaticNodesWithSpec(t *testing.T) {
 	for _, item := range table {
 		nodeController := NewNodeController(nil, "", item.machines, &resources, nil, 10, time.Minute,
 			util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
-		nodes, err := nodeController.GetStaticNodesWithSpec()
+		nodes, err := nodeController.getStaticNodesWithSpec()
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -387,7 +387,7 @@ func TestCreateGetCloudNodesWithSpec(t *testing.T) {
 	for _, item := range table {
 		nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, 10, time.Minute,
 			util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
-		nodes, err := nodeController.GetCloudNodesWithSpec()
+		nodes, err := nodeController.getCloudNodesWithSpec()
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
@@ -497,7 +497,7 @@ func TestSyncCloudNodes(t *testing.T) {
 		}
 		nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute,
 			util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
-		if err := nodeController.SyncCloudNodes(); err != nil {
+		if err := nodeController.syncCloudNodes(); err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
 		if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
@@ -581,7 +581,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) {
 		}
 		nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute,
 			util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
-		if err := nodeController.SyncCloudNodes(); err != nil {
+		if err := nodeController.syncCloudNodes(); err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
 		if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
@@ -650,7 +650,7 @@ func TestSyncCloudNodesReconcilesExternalService(t *testing.T) {
 	for _, item := range table {
 		nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler,
 			10, time.Minute, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "kubernetes")
-		if err := nodeController.SyncCloudNodes(); err != nil {
+		if err := nodeController.syncCloudNodes(); err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
 		if !reflect.DeepEqual(item.expectedClientActions, item.fakeNodeHandler.Actions) {
@@ -686,7 +686,7 @@ func TestPopulateNodeAddresses(t *testing.T) {
 	for _, item := range table {
 		nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, 10, time.Minute,
 			util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
-		result, err := nodeController.PopulateAddresses(item.nodes)
+		result, err := nodeController.populateAddresses(item.nodes)
 		// In case of IP querying error, we should continue.
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
@@ -887,14 +887,14 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 			evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod,
 			testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
 		nodeController.now = func() util.Time { return fakeNow }
-		if err := nodeController.MonitorNodeStatus(); err != nil {
+		if err := nodeController.monitorNodeStatus(); err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
 		if item.timeToPass > 0 {
 			nodeController.now = func() util.Time { return util.Time{Time: fakeNow.Add(item.timeToPass)} }
 			item.fakeNodeHandler.Existing[0].Status = item.newNodeStatus
 		}
-		if err := nodeController.MonitorNodeStatus(); err != nil {
+		if err := nodeController.monitorNodeStatus(); err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
 		podEvicted := false
@@ -1089,13 +1089,13 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 		nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, 10, 5*time.Minute, util.NewFakeRateLimiter(),
 			testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
 		nodeController.now = func() util.Time { return fakeNow }
-		if err := nodeController.MonitorNodeStatus(); err != nil {
+		if err := nodeController.monitorNodeStatus(); err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}
 		if item.timeToPass > 0 {
 			nodeController.now = func() util.Time { return util.Time{Time: fakeNow.Add(item.timeToPass)} }
 			item.fakeNodeHandler.Existing[0].Status = item.newNodeStatus
-			if err := nodeController.MonitorNodeStatus(); err != nil {
+			if err := nodeController.monitorNodeStatus(); err != nil {
 				t.Errorf("unexpected error: %v", err)
 			}
 		}
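The tests above depend on the controller exposing its clock as an overridable function field (nodeController.now), so a test can pin time and then jump it forward by the table item's timeToPass. A standalone sketch of that fake-clock technique, with hypothetical names rather than the real controller types:

// Keep the clock as a function field so a test can pin it and advance it.
package main

import (
	"fmt"
	"time"
)

type controller struct {
	now func() time.Time // overridable clock, like nodeController.now
}

// isStale reports whether lastProbe is older than grace according to c.now.
func (c *controller) isStale(lastProbe time.Time, grace time.Duration) bool {
	return c.now().Sub(lastProbe) > grace
}

func main() {
	fakeNow := time.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
	c := &controller{now: func() time.Time { return fakeNow }}

	lastProbe := fakeNow
	fmt.Println(c.isStale(lastProbe, 40*time.Second)) // false: no time has "passed"

	// Advance the fake clock, mimicking timeToPass in the tests.
	c.now = func() time.Time { return fakeNow.Add(time.Minute) }
	fmt.Println(c.isStale(lastProbe, 40*time.Second)) // true: past the grace period
}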
@@ -39,8 +39,7 @@ var providers = make(map[string]Factory)
 func RegisterCloudProvider(name string, cloud Factory) {
 	providersMutex.Lock()
 	defer providersMutex.Unlock()
-	_, found := providers[name]
-	if found {
+	if _, found := providers[name]; found {
 		glog.Fatalf("Cloud provider %q was registered twice", name)
 	}
 	glog.V(1).Infof("Registered cloud provider %q", name)