fix shutdown audit sink concurrently

pull/564/head
Patrick Barker 2019-01-12 16:47:33 -07:00
parent d1e5311922
commit d81f720563
2 changed files with 57 additions and 9 deletions

View File

@ -286,7 +286,9 @@ func (b *backend) updateSink(oldSink, newSink *auditregv1alpha1.AuditSink) {
delete(delegates, oldSink.UID)
delegates[newSink.UID] = d
b.setDelegates(delegates)
oldDelegate.gracefulShutdown()
// graceful shutdown in goroutine as to not block
go oldDelegate.gracefulShutdown()
}
klog.V(2).Infof("Updated audit sink: %s", newSink.Name)
@ -305,7 +307,9 @@ func (b *backend) deleteSink(sink *auditregv1alpha1.AuditSink) {
}
delete(delegates, sink.UID)
b.setDelegates(delegates)
delegate.gracefulShutdown()
// graceful shutdown in goroutine as to not block
go delegate.gracefulShutdown()
klog.V(2).Infof("Deleted audit sink: %s", sink.Name)
klog.V(2).Infof("Current audit sinks: %v", delegates.Names())
}

View File

@ -95,8 +95,24 @@ func TestDynamic(t *testing.T) {
},
}
badURL := "http://badtest"
badConfig := &auditregv1alpha1.AuditSink{
ObjectMeta: metav1.ObjectMeta{
Name: "bad",
UID: types.UID("bad"),
},
Spec: auditregv1alpha1.AuditSinkSpec{
Policy: testPolicy,
Webhook: auditregv1alpha1.Webhook{
ClientConfig: auditregv1alpha1.WebhookClientConfig{
URL: &badURL,
},
},
},
}
config, stopChan := defaultTestConfig()
config.BufferedConfig.MaxBatchWait = 10 * time.Millisecond
config.BufferedConfig.MaxBatchSize = 1
b, err := NewBackend(config)
require.NoError(t, err)
@ -108,7 +124,7 @@ func TestDynamic(t *testing.T) {
require.Len(t, d.GetDelegates(), 0)
})
t.Run("find one", func(t *testing.T) {
success := t.Run("find one", func(t *testing.T) {
d.addSink(testConfig1)
delegates := d.GetDelegates()
require.Len(t, delegates, 1)
@ -120,8 +136,31 @@ func TestDynamic(t *testing.T) {
err := checkForEvent(eventList1, testEvent)
require.NoError(t, err, "unable to find events sent to sink")
})
require.True(t, success) // propagate failure
t.Run("find two", func(t *testing.T) {
// test that a bad webhook configuration can be recovered from
success = t.Run("bad config", func(t *testing.T) {
d.addSink(badConfig)
delegates := d.GetDelegates()
require.Len(t, delegates, 2)
require.Contains(t, delegates, types.UID("bad"))
require.Equal(t, badConfig, delegates["bad"].configuration)
// send events to the buffer
b.ProcessEvents(&testEvent, &testEvent)
// event is in the buffer see if the sink can be deleted
// this will hang and fail if not handled properly
d.deleteSink(badConfig)
delegates = d.GetDelegates()
require.Len(t, delegates, 1)
require.Contains(t, delegates, types.UID("test1"))
require.Equal(t, testConfig1, delegates["test1"].configuration)
})
require.True(t, success) // propagate failure
success = t.Run("find two", func(t *testing.T) {
eventList1.Store(auditinternal.EventList{})
d.addSink(testConfig2)
delegates := d.GetDelegates()
@ -138,8 +177,9 @@ func TestDynamic(t *testing.T) {
err = checkForEvent(eventList2, testEvent)
require.NoError(t, err, "unable to find events sent to sink 2")
})
require.True(t, success) // propagate failure
t.Run("delete one", func(t *testing.T) {
success = t.Run("delete one", func(t *testing.T) {
eventList2.Store(auditinternal.EventList{})
d.deleteSink(testConfig1)
delegates := d.GetDelegates()
@ -152,8 +192,9 @@ func TestDynamic(t *testing.T) {
err := checkForEvent(eventList2, testEvent)
require.NoError(t, err, "unable to find events sent to sink")
})
require.True(t, success) // propagate failure
t.Run("update one", func(t *testing.T) {
success = t.Run("update one", func(t *testing.T) {
eventList1.Store(auditinternal.EventList{})
oldConfig := *testConfig2
testConfig2.Spec.Webhook.ClientConfig.URL = &server1.URL
@ -169,8 +210,9 @@ func TestDynamic(t *testing.T) {
err := checkForEvent(eventList1, testEvent)
require.NoError(t, err, "unable to find events sent to sink")
})
require.True(t, success) // propagate failure
t.Run("update meta only", func(t *testing.T) {
success = t.Run("update meta only", func(t *testing.T) {
eventList1.Store(auditinternal.EventList{})
oldConfig := *testConfig2
testConfig2.UID = types.UID("test2.2")
@ -185,8 +227,9 @@ func TestDynamic(t *testing.T) {
err := checkForEvent(eventList1, testEvent)
require.NoError(t, err, "unable to find events sent to sink")
})
require.True(t, success) // propagate failure
t.Run("shutdown", func(t *testing.T) {
success = t.Run("shutdown", func(t *testing.T) {
// if the stop signal is not propagated correctly the buffers will not
// close down gracefully, and the shutdown method will hang causing
// the test will timeout.
@ -211,6 +254,7 @@ func TestDynamic(t *testing.T) {
}
}
})
require.True(t, success) // propagate failure
}
// checkForEvent will poll to check for an audit event in an atomic event list