mirror of https://github.com/portainer/portainer
fix(docker): fix a data race in the Docker transport BE-10873 (#255)
parent
eb2a754580
commit
ad77cd195c
|
@ -11,6 +11,7 @@ import (
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
portainer "github.com/portainer/portainer/api"
|
portainer "github.com/portainer/portainer/api"
|
||||||
"github.com/portainer/portainer/api/dataservices"
|
"github.com/portainer/portainer/api/dataservices"
|
||||||
|
@ -37,6 +38,8 @@ type (
|
||||||
dockerClientFactory *dockerclient.ClientFactory
|
dockerClientFactory *dockerclient.ClientFactory
|
||||||
gitService portainer.GitService
|
gitService portainer.GitService
|
||||||
snapshotService portainer.SnapshotService
|
snapshotService portainer.SnapshotService
|
||||||
|
dockerID string
|
||||||
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// TransportParameters is used to create a new Transport
|
// TransportParameters is used to create a new Transport
|
||||||
|
@ -679,9 +682,7 @@ func (transport *Transport) executeGenericResourceDeletionOperation(request *htt
|
||||||
}
|
}
|
||||||
|
|
||||||
if resourceControl != nil {
|
if resourceControl != nil {
|
||||||
if err := transport.dataStore.ResourceControl().Delete(resourceControl.ID); err != nil {
|
err = transport.dataStore.ResourceControl().Delete(resourceControl.ID)
|
||||||
return response, err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return response, err
|
return response, err
|
||||||
|
|
|
@ -14,7 +14,6 @@ import (
|
||||||
"github.com/portainer/portainer/api/internal/snapshot"
|
"github.com/portainer/portainer/api/internal/snapshot"
|
||||||
|
|
||||||
"github.com/docker/docker/client"
|
"github.com/docker/docker/client"
|
||||||
"github.com/rs/zerolog/log"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const volumeObjectIdentifier = "ResourceID"
|
const volumeObjectIdentifier = "ResourceID"
|
||||||
|
@ -50,15 +49,6 @@ func (transport *Transport) volumeListOperation(response *http.Response, executo
|
||||||
|
|
||||||
volumeData := responseObject["Volumes"].([]any)
|
volumeData := responseObject["Volumes"].([]any)
|
||||||
|
|
||||||
if transport.snapshotService != nil {
|
|
||||||
// Filling snapshot data can improve the performance of getVolumeResourceID
|
|
||||||
if err = transport.snapshotService.FillSnapshotData(transport.endpoint); err != nil {
|
|
||||||
log.Info().Err(err).
|
|
||||||
Int("endpoint id", int(transport.endpoint.ID)).
|
|
||||||
Msg("snapshot is not filled into the endpoint.")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, volumeObject := range volumeData {
|
for _, volumeObject := range volumeData {
|
||||||
volume := volumeObject.(map[string]any)
|
volume := volumeObject.(map[string]any)
|
||||||
|
|
||||||
|
@ -147,7 +137,7 @@ func (transport *Transport) decorateVolumeResourceCreationOperation(request *htt
|
||||||
}
|
}
|
||||||
defer cli.Close()
|
defer cli.Close()
|
||||||
|
|
||||||
if _, err = cli.VolumeInspect(context.Background(), volumeID); err == nil {
|
if _, err := cli.VolumeInspect(context.Background(), volumeID); err == nil {
|
||||||
return &http.Response{
|
return &http.Response{
|
||||||
StatusCode: http.StatusConflict,
|
StatusCode: http.StatusConflict,
|
||||||
}, errors.New("a volume with the same name already exists")
|
}, errors.New("a volume with the same name already exists")
|
||||||
|
@ -222,14 +212,27 @@ func (transport *Transport) getVolumeResourceID(volumeName string) (string, erro
|
||||||
}
|
}
|
||||||
|
|
||||||
func (transport *Transport) getDockerID() (string, error) {
|
func (transport *Transport) getDockerID() (string, error) {
|
||||||
if len(transport.endpoint.Snapshots) > 0 {
|
transport.mu.Lock()
|
||||||
dockerID, err := snapshot.FetchDockerID(transport.endpoint.Snapshots[0])
|
defer transport.mu.Unlock()
|
||||||
// ignore err - in case of error, just generate not from snapshot
|
|
||||||
if err == nil {
|
// Local cache
|
||||||
return dockerID, nil
|
if transport.dockerID != "" {
|
||||||
|
return transport.dockerID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Snapshot cache
|
||||||
|
if transport.snapshotService != nil {
|
||||||
|
endpoint := portainer.Endpoint{ID: transport.endpoint.ID}
|
||||||
|
|
||||||
|
if err := transport.snapshotService.FillSnapshotData(&endpoint); err == nil {
|
||||||
|
if dockerID, err := snapshot.FetchDockerID(endpoint.Snapshots[0]); err == nil {
|
||||||
|
transport.dockerID = dockerID
|
||||||
|
return dockerID, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remote value
|
||||||
client, err := transport.dockerClientFactory.CreateClient(transport.endpoint, "", nil)
|
client, err := transport.dockerClientFactory.CreateClient(transport.endpoint, "", nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
@ -242,8 +245,11 @@ func (transport *Transport) getDockerID() (string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if info.Swarm.Cluster != nil {
|
if info.Swarm.Cluster != nil {
|
||||||
return info.Swarm.Cluster.ID, nil
|
transport.dockerID = info.Swarm.Cluster.ID
|
||||||
|
return transport.dockerID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return info.ID, nil
|
transport.dockerID = info.ID
|
||||||
|
|
||||||
|
return transport.dockerID, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@ import (
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Service repesents a service to manage environment(endpoint) snapshots.
|
// Service represents a service to manage environment(endpoint) snapshots.
|
||||||
// It provides an interface to start background snapshots as well as
|
// It provides an interface to start background snapshots as well as
|
||||||
// specific Docker/Kubernetes environment(endpoint) snapshot methods.
|
// specific Docker/Kubernetes environment(endpoint) snapshot methods.
|
||||||
type Service struct {
|
type Service struct {
|
||||||
|
@ -174,30 +174,6 @@ func (service *Service) FillSnapshotData(endpoint *portainer.Endpoint) error {
|
||||||
return FillSnapshotData(service.dataStore, endpoint)
|
return FillSnapshotData(service.dataStore, endpoint)
|
||||||
}
|
}
|
||||||
|
|
||||||
func FillSnapshotData(tx dataservices.DataStoreTx, endpoint *portainer.Endpoint) error {
|
|
||||||
snapshot, err := tx.Snapshot().Read(endpoint.ID)
|
|
||||||
if tx.IsErrObjectNotFound(err) {
|
|
||||||
endpoint.Snapshots = []portainer.DockerSnapshot{}
|
|
||||||
endpoint.Kubernetes.Snapshots = []portainer.KubernetesSnapshot{}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if snapshot.Docker != nil {
|
|
||||||
endpoint.Snapshots = []portainer.DockerSnapshot{*snapshot.Docker}
|
|
||||||
}
|
|
||||||
|
|
||||||
if snapshot.Kubernetes != nil {
|
|
||||||
endpoint.Kubernetes.Snapshots = []portainer.KubernetesSnapshot{*snapshot.Kubernetes}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (service *Service) snapshotKubernetesEndpoint(endpoint *portainer.Endpoint) error {
|
func (service *Service) snapshotKubernetesEndpoint(endpoint *portainer.Endpoint) error {
|
||||||
kubernetesSnapshot, err := service.kubernetesSnapshotter.CreateSnapshot(endpoint)
|
kubernetesSnapshot, err := service.kubernetesSnapshotter.CreateSnapshot(endpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -285,11 +261,16 @@ func (service *Service) snapshotEndpoints() error {
|
||||||
|
|
||||||
snapshotError := service.SnapshotEndpoint(&endpoint)
|
snapshotError := service.SnapshotEndpoint(&endpoint)
|
||||||
|
|
||||||
service.dataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
|
if err := service.dataStore.UpdateTx(func(tx dataservices.DataStoreTx) error {
|
||||||
updateEndpointStatus(tx, &endpoint, snapshotError, service.pendingActionsService)
|
updateEndpointStatus(tx, &endpoint, snapshotError, service.pendingActionsService)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
}); err != nil {
|
||||||
|
log.Error().
|
||||||
|
Err(err).
|
||||||
|
Int("endpoint_id", int(endpoint.ID)).
|
||||||
|
Msg("unable to update environment status")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -340,12 +321,31 @@ func FetchDockerID(snapshot portainer.DockerSnapshot) (string, error) {
|
||||||
return info.ID, nil
|
return info.ID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
swarmInfo := info.Swarm
|
if info.Swarm.Cluster == nil {
|
||||||
if swarmInfo.Cluster == nil {
|
|
||||||
return "", errors.New("swarm environment is missing cluster info snapshot")
|
return "", errors.New("swarm environment is missing cluster info snapshot")
|
||||||
}
|
}
|
||||||
|
|
||||||
clusterInfo := swarmInfo.Cluster
|
return info.Swarm.Cluster.ID, nil
|
||||||
|
}
|
||||||
return clusterInfo.ID, nil
|
|
||||||
|
func FillSnapshotData(tx dataservices.DataStoreTx, endpoint *portainer.Endpoint) error {
|
||||||
|
snapshot, err := tx.Snapshot().Read(endpoint.ID)
|
||||||
|
if tx.IsErrObjectNotFound(err) {
|
||||||
|
endpoint.Snapshots = []portainer.DockerSnapshot{}
|
||||||
|
endpoint.Kubernetes.Snapshots = []portainer.KubernetesSnapshot{}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
} else if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if snapshot.Docker != nil {
|
||||||
|
endpoint.Snapshots = []portainer.DockerSnapshot{*snapshot.Docker}
|
||||||
|
}
|
||||||
|
|
||||||
|
if snapshot.Kubernetes != nil {
|
||||||
|
endpoint.Kubernetes.Snapshots = []portainer.KubernetesSnapshot{*snapshot.Kubernetes}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue