fix(snapshots): fix a data race in the snapshot code EE-2717 (#6654)

pull/6596/merge
andres-portainer 3 years ago committed by GitHub
parent 78150a738f
commit 226ffdcd20
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -15,7 +15,7 @@ import (
// specific Docker/Kubernetes environment(endpoint) snapshot methods. // specific Docker/Kubernetes environment(endpoint) snapshot methods.
type Service struct { type Service struct {
dataStore dataservices.DataStore dataStore dataservices.DataStore
refreshSignal chan struct{} snapshotIntervalCh chan time.Duration
snapshotIntervalInSeconds float64 snapshotIntervalInSeconds float64
dockerSnapshotter portainer.DockerSnapshotter dockerSnapshotter portainer.DockerSnapshotter
kubernetesSnapshotter portainer.KubernetesSnapshotter kubernetesSnapshotter portainer.KubernetesSnapshotter
@ -24,14 +24,15 @@ type Service struct {
// NewService creates a new instance of a service // NewService creates a new instance of a service
func NewService(snapshotIntervalFromFlag string, dataStore dataservices.DataStore, dockerSnapshotter portainer.DockerSnapshotter, kubernetesSnapshotter portainer.KubernetesSnapshotter, shutdownCtx context.Context) (*Service, error) { func NewService(snapshotIntervalFromFlag string, dataStore dataservices.DataStore, dockerSnapshotter portainer.DockerSnapshotter, kubernetesSnapshotter portainer.KubernetesSnapshotter, shutdownCtx context.Context) (*Service, error) {
snapshotFrequency, err := parseSnapshotFrequency(snapshotIntervalFromFlag, dataStore) interval, err := parseSnapshotFrequency(snapshotIntervalFromFlag, dataStore)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Service{ return &Service{
dataStore: dataStore, dataStore: dataStore,
snapshotIntervalInSeconds: snapshotFrequency, snapshotIntervalCh: make(chan time.Duration),
snapshotIntervalInSeconds: interval,
dockerSnapshotter: dockerSnapshotter, dockerSnapshotter: dockerSnapshotter,
kubernetesSnapshotter: kubernetesSnapshotter, kubernetesSnapshotter: kubernetesSnapshotter,
shutdownCtx: shutdownCtx, shutdownCtx: shutdownCtx,
@ -58,35 +59,17 @@ func parseSnapshotFrequency(snapshotInterval string, dataStore dataservices.Data
// Start will start a background routine to execute periodic snapshots of environments(endpoints) // Start will start a background routine to execute periodic snapshots of environments(endpoints)
func (service *Service) Start() { func (service *Service) Start() {
if service.refreshSignal != nil { go service.startSnapshotLoop()
return
}
service.refreshSignal = make(chan struct{})
service.startSnapshotLoop()
}
func (service *Service) Stop() {
if service.refreshSignal == nil {
return
}
// clear refreshSignal to mark the service as disabled
close(service.refreshSignal)
service.refreshSignal = nil
} }
// SetSnapshotInterval sets the snapshot interval and resets the service // SetSnapshotInterval sets the snapshot interval and resets the service
func (service *Service) SetSnapshotInterval(snapshotInterval string) error { func (service *Service) SetSnapshotInterval(snapshotInterval string) error {
service.Stop() interval, err := time.ParseDuration(snapshotInterval)
snapshotFrequency, err := time.ParseDuration(snapshotInterval)
if err != nil { if err != nil {
return err return err
} }
service.snapshotIntervalInSeconds = snapshotFrequency.Seconds()
service.Start() service.snapshotIntervalCh <- interval
return nil return nil
} }
@ -140,34 +123,29 @@ func (service *Service) snapshotDockerEndpoint(endpoint *portainer.Endpoint) err
return nil return nil
} }
func (service *Service) startSnapshotLoop() error { func (service *Service) startSnapshotLoop() {
ticker := time.NewTicker(time.Duration(service.snapshotIntervalInSeconds) * time.Second) ticker := time.NewTicker(time.Duration(service.snapshotIntervalInSeconds) * time.Second)
go func() {
err := service.snapshotEndpoints()
if err != nil {
log.Printf("[ERROR] [internal,snapshot] [message: background schedule error (environment snapshot).] [error: %s]", err)
}
for { err := service.snapshotEndpoints()
select { if err != nil {
case <-ticker.C: log.Printf("[ERROR] [internal,snapshot] [message: background schedule error (environment snapshot).] [error: %s]", err)
err := service.snapshotEndpoints() }
if err != nil {
log.Printf("[ERROR] [internal,snapshot] [message: background schedule error (environment snapshot).] [error: %s]", err) for {
} select {
case <-service.shutdownCtx.Done(): case <-ticker.C:
log.Println("[DEBUG] [internal,snapshot] [message: shutting down snapshotting]") err := service.snapshotEndpoints()
ticker.Stop() if err != nil {
return log.Printf("[ERROR] [internal,snapshot] [message: background schedule error (environment snapshot).] [error: %s]", err)
case <-service.refreshSignal:
log.Println("[DEBUG] [internal,snapshot] [message: shutting down snapshotting]")
ticker.Stop()
return
} }
case <-service.shutdownCtx.Done():
log.Println("[DEBUG] [internal,snapshot] [message: shutting down snapshotting]")
ticker.Stop()
return
case interval := <-service.snapshotIntervalCh:
ticker.Reset(interval)
} }
}() }
return nil
} }
func (service *Service) snapshotEndpoints() error { func (service *Service) snapshotEndpoints() error {

@ -1322,7 +1322,6 @@ type (
// SnapshotService represents a service for managing environment(endpoint) snapshots // SnapshotService represents a service for managing environment(endpoint) snapshots
SnapshotService interface { SnapshotService interface {
Start() Start()
Stop()
SetSnapshotInterval(snapshotInterval string) error SetSnapshotInterval(snapshotInterval string) error
SnapshotEndpoint(endpoint *Endpoint) error SnapshotEndpoint(endpoint *Endpoint) error
} }

Loading…
Cancel
Save