mirror of https://github.com/k3s-io/k3s
Merge pull request #56638 from crassirostris/audit-webhook-make-configurable
Automatic merge from submit-queue (batch tested with PRs 56790, 56638). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Make audit batch webhook backend configurable. This PR adds the ability to configure key parameters of the most important audit backend at scale, so that if the default parameters don't fit and audit events are lost or delayed, it's possible to adjust these parameters to fix the problem. In the future these parameters will stay, but will be used to populate the values for the generic buffering backend, for both the webhook and log backends. /cc @kubernetes/sig-auth-pr-reviews @sttts @tallclair @ericchiang ```release-note Audit webhook batching parameters are now configurable via command-line flags in the apiserver. ``` ref #54551 pull/6/head
commit
31375e30ba
|
@ -74,6 +74,7 @@ go_test(
|
|||
"//vendor/k8s.io/apiserver/pkg/server/options:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/plugin/pkg/audit/webhook:go_default_library",
|
||||
"//vendor/k8s.io/client-go/rest:go_default_library",
|
||||
],
|
||||
)
|
||||
|
|
|
@ -28,6 +28,7 @@ import (
|
|||
apiserveroptions "k8s.io/apiserver/pkg/server/options"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
utilconfig "k8s.io/apiserver/pkg/util/flag"
|
||||
auditwebhook "k8s.io/apiserver/plugin/pkg/audit/webhook"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
kapi "k8s.io/kubernetes/pkg/apis/core"
|
||||
|
@ -55,6 +56,12 @@ func TestAddFlags(t *testing.T) {
|
|||
"--audit-policy-file=/policy",
|
||||
"--audit-webhook-config-file=/webhook-config",
|
||||
"--audit-webhook-mode=blocking",
|
||||
"--audit-webhook-batch-buffer-size=42",
|
||||
"--audit-webhook-batch-max-size=43",
|
||||
"--audit-webhook-batch-max-wait=1s",
|
||||
"--audit-webhook-batch-throttle-qps=43.5",
|
||||
"--audit-webhook-batch-throttle-burst=44",
|
||||
"--audit-webhook-batch-initial-backoff=2s",
|
||||
"--authentication-token-webhook-cache-ttl=3m",
|
||||
"--authentication-token-webhook-config-file=/token-webhook-config",
|
||||
"--authorization-mode=AlwaysDeny",
|
||||
|
@ -170,6 +177,14 @@ func TestAddFlags(t *testing.T) {
|
|||
WebhookOptions: apiserveroptions.AuditWebhookOptions{
|
||||
Mode: "blocking",
|
||||
ConfigFile: "/webhook-config",
|
||||
BatchConfig: auditwebhook.BatchBackendConfig{
|
||||
BufferSize: 42,
|
||||
MaxBatchSize: 43,
|
||||
MaxBatchWait: 1 * time.Second,
|
||||
ThrottleQPS: 43.5,
|
||||
ThrottleBurst: 44,
|
||||
InitialBackoff: 2 * time.Second,
|
||||
},
|
||||
},
|
||||
PolicyFile: "/policy",
|
||||
},
|
||||
|
|
|
@ -77,12 +77,17 @@ type AuditWebhookOptions struct {
|
|||
//
|
||||
// Defaults to asynchronous batch events.
|
||||
Mode string
|
||||
// Configuration for batching webhook. Only used in batch mode.
|
||||
BatchConfig pluginwebhook.BatchBackendConfig
|
||||
}
|
||||
|
||||
func NewAuditOptions() *AuditOptions {
|
||||
return &AuditOptions{
|
||||
WebhookOptions: AuditWebhookOptions{Mode: pluginwebhook.ModeBatch},
|
||||
LogOptions: AuditLogOptions{Format: pluginlog.FormatJson},
|
||||
WebhookOptions: AuditWebhookOptions{
|
||||
Mode: pluginwebhook.ModeBatch,
|
||||
BatchConfig: pluginwebhook.NewDefaultBatchBackendConfig(),
|
||||
},
|
||||
LogOptions: AuditLogOptions{Format: pluginlog.FormatJson},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -102,7 +107,7 @@ func (o *AuditOptions) Validate() []error {
|
|||
allErrors = append(allErrors, fmt.Errorf("feature '%s' must be enabled to set option --audit-webhook-config-file", features.AdvancedAuditing))
|
||||
}
|
||||
} else {
|
||||
// check webhook mode
|
||||
// Check webhook mode
|
||||
validMode := false
|
||||
for _, m := range pluginwebhook.AllowedModes {
|
||||
if m == o.WebhookOptions.Mode {
|
||||
|
@ -114,7 +119,21 @@ func (o *AuditOptions) Validate() []error {
|
|||
allErrors = append(allErrors, fmt.Errorf("invalid audit webhook mode %s, allowed modes are %q", o.WebhookOptions.Mode, strings.Join(pluginwebhook.AllowedModes, ",")))
|
||||
}
|
||||
|
||||
// check log format
|
||||
// Check webhook batch configuration
|
||||
if o.WebhookOptions.BatchConfig.BufferSize <= 0 {
|
||||
allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook buffer size %v, must be a positive number", o.WebhookOptions.BatchConfig.BufferSize))
|
||||
}
|
||||
if o.WebhookOptions.BatchConfig.MaxBatchSize <= 0 {
|
||||
allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook max batch size %v, must be a positive number", o.WebhookOptions.BatchConfig.MaxBatchSize))
|
||||
}
|
||||
if o.WebhookOptions.BatchConfig.ThrottleQPS <= 0 {
|
||||
allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook throttle QPS %v, must be a positive number", o.WebhookOptions.BatchConfig.ThrottleQPS))
|
||||
}
|
||||
if o.WebhookOptions.BatchConfig.ThrottleBurst <= 0 {
|
||||
allErrors = append(allErrors, fmt.Errorf("invalid audit batch webhook throttle burst %v, must be a positive number", o.WebhookOptions.BatchConfig.ThrottleBurst))
|
||||
}
|
||||
|
||||
// Check log format
|
||||
validFormat := false
|
||||
for _, f := range pluginlog.AllowedFormats {
|
||||
if f == o.LogOptions.Format {
|
||||
|
@ -249,6 +268,24 @@ func (o *AuditWebhookOptions) AddFlags(fs *pflag.FlagSet) {
|
|||
"Strategy for sending audit events. Blocking indicates sending events should block"+
|
||||
" server responses. Batch causes the webhook to buffer and send events"+
|
||||
" asynchronously. Known modes are "+strings.Join(pluginwebhook.AllowedModes, ",")+".")
|
||||
fs.IntVar(&o.BatchConfig.BufferSize, "audit-webhook-batch-buffer-size",
|
||||
o.BatchConfig.BufferSize, "The size of the buffer to store events before "+
|
||||
"batching and sending to the webhook. Only used in batch mode.")
|
||||
fs.IntVar(&o.BatchConfig.MaxBatchSize, "audit-webhook-batch-max-size",
|
||||
o.BatchConfig.MaxBatchSize, "The maximum size of a batch sent to the webhook. "+
|
||||
"Only used in batch mode.")
|
||||
fs.DurationVar(&o.BatchConfig.MaxBatchWait, "audit-webhook-batch-max-wait",
|
||||
o.BatchConfig.MaxBatchWait, "The amount of time to wait before force sending the "+
|
||||
"batch that hadn't reached the max size. Only used in batch mode.")
|
||||
fs.Float32Var(&o.BatchConfig.ThrottleQPS, "audit-webhook-batch-throttle-qps",
|
||||
o.BatchConfig.ThrottleQPS, "Maximum average number of requests per second. "+
|
||||
"Only used in batch mode.")
|
||||
fs.IntVar(&o.BatchConfig.ThrottleBurst, "audit-webhook-batch-throttle-burst",
|
||||
o.BatchConfig.ThrottleBurst, "Maximum number of requests sent at the same "+
|
||||
"moment if ThrottleQPS was not utilized before. Only used in batch mode.")
|
||||
fs.DurationVar(&o.BatchConfig.InitialBackoff, "audit-webhook-batch-initial-backoff",
|
||||
o.BatchConfig.InitialBackoff, "The amount of time to wait before retrying the "+
|
||||
"first failed requests. Only used in batch mode.")
|
||||
}
|
||||
|
||||
func (o *AuditWebhookOptions) applyTo(c *server.Config) error {
|
||||
|
@ -256,7 +293,7 @@ func (o *AuditWebhookOptions) applyTo(c *server.Config) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
webhook, err := pluginwebhook.NewBackend(o.ConfigFile, o.Mode, auditv1beta1.SchemeGroupVersion)
|
||||
webhook, err := pluginwebhook.NewBackend(o.ConfigFile, o.Mode, auditv1beta1.SchemeGroupVersion, o.BatchConfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("initializing audit webhook: %v", err)
|
||||
}
|
||||
|
|
|
@ -57,9 +57,6 @@ var AllowedModes = []string{
|
|||
|
||||
const (
|
||||
// Default configuration values for ModeBatch.
|
||||
//
|
||||
// TODO(ericchiang): Make these value configurable. Maybe through a
|
||||
// kubeconfig extension?
|
||||
defaultBatchBufferSize = 10000 // Buffer up to 10000 events before starting discarding.
|
||||
defaultBatchMaxSize = 400 // Only send up to 400 events at a time.
|
||||
defaultBatchMaxWait = 30 * time.Second // Send events at least twice a minute.
|
||||
|
@ -72,13 +69,49 @@ const (
|
|||
// The plugin name reported in error metrics.
|
||||
const pluginName = "webhook"
|
||||
|
||||
// BatchBackendConfig represents batching webhook audit backend configuration.
|
||||
type BatchBackendConfig struct {
|
||||
// BufferSize defines a size of the buffering queue.
|
||||
BufferSize int
|
||||
// MaxBatchSize defines maximum size of a batch.
|
||||
MaxBatchSize int
|
||||
// MaxBatchWait defines maximum amount of time to wait for MaxBatchSize
|
||||
// events to be accumulated in the buffer before forcibly sending what's
|
||||
// being accumulated.
|
||||
MaxBatchWait time.Duration
|
||||
|
||||
// ThrottleQPS defines the allowed rate of batches per second sent to the webhook.
|
||||
ThrottleQPS float32
|
||||
// ThrottleBurst defines the maximum number of batches sent to the webhook at the same moment in case
|
||||
// the capacity defined by ThrottleQPS was not utilized.
|
||||
ThrottleBurst int
|
||||
|
||||
// InitialBackoff defines the amount of time to wait before retrying the requests
|
||||
// to the webhook for the first time.
|
||||
InitialBackoff time.Duration
|
||||
}
|
||||
|
||||
// NewDefaultBatchBackendConfig returns a new BatchBackendConfig populated with default values.
|
||||
func NewDefaultBatchBackendConfig() BatchBackendConfig {
|
||||
return BatchBackendConfig{
|
||||
BufferSize: defaultBatchBufferSize,
|
||||
MaxBatchSize: defaultBatchMaxSize,
|
||||
MaxBatchWait: defaultBatchMaxWait,
|
||||
|
||||
ThrottleQPS: defaultBatchThrottleQPS,
|
||||
ThrottleBurst: defaultBatchThrottleBurst,
|
||||
|
||||
InitialBackoff: defaultInitialBackoff,
|
||||
}
|
||||
}
|
||||
|
||||
// NewBackend returns an audit backend that sends events over HTTP to an external service.
|
||||
// The mode indicates the caching behavior of the webhook. Either blocking (ModeBlocking)
|
||||
// or buffered with batch POSTs (ModeBatch).
|
||||
func NewBackend(kubeConfigFile string, mode string, groupVersion schema.GroupVersion) (audit.Backend, error) {
|
||||
func NewBackend(kubeConfigFile string, mode string, groupVersion schema.GroupVersion, config BatchBackendConfig) (audit.Backend, error) {
|
||||
switch mode {
|
||||
case ModeBatch:
|
||||
return newBatchWebhook(kubeConfigFile, groupVersion)
|
||||
return newBatchWebhook(kubeConfigFile, groupVersion, config)
|
||||
case ModeBlocking:
|
||||
return newBlockingWebhook(kubeConfigFile, groupVersion)
|
||||
default:
|
||||
|
@ -105,13 +138,13 @@ func init() {
|
|||
install.Install(groupFactoryRegistry, registry, audit.Scheme)
|
||||
}
|
||||
|
||||
func loadWebhook(configFile string, groupVersion schema.GroupVersion) (*webhook.GenericWebhook, error) {
|
||||
func loadWebhook(configFile string, groupVersion schema.GroupVersion, initialBackoff time.Duration) (*webhook.GenericWebhook, error) {
|
||||
return webhook.NewGenericWebhook(registry, audit.Codecs, configFile,
|
||||
[]schema.GroupVersion{groupVersion}, defaultInitialBackoff)
|
||||
[]schema.GroupVersion{groupVersion}, initialBackoff)
|
||||
}
|
||||
|
||||
func newBlockingWebhook(configFile string, groupVersion schema.GroupVersion) (*blockingBackend, error) {
|
||||
w, err := loadWebhook(configFile, groupVersion)
|
||||
w, err := loadWebhook(configFile, groupVersion, defaultInitialBackoff)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -146,19 +179,19 @@ func (b *blockingBackend) processEvents(ev ...*auditinternal.Event) error {
|
|||
return b.w.RestClient.Post().Body(&list).Do().Error()
|
||||
}
|
||||
|
||||
func newBatchWebhook(configFile string, groupVersion schema.GroupVersion) (*batchBackend, error) {
|
||||
w, err := loadWebhook(configFile, groupVersion)
|
||||
func newBatchWebhook(configFile string, groupVersion schema.GroupVersion, config BatchBackendConfig) (*batchBackend, error) {
|
||||
w, err := loadWebhook(configFile, groupVersion, config.InitialBackoff)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &batchBackend{
|
||||
w: w,
|
||||
buffer: make(chan *auditinternal.Event, defaultBatchBufferSize),
|
||||
maxBatchSize: defaultBatchMaxSize,
|
||||
maxBatchWait: defaultBatchMaxWait,
|
||||
buffer: make(chan *auditinternal.Event, config.BufferSize),
|
||||
maxBatchSize: config.MaxBatchSize,
|
||||
maxBatchWait: config.MaxBatchWait,
|
||||
shutdownCh: make(chan struct{}),
|
||||
throttle: flowcontrol.NewTokenBucketRateLimiter(defaultBatchThrottleQPS, defaultBatchThrottleBurst),
|
||||
throttle: flowcontrol.NewTokenBucketRateLimiter(config.ThrottleQPS, config.ThrottleBurst),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -116,7 +116,7 @@ func newWebhook(t *testing.T, endpoint string, mode string, groupVersion schema.
|
|||
// NOTE(ericchiang): Do we need to use a proper serializer?
|
||||
require.NoError(t, stdjson.NewEncoder(f).Encode(config), "writing kubeconfig")
|
||||
|
||||
backend, err := NewBackend(f.Name(), mode, groupVersion)
|
||||
backend, err := NewBackend(f.Name(), mode, groupVersion, NewDefaultBatchBackendConfig())
|
||||
require.NoError(t, err, "initializing backend")
|
||||
|
||||
return backend
|
||||
|
|
Loading…
Reference in New Issue