mirror of https://github.com/k3s-io/k3s
532 lines
15 KiB
Go
532 lines
15 KiB
Go
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
|
|
|
|
package goroutinemap
|
|
|
|
import (
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
)
|
|
|
|
const (
	// testTimeout bounds how long a test waits for a goroutine to finish.
	// In practice this should only take a context switch (a few ms), but the
	// value is deliberately generous: per Clayton, "We have had flakes due to
	// tests that assumed that 15s is long enough to sleep".
	testTimeout = time.Minute

	// initialOperationWaitTimeShort is the first wait interval handed to
	// retryWithExponentialBackOff; every failed attempt lengthens the wait
	// exponentially.
	initialOperationWaitTimeShort = 20 * time.Millisecond

	// initialOperationWaitTimeLong is a larger first wait interval for
	// retryWithExponentialBackOff, used where the retried operation itself is
	// subject to backoff; every failed attempt lengthens the wait
	// exponentially.
	initialOperationWaitTimeLong = 500 * time.Millisecond
)
|
|
|
|
func Test_NewGoRoutineMap_Positive_SingleOp(t *testing.T) {
|
|
// Arrange
|
|
grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
|
|
operationName := "operation-name"
|
|
operation := func() error { return nil }
|
|
|
|
// Act
|
|
err := grm.Run(operationName, operation)
|
|
|
|
// Assert
|
|
if err != nil {
|
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
|
|
}
|
|
}
|
|
|
|
func Test_NewGoRoutineMap_Positive_TwoOps(t *testing.T) {
|
|
// Arrange
|
|
grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
|
|
operation1Name := "operation1-name"
|
|
operation2Name := "operation2-name"
|
|
operation := func() error { return nil }
|
|
|
|
// Act
|
|
err1 := grm.Run(operation1Name, operation)
|
|
err2 := grm.Run(operation2Name, operation)
|
|
|
|
// Assert
|
|
if err1 != nil {
|
|
t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", operation1Name, err1)
|
|
}
|
|
|
|
if err2 != nil {
|
|
t.Fatalf("NewGoRoutine %q failed. Expected: <no error> Actual: <%v>", operation2Name, err2)
|
|
}
|
|
}
|
|
|
|
func Test_NewGoRoutineMap_Positive_SingleOpWithExpBackoff(t *testing.T) {
|
|
// Arrange
|
|
grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
|
|
operationName := "operation-name"
|
|
operation := func() error { return nil }
|
|
|
|
// Act
|
|
err := grm.Run(operationName, operation)
|
|
|
|
// Assert
|
|
if err != nil {
|
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
|
|
}
|
|
}
|
|
|
|
func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletes(t *testing.T) {
|
|
// Arrange
|
|
grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
|
|
operationName := "operation-name"
|
|
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
|
operation1 := generateCallbackFunc(operation1DoneCh)
|
|
err1 := grm.Run(operationName, operation1)
|
|
if err1 != nil {
|
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
|
|
}
|
|
operation2 := generateNoopFunc()
|
|
<-operation1DoneCh // Force operation1 to complete
|
|
|
|
// Act
|
|
err2 := retryWithExponentialBackOff(
|
|
time.Duration(initialOperationWaitTimeShort),
|
|
func() (bool, error) {
|
|
err := grm.Run(operationName, operation2)
|
|
if err != nil {
|
|
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
|
|
return false, nil
|
|
}
|
|
return true, nil
|
|
},
|
|
)
|
|
|
|
// Assert
|
|
if err2 != nil {
|
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
|
|
}
|
|
}
|
|
|
|
func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
|
|
// Arrange
|
|
grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
|
|
operationName := "operation-name"
|
|
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
|
operation1 := generateCallbackFunc(operation1DoneCh)
|
|
err1 := grm.Run(operationName, operation1)
|
|
if err1 != nil {
|
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
|
|
}
|
|
operation2 := generateNoopFunc()
|
|
<-operation1DoneCh // Force operation1 to complete
|
|
|
|
// Act
|
|
err2 := retryWithExponentialBackOff(
|
|
time.Duration(initialOperationWaitTimeShort),
|
|
func() (bool, error) {
|
|
err := grm.Run(operationName, operation2)
|
|
if err != nil {
|
|
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
|
|
return false, nil
|
|
}
|
|
return true, nil
|
|
},
|
|
)
|
|
|
|
// Assert
|
|
if err2 != nil {
|
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
|
|
}
|
|
}
|
|
|
|
func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanics(t *testing.T) {
|
|
// Arrange
|
|
grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
|
|
operationName := "operation-name"
|
|
operation1 := generatePanicFunc()
|
|
err1 := grm.Run(operationName, operation1)
|
|
if err1 != nil {
|
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
|
|
}
|
|
operation2 := generateNoopFunc()
|
|
|
|
// Act
|
|
err2 := retryWithExponentialBackOff(
|
|
time.Duration(initialOperationWaitTimeShort),
|
|
func() (bool, error) {
|
|
err := grm.Run(operationName, operation2)
|
|
if err != nil {
|
|
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
|
|
return false, nil
|
|
}
|
|
return true, nil
|
|
},
|
|
)
|
|
|
|
// Assert
|
|
if err2 != nil {
|
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
|
|
}
|
|
}
|
|
|
|
func Test_NewGoRoutineMap_Positive_SecondOpAfterFirstPanicsWithExpBackoff(t *testing.T) {
|
|
// Arrange
|
|
grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
|
|
operationName := "operation-name"
|
|
operation1 := generatePanicFunc()
|
|
err1 := grm.Run(operationName, operation1)
|
|
if err1 != nil {
|
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
|
|
}
|
|
operation2 := generateNoopFunc()
|
|
|
|
// Act
|
|
err2 := retryWithExponentialBackOff(
|
|
time.Duration(initialOperationWaitTimeLong), // Longer duration to accommodate for backoff
|
|
func() (bool, error) {
|
|
err := grm.Run(operationName, operation2)
|
|
if err != nil {
|
|
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
|
|
return false, nil
|
|
}
|
|
return true, nil
|
|
},
|
|
)
|
|
|
|
// Assert
|
|
if err2 != nil {
|
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err2)
|
|
}
|
|
}
|
|
|
|
func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletes(t *testing.T) {
|
|
// Arrange
|
|
grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
|
|
operationName := "operation-name"
|
|
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
|
operation1 := generateWaitFunc(operation1DoneCh)
|
|
err1 := grm.Run(operationName, operation1)
|
|
if err1 != nil {
|
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
|
|
}
|
|
operation2 := generateNoopFunc()
|
|
|
|
// Act
|
|
err2 := grm.Run(operationName, operation2)
|
|
|
|
// Assert
|
|
if err2 == nil {
|
|
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", operationName)
|
|
}
|
|
if !IsAlreadyExists(err2) {
|
|
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
|
|
}
|
|
}
|
|
|
|
func Test_NewGoRoutineMap_Negative_SecondOpBeforeFirstCompletesWithExpBackoff(t *testing.T) {
|
|
// Arrange
|
|
grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
|
|
operationName := "operation-name"
|
|
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
|
operation1 := generateWaitFunc(operation1DoneCh)
|
|
err1 := grm.Run(operationName, operation1)
|
|
if err1 != nil {
|
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
|
|
}
|
|
operation2 := generateNoopFunc()
|
|
|
|
// Act
|
|
err2 := grm.Run(operationName, operation2)
|
|
|
|
// Assert
|
|
if err2 == nil {
|
|
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", operationName)
|
|
}
|
|
if !IsAlreadyExists(err2) {
|
|
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
|
|
}
|
|
}
|
|
|
|
func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletes(t *testing.T) {
|
|
// Arrange
|
|
grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
|
|
operationName := "operation-name"
|
|
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
|
operation1 := generateWaitFunc(operation1DoneCh)
|
|
err1 := grm.Run(operationName, operation1)
|
|
if err1 != nil {
|
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
|
|
}
|
|
operation2 := generateNoopFunc()
|
|
operation3 := generateNoopFunc()
|
|
|
|
// Act
|
|
err2 := grm.Run(operationName, operation2)
|
|
|
|
// Assert
|
|
if err2 == nil {
|
|
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", operationName)
|
|
}
|
|
if !IsAlreadyExists(err2) {
|
|
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
|
|
}
|
|
|
|
// Act
|
|
operation1DoneCh <- true // Force operation1 to complete
|
|
err3 := retryWithExponentialBackOff(
|
|
time.Duration(initialOperationWaitTimeShort),
|
|
func() (bool, error) {
|
|
err := grm.Run(operationName, operation3)
|
|
if err != nil {
|
|
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
|
|
return false, nil
|
|
}
|
|
return true, nil
|
|
},
|
|
)
|
|
|
|
// Assert
|
|
if err3 != nil {
|
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err3)
|
|
}
|
|
}
|
|
|
|
func Test_NewGoRoutineMap_Positive_ThirdOpAfterFirstCompletesWithExpBackoff(t *testing.T) {
|
|
// Arrange
|
|
grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
|
|
operationName := "operation-name"
|
|
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
|
operation1 := generateWaitFunc(operation1DoneCh)
|
|
err1 := grm.Run(operationName, operation1)
|
|
if err1 != nil {
|
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err1)
|
|
}
|
|
operation2 := generateNoopFunc()
|
|
operation3 := generateNoopFunc()
|
|
|
|
// Act
|
|
err2 := grm.Run(operationName, operation2)
|
|
|
|
// Assert
|
|
if err2 == nil {
|
|
t.Fatalf("NewGoRoutine did not fail. Expected: <Failed to create operation with name \"%s\". An operation with that name already exists.> Actual: <no error>", operationName)
|
|
}
|
|
if !IsAlreadyExists(err2) {
|
|
t.Fatalf("NewGoRoutine did not return alreadyExistsError, got: %v", err2)
|
|
}
|
|
|
|
// Act
|
|
operation1DoneCh <- true // Force operation1 to complete
|
|
err3 := retryWithExponentialBackOff(
|
|
time.Duration(initialOperationWaitTimeShort),
|
|
func() (bool, error) {
|
|
err := grm.Run(operationName, operation3)
|
|
if err != nil {
|
|
t.Logf("Warning: NewGoRoutine failed with %v. Will retry.", err)
|
|
return false, nil
|
|
}
|
|
return true, nil
|
|
},
|
|
)
|
|
|
|
// Assert
|
|
if err3 != nil {
|
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err3)
|
|
}
|
|
}
|
|
|
|
func Test_NewGoRoutineMap_Positive_WaitEmpty(t *testing.T) {
|
|
// Test than Wait() on empty GoRoutineMap always succeeds without blocking
|
|
// Arrange
|
|
grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
|
|
|
|
// Act
|
|
waitDoneCh := make(chan interface{}, 1)
|
|
go func() {
|
|
grm.Wait()
|
|
waitDoneCh <- true
|
|
}()
|
|
|
|
// Assert
|
|
err := waitChannelWithTimeout(waitDoneCh, testTimeout)
|
|
if err != nil {
|
|
t.Errorf("Error waiting for GoRoutineMap.Wait: %v", err)
|
|
}
|
|
}
|
|
|
|
func Test_NewGoRoutineMap_Positive_WaitEmptyWithExpBackoff(t *testing.T) {
|
|
// Test than Wait() on empty GoRoutineMap always succeeds without blocking
|
|
// Arrange
|
|
grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
|
|
|
|
// Act
|
|
waitDoneCh := make(chan interface{}, 1)
|
|
go func() {
|
|
grm.Wait()
|
|
waitDoneCh <- true
|
|
}()
|
|
|
|
// Assert
|
|
err := waitChannelWithTimeout(waitDoneCh, testTimeout)
|
|
if err != nil {
|
|
t.Errorf("Error waiting for GoRoutineMap.Wait: %v", err)
|
|
}
|
|
}
|
|
|
|
func Test_NewGoRoutineMap_Positive_Wait(t *testing.T) {
|
|
// Test that Wait() really blocks until the last operation succeeds
|
|
// Arrange
|
|
grm := NewGoRoutineMap(false /* exponentialBackOffOnError */)
|
|
operationName := "operation-name"
|
|
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
|
operation1 := generateWaitFunc(operation1DoneCh)
|
|
err := grm.Run(operationName, operation1)
|
|
if err != nil {
|
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
|
|
}
|
|
|
|
// Act
|
|
waitDoneCh := make(chan interface{}, 1)
|
|
go func() {
|
|
grm.Wait()
|
|
waitDoneCh <- true
|
|
}()
|
|
|
|
// Finish the operation
|
|
operation1DoneCh <- true
|
|
|
|
// Assert
|
|
err = waitChannelWithTimeout(waitDoneCh, testTimeout)
|
|
if err != nil {
|
|
t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
|
|
}
|
|
}
|
|
|
|
func Test_NewGoRoutineMap_Positive_WaitWithExpBackoff(t *testing.T) {
|
|
// Test that Wait() really blocks until the last operation succeeds
|
|
// Arrange
|
|
grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
|
|
operationName := "operation-name"
|
|
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
|
operation1 := generateWaitFunc(operation1DoneCh)
|
|
err := grm.Run(operationName, operation1)
|
|
if err != nil {
|
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
|
|
}
|
|
|
|
// Act
|
|
waitDoneCh := make(chan interface{}, 1)
|
|
go func() {
|
|
grm.Wait()
|
|
waitDoneCh <- true
|
|
}()
|
|
|
|
// Finish the operation
|
|
operation1DoneCh <- true
|
|
|
|
// Assert
|
|
err = waitChannelWithTimeout(waitDoneCh, testTimeout)
|
|
if err != nil {
|
|
t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
|
|
}
|
|
}
|
|
|
|
func Test_NewGoRoutineMap_WaitForCompletionWithExpBackoff(t *testing.T) {
|
|
grm := NewGoRoutineMap(true /* exponentialBackOffOnError */)
|
|
operationName := "operation-err"
|
|
|
|
operation1DoneCh := make(chan interface{}, 0 /* bufferSize */)
|
|
operation1 := generateErrorFunc(operation1DoneCh)
|
|
err := grm.Run(operationName, operation1)
|
|
if err != nil {
|
|
t.Fatalf("NewGoRoutine failed. Expected: <no error> Actual: <%v>", err)
|
|
}
|
|
|
|
// Act
|
|
waitDoneCh := make(chan interface{}, 1)
|
|
go func() {
|
|
grm.WaitForCompletion()
|
|
waitDoneCh <- true
|
|
}()
|
|
|
|
// Finish the operation
|
|
operation1DoneCh <- true
|
|
|
|
// Assert that WaitForCompletion returns even if scheduled op had error
|
|
err = waitChannelWithTimeout(waitDoneCh, testTimeout)
|
|
if err != nil {
|
|
t.Fatalf("Error waiting for GoRoutineMap.Wait: %v", err)
|
|
}
|
|
}
|
|
|
|
// generateCallbackFunc returns an operation that sends true on done and then
// reports success. The send blocks until done can accept the value.
func generateCallbackFunc(done chan<- interface{}) func() error {
	notify := func() error {
		done <- true
		return nil
	}
	return notify
}
|
|
|
|
// generateErrorFunc returns an operation that blocks until a receive on done
// completes and then fails with a fixed "Generic error".
func generateErrorFunc(done <-chan interface{}) func() error {
	blockThenFail := func() error {
		<-done
		return fmt.Errorf("Generic error")
	}
	return blockThenFail
}
|
|
|
|
// generateWaitFunc returns an operation that blocks until a receive on done
// completes and then reports success.
func generateWaitFunc(done <-chan interface{}) func() error {
	blockThenSucceed := func() error {
		<-done
		return nil
	}
	return blockThenSucceed
}
|
|
|
|
// generatePanicFunc returns an operation that always panics with the string
// "testing panic" and therefore never returns normally.
func generatePanicFunc() func() error {
	const msg = "testing panic"
	return func() error { panic(msg) }
}
|
|
|
|
// generateNoopFunc returns an operation that does nothing and reports
// success.
func generateNoopFunc() func() error {
	noop := func() error {
		return nil
	}
	return noop
}
|
|
|
|
func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
|
|
backoff := wait.Backoff{
|
|
Duration: initialDuration,
|
|
Factor: 3,
|
|
Jitter: 0,
|
|
Steps: 4,
|
|
}
|
|
return wait.ExponentialBackoff(backoff, fn)
|
|
}
|
|
|
|
// waitChannelWithTimeout waits for a receive on ch to complete. It returns
// nil if that happens before timeout elapses, and a "timeout after ..." error
// otherwise.
func waitChannelWithTimeout(ch <-chan interface{}, timeout time.Duration) error {
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()

	var result error
	select {
	case <-deadline.C:
		result = fmt.Errorf("timeout after %v", timeout)
	case <-ch:
		// Success: result stays nil.
	}
	return result
}
|