mirror of https://github.com/hashicorp/consul
fix: consume ignored entries in CE downgrade via Ent snapshot
This operation would previously fail due to unconsumed bytes in the decoder buffer when reading the Ent snapshot (the first byte of the record would be misinterpreted as a type indicator, and the remaining bytes would fail to be deserialized or read as invalid data). Ensure restore succeeds by decoding the ignored record as an interface{}, which will consume the record bytes without requiring a concrete target struct, then moving on to the next record.pull/20977/head
parent
e231f0ee9b
commit
24f226e5ad
|
@ -0,0 +1,3 @@
|
|||
```release-note:bug
|
||||
server: fix Ent snapshot restore on CE when CE downgrade is enabled
|
||||
```
|
|
@ -196,7 +196,7 @@ func (c *FSM) Apply(log *raft.Log) interface{} {
|
|||
return nil
|
||||
}
|
||||
if structs.CEDowngrade && msgType >= 64 {
|
||||
c.logger.Warn("ignoring enterprise message, for downgrading to oss", "type", msgType)
|
||||
c.logger.Warn("ignoring enterprise message as part of downgrade to CE", "type", msgType)
|
||||
return nil
|
||||
}
|
||||
panic(fmt.Errorf("failed to apply request: %#v", buf))
|
||||
|
@ -268,8 +268,9 @@ func (c *FSM) Restore(old io.ReadCloser) error {
|
|||
}
|
||||
default:
|
||||
if structs.CEDowngrade && msg >= 64 {
|
||||
c.logger.Warn("ignoring enterprise message , for downgrading to oss", "type", msg)
|
||||
return nil
|
||||
c.logger.Warn("ignoring enterprise message as part of downgrade to CE", "type", msg)
|
||||
var ignore interface{}
|
||||
return dec.Decode(&ignore)
|
||||
} else if msg >= 64 {
|
||||
return fmt.Errorf("msg type <%d> is a Consul Enterprise log entry. Consul CE cannot restore it", msg)
|
||||
} else {
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib/stringslice"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
)
|
||||
|
||||
|
@ -60,3 +61,101 @@ func TestRestoreFromEnterprise(t *testing.T) {
|
|||
require.EqualError(t, fsm.Restore(sink), "msg type <65> is a Consul Enterprise log entry. Consul CE cannot restore it")
|
||||
sink.Cancel()
|
||||
}
|
||||
|
||||
func TestRestoreFromEnterprise_CEDowngrade(t *testing.T) {
|
||||
logger := testutil.Logger(t)
|
||||
|
||||
handle := &testRaftHandle{}
|
||||
storageBackend := newStorageBackend(t, handle)
|
||||
handle.apply = func(buf []byte) (any, error) { return storageBackend.Apply(buf, 123), nil }
|
||||
|
||||
fsm := NewFromDeps(Deps{
|
||||
Logger: logger,
|
||||
NewStateStore: func() *state.Store {
|
||||
return state.NewStateStore(nil)
|
||||
},
|
||||
StorageBackend: storageBackend,
|
||||
})
|
||||
|
||||
// To verify if a proper message is displayed when Consul CE tries to
|
||||
// unsuccessfully restore entries from a Consul Ent snapshot.
|
||||
buf := bytes.NewBuffer(nil)
|
||||
sink := &MockSink{buf, false}
|
||||
|
||||
type EntMock struct {
|
||||
ID int
|
||||
Type string
|
||||
}
|
||||
|
||||
entMockEntry := EntMock{
|
||||
ID: 65,
|
||||
Type: "A Consul Ent Log Type",
|
||||
}
|
||||
|
||||
// Create one entry to exercise the Go struct marshaller, and one to exercise the
|
||||
// Binary Marshaller interface. This verifies that regardless of whether the struct gets
|
||||
// encoded as a msgpack byte string (binary marshaller) or msgpack map (other struct),
|
||||
// it will still be skipped over correctly.
|
||||
registerEntry := structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
Address: "127.0.0.1",
|
||||
Service: &structs.NodeService{
|
||||
ID: "db",
|
||||
Service: "db",
|
||||
Tags: []string{"primary"},
|
||||
Port: 8000,
|
||||
},
|
||||
}
|
||||
proxyDefaultsEntry := &structs.ConfigEntryRequest{
|
||||
Op: structs.ConfigEntryUpsert,
|
||||
Entry: &structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: "global",
|
||||
Config: map[string]interface{}{
|
||||
"foo": "bar",
|
||||
},
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(),
|
||||
},
|
||||
}
|
||||
|
||||
// Write the header and records.
|
||||
header := SnapshotHeader{
|
||||
LastIndex: 0,
|
||||
}
|
||||
encoder := codec.NewEncoder(sink, structs.MsgpackHandle)
|
||||
encoder.Encode(&header)
|
||||
sink.Write([]byte{byte(structs.MessageType(entMockEntry.ID))})
|
||||
encoder.Encode(entMockEntry)
|
||||
sink.Write([]byte{byte(structs.RegisterRequestType)})
|
||||
encoder.Encode(registerEntry)
|
||||
sink.Write([]byte{byte(structs.ConfigEntryRequestType)})
|
||||
encoder.Encode(proxyDefaultsEntry)
|
||||
|
||||
defer func() {
|
||||
structs.CEDowngrade = false
|
||||
}()
|
||||
structs.CEDowngrade = true
|
||||
|
||||
require.NoError(t, fsm.Restore(sink), "failed to decode Ent snapshot to CE")
|
||||
|
||||
// Verify the register request
|
||||
_, nodes, err := fsm.state.Nodes(nil, nil, "")
|
||||
require.NoError(t, err)
|
||||
require.Len(t, nodes, 1, "incorrect number of nodes: %v", nodes)
|
||||
require.Equal(t, "foo", nodes[0].Node)
|
||||
require.Equal(t, "dc1", nodes[0].Datacenter)
|
||||
require.Equal(t, "127.0.0.1", nodes[0].Address)
|
||||
_, fooSrv, err := fsm.state.NodeServices(nil, "foo", nil, "")
|
||||
require.NoError(t, err)
|
||||
require.Len(t, fooSrv.Services, 1)
|
||||
require.Contains(t, fooSrv.Services["db"].Tags, "primary")
|
||||
require.True(t, stringslice.Contains(fooSrv.Services["db"].Tags, "primary"))
|
||||
require.Equal(t, 8000, fooSrv.Services["db"].Port)
|
||||
|
||||
// Verify the proxy defaults request
|
||||
_, configEntry, err := fsm.state.ConfigEntry(nil, structs.ProxyDefaults, "global", structs.DefaultEnterpriseMetaInDefaultPartition())
|
||||
require.NoError(t, err)
|
||||
configEntry.SetHash(proxyDefaultsEntry.Entry.GetHash())
|
||||
require.Equal(t, proxyDefaultsEntry.Entry, configEntry)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue