mirror of https://github.com/hashicorp/consul
fix: consume ignored entries in CE downgrade via Ent snapshot (#20977)
This operation would previously fail due to unconsumed bytes in the decoder buffer when reading the Ent snapshot (the first byte of the record would be misinterpreted as a type indicator, and the remaining bytes would fail to be deserialized or read as invalid data). Ensure restore succeeds by decoding the ignored record as an interface{}, which will consume the record bytes without requiring a concrete target struct, then moving on to the next record.pull/20864/head
parent
e231f0ee9b
commit
a8d08e759f
|
@ -0,0 +1,3 @@
|
|||
```release-note:bug
|
||||
server: fix Ent snapshot restore on CE when CE downgrade is enabled
|
||||
```
|
|
@ -196,7 +196,7 @@ func (c *FSM) Apply(log *raft.Log) interface{} {
|
|||
return nil
|
||||
}
|
||||
if structs.CEDowngrade && msgType >= 64 {
|
||||
c.logger.Warn("ignoring enterprise message, for downgrading to oss", "type", msgType)
|
||||
c.logger.Warn("ignoring enterprise message as part of downgrade to CE", "type", msgType)
|
||||
return nil
|
||||
}
|
||||
panic(fmt.Errorf("failed to apply request: %#v", buf))
|
||||
|
@ -268,8 +268,9 @@ func (c *FSM) Restore(old io.ReadCloser) error {
|
|||
}
|
||||
default:
|
||||
if structs.CEDowngrade && msg >= 64 {
|
||||
c.logger.Warn("ignoring enterprise message , for downgrading to oss", "type", msg)
|
||||
return nil
|
||||
c.logger.Warn("ignoring enterprise message as part of downgrade to CE", "type", msg)
|
||||
var ignore interface{}
|
||||
return dec.Decode(&ignore)
|
||||
} else if msg >= 64 {
|
||||
return fmt.Errorf("msg type <%d> is a Consul Enterprise log entry. Consul CE cannot restore it", msg)
|
||||
} else {
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib/stringslice"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
)
|
||||
|
||||
|
@ -60,3 +61,101 @@ func TestRestoreFromEnterprise(t *testing.T) {
|
|||
require.EqualError(t, fsm.Restore(sink), "msg type <65> is a Consul Enterprise log entry. Consul CE cannot restore it")
|
||||
sink.Cancel()
|
||||
}
|
||||
|
||||
func TestRestoreFromEnterprise_CEDowngrade(t *testing.T) {
|
||||
logger := testutil.Logger(t)
|
||||
|
||||
handle := &testRaftHandle{}
|
||||
storageBackend := newStorageBackend(t, handle)
|
||||
handle.apply = func(buf []byte) (any, error) { return storageBackend.Apply(buf, 123), nil }
|
||||
|
||||
fsm := NewFromDeps(Deps{
|
||||
Logger: logger,
|
||||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStore(nil)
|
||||
},
|
||||
StorageBackend: storageBackend,
|
||||
})
|
||||
|
||||
// To verify if a proper message is displayed when Consul CE tries to
|
||||
// unsuccessfully restore entries from a Consul Ent snapshot.
|
||||
buf := bytes.NewBuffer(nil)
|
||||
sink := &MockSink{buf, false}
|
||||
|
||||
type EntMock struct {
|
||||
ID int
|
||||
Type string
|
||||
}
|
||||
|
||||
entMockEntry := EntMock{
|
||||
ID: 65,
|
||||
Type: "A Consul Ent Log Type",
|
||||
}
|
||||
|
||||
// Create one entry to exercise the Go struct marshaller, and one to exercise the
|
||||
// Binary Marshaller interface. This verifies that regardless of whether the struct gets
|
||||
// encoded as a msgpack byte string (binary marshaller) or msgpack map (other struct),
|
||||
// it will still be skipped over correctly.
|
||||
registerEntry := structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
Address: "127.0.0.1",
|
||||
Service: &structs.NodeService{
|
||||
ID: "db",
|
||||
Service: "db",
|
||||
Tags: []string{"primary"},
|
||||
Port: 8000,
|
||||
},
|
||||
}
|
||||
proxyDefaultsEntry := &structs.ConfigEntryRequest{
|
||||
Op: structs.ConfigEntryUpsert,
|
||||
Entry: &structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: "global",
|
||||
Config: map[string]interface{}{
|
||||
"foo": "bar",
|
||||
},
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||
},
|
||||
}
|
||||
|
||||
// Write the header and records.
|
||||
header := SnapshotHeader{
|
||||
LastIndex: 0,
|
||||
}
|
||||
encoder := codec.NewEncoder(sink, structs.MsgpackHandle)
|
||||
encoder.Encode(&header)
|
||||
sink.Write([]byte{byte(structs.MessageType(entMockEntry.ID))})
|
||||
encoder.Encode(entMockEntry)
|
||||
sink.Write([]byte{byte(structs.RegisterRequestType)})
|
||||
encoder.Encode(registerEntry)
|
||||
sink.Write([]byte{byte(structs.ConfigEntryRequestType)})
|
||||
encoder.Encode(proxyDefaultsEntry)
|
||||
|
||||
defer func() {
|
||||
structs.CEDowngrade = false
|
||||
}()
|
||||
structs.CEDowngrade = true
|
||||
|
||||
require.NoError(t, fsm.Restore(sink), "failed to decode Ent snapshot to CE")
|
||||
|
||||
// Verify the register request
|
||||
_, nodes, err := fsm.state.Nodes(nil, nil, "")
|
||||
require.NoError(t, err)
|
||||
require.Len(t, nodes, 1, "incorrect number of nodes: %v", nodes)
|
||||
require.Equal(t, "foo", nodes[0].Node)
|
||||
require.Equal(t, "dc1", nodes[0].Datacenter)
|
||||
require.Equal(t, "127.0.0.1", nodes[0].Address)
|
||||
_, fooSrv, err := fsm.state.NodeServices(nil, "foo", nil, "")
|
||||
require.NoError(t, err)
|
||||
require.Len(t, fooSrv.Services, 1)
|
||||
require.Contains(t, fooSrv.Services["db"].Tags, "primary")
|
||||
require.True(t, stringslice.Contains(fooSrv.Services["db"].Tags, "primary"))
|
||||
require.Equal(t, 8000, fooSrv.Services["db"].Port)
|
||||
|
||||
// Verify the proxy defaults request
|
||||
_, configEntry, err := fsm.state.ConfigEntry(nil, structs.ProxyDefaults, "global", structs.DefaultEnterpriseMetaInDefaultPartition())
|
||||
require.NoError(t, err)
|
||||
configEntry.SetHash(proxyDefaultsEntry.Entry.GetHash())
|
||||
require.Equal(t, proxyDefaultsEntry.Entry, configEntry)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue