mirror of https://github.com/portainer/portainer
fix(offlinegate): fix data race in offlinegate EE-2713 (#6626)
parent
a66e863646
commit
1ab65a4b4f
|
@ -37,6 +37,7 @@ require (
|
||||||
github.com/robfig/cron/v3 v3.0.1
|
github.com/robfig/cron/v3 v3.0.1
|
||||||
github.com/sirupsen/logrus v1.8.1
|
github.com/sirupsen/logrus v1.8.1
|
||||||
github.com/stretchr/testify v1.7.0
|
github.com/stretchr/testify v1.7.0
|
||||||
|
github.com/viney-shih/go-lock v1.1.1
|
||||||
go.etcd.io/bbolt v1.3.6
|
go.etcd.io/bbolt v1.3.6
|
||||||
golang.org/x/crypto v0.0.0-20220307211146-efcb8507fb70
|
golang.org/x/crypto v0.0.0-20220307211146-efcb8507fb70
|
||||||
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
|
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
|
||||||
|
|
|
@ -718,6 +718,8 @@ github.com/urfave/cli v0.0.0-20171014202726-7bc6a0acffa5/go.mod h1:70zkFmudgCuE/
|
||||||
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
|
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
|
||||||
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
|
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
|
||||||
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
|
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
|
||||||
|
github.com/viney-shih/go-lock v1.1.1 h1:SwzDPPAiHpcwGCr5k8xD15d2gQSo8d4roRYd7TDV2eI=
|
||||||
|
github.com/viney-shih/go-lock v1.1.1/go.mod h1:Yijm78Ljteb3kRiJrbLAxVntkUukGu5uzSxq/xV7OO8=
|
||||||
github.com/vishvananda/netlink v0.0.0-20181108222139-023a6dafdcdf/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk=
|
github.com/vishvananda/netlink v0.0.0-20181108222139-023a6dafdcdf/go.mod h1:+SR5DhBJrl6ZM7CoCKvpw5BKroDKQ+PJqOg65H/2ktk=
|
||||||
github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE=
|
github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE=
|
||||||
github.com/vishvananda/netlink v1.1.1-0.20201029203352-d40f9887b852/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho=
|
github.com/vishvananda/netlink v1.1.1-0.20201029203352-d40f9887b852/go.mod h1:twkDnbuQxJYemMlGd4JFIcuhgX83tXhKS2B/PRMpOho=
|
||||||
|
@ -860,6 +862,7 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
|
||||||
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||||
|
|
|
@ -3,69 +3,48 @@ package offlinegate
|
||||||
import (
|
import (
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sync"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
httperror "github.com/portainer/libhttp/error"
|
httperror "github.com/portainer/libhttp/error"
|
||||||
|
lock "github.com/viney-shih/go-lock"
|
||||||
)
|
)
|
||||||
|
|
||||||
// OfflineGate is a entity that works similar to a mutex with a signaling
|
// OfflineGate is an entity that works similar to a mutex with signaling
|
||||||
// Only the caller that have Locked an gate can unlock it, otherw will be blocked with a call to Lock.
|
// Only the caller that has Locked a gate can unlock it, otherwise it will be blocked with a call to Lock.
|
||||||
// Gate provides a passthrough http middleware that will wait for a locked gate to be unlocked.
|
// Gate provides a passthrough http middleware that will wait for a locked gate to be unlocked.
|
||||||
// For a safety reasons, middleware will timeout
|
// For safety reasons, the middleware will timeout
|
||||||
type OfflineGate struct {
|
type OfflineGate struct {
|
||||||
lock *sync.Mutex
|
lock *lock.CASMutex
|
||||||
signalingCh chan interface{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewOfflineGate creates a new gate
|
// NewOfflineGate creates a new gate
|
||||||
func NewOfflineGate() *OfflineGate {
|
func NewOfflineGate() *OfflineGate {
|
||||||
return &OfflineGate{
|
return &OfflineGate{
|
||||||
lock: &sync.Mutex{},
|
lock: lock.NewCASMutex(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Lock locks readonly gate and returns a function to unlock
|
// Lock locks readonly gate and returns a function to unlock
|
||||||
func (o *OfflineGate) Lock() func() {
|
func (o *OfflineGate) Lock() func() {
|
||||||
o.lock.Lock()
|
o.lock.Lock()
|
||||||
o.signalingCh = make(chan interface{})
|
return o.lock.Unlock
|
||||||
return o.unlock
|
|
||||||
}
|
|
||||||
|
|
||||||
func (o *OfflineGate) unlock() {
|
|
||||||
if o.signalingCh == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
close(o.signalingCh)
|
|
||||||
o.signalingCh = nil
|
|
||||||
o.lock.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Watch returns a signaling channel.
|
|
||||||
// Unless channel is nil, client needs to watch for a signal on a channel to know when gate is unlocked.
|
|
||||||
// Signal channel is disposable: onced signaled, has to be disposed and acquired again.
|
|
||||||
func (o *OfflineGate) Watch() chan interface{} {
|
|
||||||
return o.signalingCh
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// WaitingMiddleware returns an http handler that waits for the gate to be unlocked before continuing
|
// WaitingMiddleware returns an http handler that waits for the gate to be unlocked before continuing
|
||||||
func (o *OfflineGate) WaitingMiddleware(timeout time.Duration, next http.Handler) http.Handler {
|
func (o *OfflineGate) WaitingMiddleware(timeout time.Duration, next http.Handler) http.Handler {
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
signalingCh := o.Watch()
|
if r.Method == "GET" || r.Method == "HEAD" || r.Method == "OPTIONS" || strings.HasPrefix(r.URL.Path, "/api/backup") || strings.HasPrefix(r.URL.Path, "/api/restore") {
|
||||||
|
next.ServeHTTP(w, r)
|
||||||
if signalingCh != nil {
|
return
|
||||||
if r.Method != "GET" && r.Method != "HEAD" && r.Method != "OPTIONS" {
|
|
||||||
select {
|
|
||||||
case <-signalingCh:
|
|
||||||
case <-time.After(timeout):
|
|
||||||
log.Println("error: Timeout waiting for the offline gate to signal")
|
|
||||||
httperror.WriteError(w, http.StatusRequestTimeout, "Timeout waiting for the offline gate to signal", http.ErrHandlerTimeout)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !o.lock.RTryLockWithTimeout(timeout) {
|
||||||
|
log.Println("error: Timeout waiting for the offline gate to signal")
|
||||||
|
httperror.WriteError(w, http.StatusRequestTimeout, "Timeout waiting for the offline gate to signal", http.ErrHandlerTimeout)
|
||||||
|
return
|
||||||
|
}
|
||||||
next.ServeHTTP(w, r)
|
next.ServeHTTP(w, r)
|
||||||
|
o.lock.RUnlock()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,77 +58,6 @@ func Test_hasToBeUnlockedToLockAgain(t *testing.T) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_waitChannelWillBeEmpty_ifGateIsUnlocked(t *testing.T) {
|
|
||||||
o := NewOfflineGate()
|
|
||||||
|
|
||||||
signalingCh := o.Watch()
|
|
||||||
if signalingCh != nil {
|
|
||||||
t.Error("Signaling channel should be empty")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_startWaitingForSignal_beforeGateGetsUnlocked(t *testing.T) {
|
|
||||||
// scenario:
|
|
||||||
// 1. main routing locks the gate and waits for a consumer to start up
|
|
||||||
// 2. consumer starts up, notifies main and begins waiting for the gate to be unlocked
|
|
||||||
// 3. main unlocks the gate
|
|
||||||
// 4. consumer be able to continue
|
|
||||||
|
|
||||||
o := NewOfflineGate()
|
|
||||||
unlock := o.Lock()
|
|
||||||
|
|
||||||
signalingCh := o.Watch()
|
|
||||||
|
|
||||||
wg := sync.WaitGroup{}
|
|
||||||
wg.Add(1)
|
|
||||||
readerIsReady := sync.WaitGroup{}
|
|
||||||
readerIsReady.Add(1)
|
|
||||||
|
|
||||||
go func(t *testing.T) {
|
|
||||||
readerIsReady.Done()
|
|
||||||
|
|
||||||
// either wait for a signal or timeout
|
|
||||||
select {
|
|
||||||
case <-signalingCh:
|
|
||||||
case <-time.After(10 * time.Second):
|
|
||||||
t.Error("Failed to wait for a signal, exit by timeout")
|
|
||||||
}
|
|
||||||
wg.Done()
|
|
||||||
}(t)
|
|
||||||
|
|
||||||
readerIsReady.Wait()
|
|
||||||
unlock()
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_startWaitingForSignal_afterGateGetsUnlocked(t *testing.T) {
|
|
||||||
// scenario:
|
|
||||||
// 1. main routing locks, gets waiting channel and unlocks
|
|
||||||
// 2. consumer starts up and begins waiting for the gate to be unlocked
|
|
||||||
// 3. consumer gets signal immediately and continues
|
|
||||||
|
|
||||||
o := NewOfflineGate()
|
|
||||||
unlock := o.Lock()
|
|
||||||
signalingCh := o.Watch()
|
|
||||||
unlock()
|
|
||||||
|
|
||||||
wg := sync.WaitGroup{}
|
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
go func(t *testing.T) {
|
|
||||||
// either wait for a signal or timeout
|
|
||||||
select {
|
|
||||||
case <-signalingCh:
|
|
||||||
case <-time.After(10 * time.Second):
|
|
||||||
t.Error("Failed to wait for a signal, exit by timeout")
|
|
||||||
}
|
|
||||||
wg.Done()
|
|
||||||
}(t)
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
func Test_waitingMiddleware_executesImmediately_whenNotLocked(t *testing.T) {
|
func Test_waitingMiddleware_executesImmediately_whenNotLocked(t *testing.T) {
|
||||||
// scenario:
|
// scenario:
|
||||||
// 1. create an gate
|
// 1. create an gate
|
||||||
|
|
Loading…
Reference in New Issue