diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic.go index 393a205e16..b52c7c8528 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic.go @@ -286,7 +286,9 @@ func (b *backend) updateSink(oldSink, newSink *auditregv1alpha1.AuditSink) { delete(delegates, oldSink.UID) delegates[newSink.UID] = d b.setDelegates(delegates) - oldDelegate.gracefulShutdown() + + // graceful shutdown in goroutine as to not block + go oldDelegate.gracefulShutdown() } klog.V(2).Infof("Updated audit sink: %s", newSink.Name) @@ -305,7 +307,9 @@ func (b *backend) deleteSink(sink *auditregv1alpha1.AuditSink) { } delete(delegates, sink.UID) b.setDelegates(delegates) - delegate.gracefulShutdown() + + // graceful shutdown in goroutine as to not block + go delegate.gracefulShutdown() klog.V(2).Infof("Deleted audit sink: %s", sink.Name) klog.V(2).Infof("Current audit sinks: %v", delegates.Names()) } diff --git a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic_test.go b/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic_test.go index 8b9c91407a..9f49929a7b 100644 --- a/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic_test.go +++ b/staging/src/k8s.io/apiserver/plugin/pkg/audit/dynamic/dynamic_test.go @@ -95,8 +95,24 @@ func TestDynamic(t *testing.T) { }, } + badURL := "http://badtest" + badConfig := &auditregv1alpha1.AuditSink{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bad", + UID: types.UID("bad"), + }, + Spec: auditregv1alpha1.AuditSinkSpec{ + Policy: testPolicy, + Webhook: auditregv1alpha1.Webhook{ + ClientConfig: auditregv1alpha1.WebhookClientConfig{ + URL: &badURL, + }, + }, + }, + } + config, stopChan := defaultTestConfig() - config.BufferedConfig.MaxBatchWait = 10 * time.Millisecond + config.BufferedConfig.MaxBatchSize = 1 b, err := NewBackend(config) require.NoError(t, err) @@ -108,7 +124,7 @@ func TestDynamic(t *testing.T) { require.Len(t, d.GetDelegates(), 0) }) - t.Run("find one", func(t *testing.T) { + success := t.Run("find one", func(t *testing.T) { d.addSink(testConfig1) delegates := d.GetDelegates() require.Len(t, delegates, 1) @@ -120,8 +136,31 @@ func TestDynamic(t *testing.T) { err := checkForEvent(eventList1, testEvent) require.NoError(t, err, "unable to find events sent to sink") }) + require.True(t, success) // propagate failure - t.Run("find two", func(t *testing.T) { + // test that a bad webhook configuration can be recovered from + success = t.Run("bad config", func(t *testing.T) { + d.addSink(badConfig) + delegates := d.GetDelegates() + require.Len(t, delegates, 2) + require.Contains(t, delegates, types.UID("bad")) + require.Equal(t, badConfig, delegates["bad"].configuration) + + // send events to the buffer + b.ProcessEvents(&testEvent, &testEvent) + + // event is in the buffer see if the sink can be deleted + // this will hang and fail if not handled properly + d.deleteSink(badConfig) + + delegates = d.GetDelegates() + require.Len(t, delegates, 1) + require.Contains(t, delegates, types.UID("test1")) + require.Equal(t, testConfig1, delegates["test1"].configuration) + }) + require.True(t, success) // propagate failure + + success = t.Run("find two", func(t *testing.T) { eventList1.Store(auditinternal.EventList{}) d.addSink(testConfig2) delegates := d.GetDelegates() @@ -138,8 +177,9 @@ func TestDynamic(t *testing.T) { err = checkForEvent(eventList2, testEvent) require.NoError(t, err, "unable to find events sent to sink 2") }) + require.True(t, success) // propagate failure - t.Run("delete one", func(t *testing.T) { + success = t.Run("delete one", func(t *testing.T) { eventList2.Store(auditinternal.EventList{}) d.deleteSink(testConfig1) delegates := d.GetDelegates() @@ -152,8 +192,9 @@ func TestDynamic(t *testing.T) { err := checkForEvent(eventList2, testEvent) require.NoError(t, err, "unable to find events sent to sink") }) + require.True(t, success) // propagate failure - t.Run("update one", func(t *testing.T) { + success = t.Run("update one", func(t *testing.T) { eventList1.Store(auditinternal.EventList{}) oldConfig := *testConfig2 testConfig2.Spec.Webhook.ClientConfig.URL = &server1.URL @@ -169,8 +210,9 @@ func TestDynamic(t *testing.T) { err := checkForEvent(eventList1, testEvent) require.NoError(t, err, "unable to find events sent to sink") }) + require.True(t, success) // propagate failure - t.Run("update meta only", func(t *testing.T) { + success = t.Run("update meta only", func(t *testing.T) { eventList1.Store(auditinternal.EventList{}) oldConfig := *testConfig2 testConfig2.UID = types.UID("test2.2") @@ -185,8 +227,9 @@ func TestDynamic(t *testing.T) { err := checkForEvent(eventList1, testEvent) require.NoError(t, err, "unable to find events sent to sink") }) + require.True(t, success) // propagate failure - t.Run("shutdown", func(t *testing.T) { + success = t.Run("shutdown", func(t *testing.T) { // if the stop signal is not propagated correctly the buffers will not // close down gracefully, and the shutdown method will hang causing // the test will timeout. @@ -211,6 +254,7 @@ func TestDynamic(t *testing.T) { } } }) + require.True(t, success) // propagate failure } // checkForEvent will poll to check for an audit event in an atomic event list