Merge pull request #3268 from briandowns/fix_node_name

Reference node name when needed
pull/3276/head
Brian Downs 2021-05-04 10:48:26 -07:00 committed by GitHub
commit e1b9067d21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 15 additions and 13 deletions

View File

@ -65,14 +65,13 @@ var (
)
type ETCD struct {
client *etcd.Client
config *config.Control
name string
runtime *config.ControlRuntime
address string
cron *cron.Cron
s3 *s3
nodeName string
client *etcd.Client
config *config.Control
name string
runtime *config.ControlRuntime
address string
cron *cron.Cron
s3 *s3
}
type learnerProgress struct {
@ -92,8 +91,7 @@ type Members struct {
// ETCD with an initialized cron value.
func NewETCD() *ETCD {
return &ETCD{
cron: cron.New(),
nodeName: os.Getenv("NODE_NAME"),
cron: cron.New(),
}
}
@ -948,11 +946,13 @@ func (e *ETCD) listSnapshots(ctx context.Context, snapshotDir string) ([]snapsho
return nil, err
}
nodeName := os.Getenv("NODE_NAME")
for _, f := range files {
snapshots = append(snapshots, snapshotFile{
Name: f.Name(),
Location: "file://" + filepath.Join(snapshotDir, f.Name()),
NodeName: e.nodeName,
NodeName: nodeName,
CreatedAt: &metav1.Time{
Time: f.ModTime(),
},
@ -1022,18 +1022,20 @@ func (e *ETCD) StoreSnapshotData(ctx context.Context) error {
snapshotConfigMap.Data = make(map[string]string, 0)
}
nodeName := os.Getenv("NODE_NAME")
// remove entries for this node only
for k, v := range snapshotConfigMap.Data {
var sf snapshotFile
if err := json.Unmarshal([]byte(v), &sf); err != nil {
return err
}
if sf.NodeName == e.nodeName || sf.NodeName == "s3" {
if sf.NodeName == nodeName || sf.NodeName == "s3" {
delete(snapshotConfigMap.Data, k)
}
}
// this node's entries to the ConfigMap
// save this node's entries to the ConfigMap
for k, v := range data {
snapshotConfigMap.Data[k] = v
}