Merge pull request #56207 from wojtek-t/fix_long_even_handlers

Automatic merge from submit-queue (batch tested with PRs 56207, 55950). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Fix long event handler in cloud cidr allocator

Ref #52292
pull/6/head
Kubernetes Submit Queue 2017-11-22 05:21:23 -08:00 committed by GitHub
commit c3f8d33534
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 29 additions and 30 deletions

View File

@ -49,10 +49,12 @@ type cloudCIDRAllocator struct {
client clientset.Interface client clientset.Interface
cloud *gce.GCECloud cloud *gce.GCECloud
// Channel that is used to pass updating Nodes with assigned CIDRs to the background // Channel that is used to pass updating Nodes to the background.
// This increases a throughput of CIDR assignment by not blocking on long operations. // This increases the throughput of CIDR assignment by parallelization
nodeCIDRUpdateChannel chan nodeAndCIDR // and not blocking on long operations (which shouldn't be done from
recorder record.EventRecorder // event handlers anyway).
nodeUpdateChannel chan string
recorder record.EventRecorder
// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation
lock sync.Mutex lock sync.Mutex
@ -80,11 +82,11 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
} }
ca := &cloudCIDRAllocator{ ca := &cloudCIDRAllocator{
client: client, client: client,
cloud: gceCloud, cloud: gceCloud,
nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize), nodeUpdateChannel: make(chan string, cidrUpdateQueueSize),
recorder: recorder, recorder: recorder,
nodesInProcessing: sets.NewString(), nodesInProcessing: sets.NewString(),
} }
for i := 0; i < cidrUpdateWorkers; i++ { for i := 0; i < cidrUpdateWorkers; i++ {
@ -99,7 +101,7 @@ func NewCloudCIDRAllocator(client clientset.Interface, cloud cloudprovider.Inter
func (ca *cloudCIDRAllocator) worker(stopChan <-chan struct{}) { func (ca *cloudCIDRAllocator) worker(stopChan <-chan struct{}) {
for { for {
select { select {
case workItem, ok := <-ca.nodeCIDRUpdateChannel: case workItem, ok := <-ca.nodeUpdateChannel:
if !ok { if !ok {
glog.Warning("Channel nodeCIDRUpdateChannel was unexpectedly closed") glog.Warning("Channel nodeCIDRUpdateChannel was unexpectedly closed")
return return
@ -138,14 +140,24 @@ func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
glog.V(2).Infof("Node %v is already in a process of CIDR assignment.", node.Name) glog.V(2).Infof("Node %v is already in a process of CIDR assignment.", node.Name)
return nil return nil
} }
cidrs, err := ca.cloud.AliasRanges(types.NodeName(node.Name))
glog.V(4).Infof("Putting node %s into the work queue", node.Name)
ca.nodeUpdateChannel <- node.Name
return nil
}
// updateCIDRAllocation assigns CIDR to Node and sends an update to the API server.
func (ca *cloudCIDRAllocator) updateCIDRAllocation(nodeName string) error {
var err error
var node *v1.Node
defer ca.removeNodeFromProcessing(nodeName)
cidrs, err := ca.cloud.AliasRanges(types.NodeName(nodeName))
if err != nil { if err != nil {
ca.removeNodeFromProcessing(node.Name)
util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable") util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr: %v", err) return fmt.Errorf("failed to allocate cidr: %v", err)
} }
if len(cidrs) == 0 { if len(cidrs) == 0 {
ca.removeNodeFromProcessing(node.Name)
util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable") util.RecordNodeStatusChange(ca.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr: Node %v has no CIDRs", node.Name) return fmt.Errorf("failed to allocate cidr: Node %v has no CIDRs", node.Name)
} }
@ -153,26 +165,13 @@ func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
if err != nil { if err != nil {
return fmt.Errorf("failed to parse string '%s' as a CIDR: %v", cidrs[0], err) return fmt.Errorf("failed to parse string '%s' as a CIDR: %v", cidrs[0], err)
} }
podCIDR := cidr.String()
glog.V(4).Infof("Putting node %s with CIDR %s into the work queue", node.Name, cidrs[0])
ca.nodeCIDRUpdateChannel <- nodeAndCIDR{
nodeName: node.Name,
cidr: cidr,
}
return nil
}
// updateCIDRAllocation assigns CIDR to Node and sends an update to the API server.
func (ca *cloudCIDRAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
var err error
var node *v1.Node
defer ca.removeNodeFromProcessing(data.nodeName)
podCIDR := data.cidr.String()
for rep := 0; rep < cidrUpdateRetries; rep++ { for rep := 0; rep < cidrUpdateRetries; rep++ {
// TODO: change it to using PATCH instead of full Node updates. // TODO: change it to using PATCH instead of full Node updates.
node, err = ca.client.CoreV1().Nodes().Get(data.nodeName, metav1.GetOptions{}) node, err = ca.client.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
if err != nil { if err != nil {
glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", data.nodeName, err) glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", nodeName, err)
continue continue
} }
if node.Spec.PodCIDR != "" { if node.Spec.PodCIDR != "" {
@ -197,7 +196,7 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
} }
if err != nil { if err != nil {
util.RecordNodeStatusChange(ca.recorder, node, "CIDRAssignmentFailed") util.RecordNodeStatusChange(ca.recorder, node, "CIDRAssignmentFailed")
glog.Errorf("CIDR assignment for node %v failed: %v.", data.nodeName, err) glog.Errorf("CIDR assignment for node %v failed: %v.", nodeName, err)
return err return err
} }