mirror of https://github.com/k3s-io/k3s
Merge pull request #72864 from pbarker/audit-lock-fix
shutdown audit sink concurrentlypull/564/head
commit
54dc9db17b
|
@ -286,7 +286,9 @@ func (b *backend) updateSink(oldSink, newSink *auditregv1alpha1.AuditSink) {
|
|||
delete(delegates, oldSink.UID)
|
||||
delegates[newSink.UID] = d
|
||||
b.setDelegates(delegates)
|
||||
oldDelegate.gracefulShutdown()
|
||||
|
||||
// graceful shutdown in goroutine as to not block
|
||||
go oldDelegate.gracefulShutdown()
|
||||
}
|
||||
|
||||
klog.V(2).Infof("Updated audit sink: %s", newSink.Name)
|
||||
|
@ -305,7 +307,9 @@ func (b *backend) deleteSink(sink *auditregv1alpha1.AuditSink) {
|
|||
}
|
||||
delete(delegates, sink.UID)
|
||||
b.setDelegates(delegates)
|
||||
delegate.gracefulShutdown()
|
||||
|
||||
// graceful shutdown in goroutine as to not block
|
||||
go delegate.gracefulShutdown()
|
||||
klog.V(2).Infof("Deleted audit sink: %s", sink.Name)
|
||||
klog.V(2).Infof("Current audit sinks: %v", delegates.Names())
|
||||
}
|
||||
|
|
|
@ -95,8 +95,24 @@ func TestDynamic(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
badURL := "http://badtest"
|
||||
badConfig := &auditregv1alpha1.AuditSink{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "bad",
|
||||
UID: types.UID("bad"),
|
||||
},
|
||||
Spec: auditregv1alpha1.AuditSinkSpec{
|
||||
Policy: testPolicy,
|
||||
Webhook: auditregv1alpha1.Webhook{
|
||||
ClientConfig: auditregv1alpha1.WebhookClientConfig{
|
||||
URL: &badURL,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
config, stopChan := defaultTestConfig()
|
||||
config.BufferedConfig.MaxBatchWait = 10 * time.Millisecond
|
||||
config.BufferedConfig.MaxBatchSize = 1
|
||||
|
||||
b, err := NewBackend(config)
|
||||
require.NoError(t, err)
|
||||
|
@ -108,7 +124,7 @@ func TestDynamic(t *testing.T) {
|
|||
require.Len(t, d.GetDelegates(), 0)
|
||||
})
|
||||
|
||||
t.Run("find one", func(t *testing.T) {
|
||||
success := t.Run("find one", func(t *testing.T) {
|
||||
d.addSink(testConfig1)
|
||||
delegates := d.GetDelegates()
|
||||
require.Len(t, delegates, 1)
|
||||
|
@ -120,8 +136,31 @@ func TestDynamic(t *testing.T) {
|
|||
err := checkForEvent(eventList1, testEvent)
|
||||
require.NoError(t, err, "unable to find events sent to sink")
|
||||
})
|
||||
require.True(t, success) // propagate failure
|
||||
|
||||
t.Run("find two", func(t *testing.T) {
|
||||
// test that a bad webhook configuration can be recovered from
|
||||
success = t.Run("bad config", func(t *testing.T) {
|
||||
d.addSink(badConfig)
|
||||
delegates := d.GetDelegates()
|
||||
require.Len(t, delegates, 2)
|
||||
require.Contains(t, delegates, types.UID("bad"))
|
||||
require.Equal(t, badConfig, delegates["bad"].configuration)
|
||||
|
||||
// send events to the buffer
|
||||
b.ProcessEvents(&testEvent, &testEvent)
|
||||
|
||||
// event is in the buffer see if the sink can be deleted
|
||||
// this will hang and fail if not handled properly
|
||||
d.deleteSink(badConfig)
|
||||
|
||||
delegates = d.GetDelegates()
|
||||
require.Len(t, delegates, 1)
|
||||
require.Contains(t, delegates, types.UID("test1"))
|
||||
require.Equal(t, testConfig1, delegates["test1"].configuration)
|
||||
})
|
||||
require.True(t, success) // propagate failure
|
||||
|
||||
success = t.Run("find two", func(t *testing.T) {
|
||||
eventList1.Store(auditinternal.EventList{})
|
||||
d.addSink(testConfig2)
|
||||
delegates := d.GetDelegates()
|
||||
|
@ -138,8 +177,9 @@ func TestDynamic(t *testing.T) {
|
|||
err = checkForEvent(eventList2, testEvent)
|
||||
require.NoError(t, err, "unable to find events sent to sink 2")
|
||||
})
|
||||
require.True(t, success) // propagate failure
|
||||
|
||||
t.Run("delete one", func(t *testing.T) {
|
||||
success = t.Run("delete one", func(t *testing.T) {
|
||||
eventList2.Store(auditinternal.EventList{})
|
||||
d.deleteSink(testConfig1)
|
||||
delegates := d.GetDelegates()
|
||||
|
@ -152,8 +192,9 @@ func TestDynamic(t *testing.T) {
|
|||
err := checkForEvent(eventList2, testEvent)
|
||||
require.NoError(t, err, "unable to find events sent to sink")
|
||||
})
|
||||
require.True(t, success) // propagate failure
|
||||
|
||||
t.Run("update one", func(t *testing.T) {
|
||||
success = t.Run("update one", func(t *testing.T) {
|
||||
eventList1.Store(auditinternal.EventList{})
|
||||
oldConfig := *testConfig2
|
||||
testConfig2.Spec.Webhook.ClientConfig.URL = &server1.URL
|
||||
|
@ -169,8 +210,9 @@ func TestDynamic(t *testing.T) {
|
|||
err := checkForEvent(eventList1, testEvent)
|
||||
require.NoError(t, err, "unable to find events sent to sink")
|
||||
})
|
||||
require.True(t, success) // propagate failure
|
||||
|
||||
t.Run("update meta only", func(t *testing.T) {
|
||||
success = t.Run("update meta only", func(t *testing.T) {
|
||||
eventList1.Store(auditinternal.EventList{})
|
||||
oldConfig := *testConfig2
|
||||
testConfig2.UID = types.UID("test2.2")
|
||||
|
@ -185,8 +227,9 @@ func TestDynamic(t *testing.T) {
|
|||
err := checkForEvent(eventList1, testEvent)
|
||||
require.NoError(t, err, "unable to find events sent to sink")
|
||||
})
|
||||
require.True(t, success) // propagate failure
|
||||
|
||||
t.Run("shutdown", func(t *testing.T) {
|
||||
success = t.Run("shutdown", func(t *testing.T) {
|
||||
// if the stop signal is not propagated correctly the buffers will not
|
||||
// close down gracefully, and the shutdown method will hang causing
|
||||
// the test will timeout.
|
||||
|
@ -211,6 +254,7 @@ func TestDynamic(t *testing.T) {
|
|||
}
|
||||
}
|
||||
})
|
||||
require.True(t, success) // propagate failure
|
||||
}
|
||||
|
||||
// checkForEvent will poll to check for an audit event in an atomic event list
|
||||
|
|
Loading…
Reference in New Issue