From a44b9e84b167ffb0d8aab92f38c2e980ce38f374 Mon Sep 17 00:00:00 2001
From: Pierre Souchay
Date: Mon, 19 Mar 2018 14:14:03 +0100
Subject: [PATCH] =?UTF-8?q?[BUGFIX]=C2=A0When=20a=20node=20level=20check?=
=?UTF-8?q?=20is=20removed,=20ensure=20all=20services=20of=20node=20are=20?=
=?UTF-8?q?notified?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Bugfix for https://github.com/hashicorp/consul/pull/3899
When a node level check is removed (example: maintenance),
some watchers on services might have to recompute their state.
If those nodes are performing blocking queries, they have to be notified.
While their state was updated when node-level state did change or was added
this was not the case when the check was removed. This fixes it.
---
agent/consul/state/catalog.go | 85 +++++++++++++++---------------
agent/consul/state/catalog_test.go | 5 ++
2 files changed, 48 insertions(+), 42 deletions(-)
diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go
index 38d59e8109..6fcb45955b 100644
--- a/agent/consul/state/catalog.go
+++ b/agent/consul/state/catalog.go
@@ -1086,6 +1086,43 @@ func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
return nil
}
+// This ensure all connected services to a check are updated properly
+func (s *Store) alterServicesFromCheck(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck, checkServiceExists bool) error {
+ // If the check is associated with a service, check that we have
+ // a registration for the service.
+ if hc.ServiceID != "" {
+ service, err := tx.First("services", "id", hc.Node, hc.ServiceID)
+ // When deleting a service, service might have been removed already
+ if err != nil && checkServiceExists {
+ return fmt.Errorf("failed service lookup: %s", err)
+ }
+ if service == nil {
+ return ErrMissingService
+ }
+
+ // Copy in the service name and tags
+ svc := service.(*structs.ServiceNode)
+ hc.ServiceName = svc.ServiceName
+ hc.ServiceTags = svc.ServiceTags
+ if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
+ return fmt.Errorf("failed updating index: %s", err)
+ }
+ } else {
+ // Update the status for all the services associated with this node
+ services, err := tx.Get("services", "node", hc.Node)
+ if err != nil {
+ return fmt.Errorf("failed updating services for node %s: %s", hc.Node, err)
+ }
+ for service := services.Next(); service != nil; service = services.Next() {
+ svc := service.(*structs.ServiceNode).ToNodeService()
+ if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
+ return fmt.Errorf("failed updating index: %s", err)
+ }
+ }
+ }
+ return nil
+}
+
// ensureCheckTransaction is used as the inner method to handle inserting
// a health check into the state store. It ensures safety against inserting
// checks with no matching node or service.
@@ -1119,36 +1156,9 @@ func (s *Store) ensureCheckTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthChec
return ErrMissingNode
}
- // If the check is associated with a service, check that we have
- // a registration for the service.
- if hc.ServiceID != "" {
- service, err := tx.First("services", "id", hc.Node, hc.ServiceID)
- if err != nil {
- return fmt.Errorf("failed service lookup: %s", err)
- }
- if service == nil {
- return ErrMissingService
- }
-
- // Copy in the service name and tags
- svc := service.(*structs.ServiceNode)
- hc.ServiceName = svc.ServiceName
- hc.ServiceTags = svc.ServiceTags
- if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
- return fmt.Errorf("failed updating index: %s", err)
- }
- } else {
- // Update the status for all the services associated with this node
- services, err := tx.Get("services", "node", hc.Node)
- if err != nil {
- return fmt.Errorf("failed updating services for node %s: %s", hc.Node, err)
- }
- for service := services.Next(); service != nil; service = services.Next() {
- svc := service.(*structs.ServiceNode).ToNodeService()
- if err := tx.Insert("index", &IndexEntry{serviceIndexName(svc.Service), idx}); err != nil {
- return fmt.Errorf("failed updating index: %s", err)
- }
- }
+ err = s.alterServicesFromCheck(tx, idx, hc, true)
+ if err != nil {
+ return err
}
// Delete any sessions for this check if the health is critical.
@@ -1390,19 +1400,10 @@ func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID t
return nil
}
existing := hc.(*structs.HealthCheck)
- if existing != nil && existing.ServiceID != "" {
- service, err := tx.First("services", "id", node, existing.ServiceID)
+ if existing != nil {
+ err = s.alterServicesFromCheck(tx, idx, existing, false)
if err != nil {
- return fmt.Errorf("failed service lookup: %s", err)
- }
- if service == nil {
- return ErrMissingService
- }
-
- // Updated index of service
- svc := service.(*structs.ServiceNode)
- if err = tx.Insert("index", &IndexEntry{serviceIndexName(svc.ServiceName), idx}); err != nil {
- return fmt.Errorf("failed updating index: %s", err)
+ return fmt.Errorf("Failed to update services linked to deleted healthcheck: %s", err)
}
}
diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go
index b22b82fed6..1c3b1a8678 100644
--- a/agent/consul/state/catalog_test.go
+++ b/agent/consul/state/catalog_test.go
@@ -2119,6 +2119,7 @@ func TestStateStore_DeleteCheck(t *testing.T) {
// Register a node and a node-level health check.
testRegisterNode(t, s, 1, "node1")
testRegisterCheck(t, s, 2, "node1", "", "check1", api.HealthPassing)
+ testRegisterService(t, s, 2, "node1", "service1")
// Make sure the check is there.
ws := memdb.NewWatchSet()
@@ -2130,6 +2131,8 @@ func TestStateStore_DeleteCheck(t *testing.T) {
t.Fatalf("bad: %#v", checks)
}
+ ensureServiceVersion(t, s, ws, "service1", 2, 1)
+
// Delete the check.
if err := s.DeleteCheck(3, "node1", "check1"); err != nil {
t.Fatalf("err: %s", err)
@@ -2137,6 +2140,8 @@ func TestStateStore_DeleteCheck(t *testing.T) {
if !watchFired(ws) {
t.Fatalf("bad")
}
+ // All services linked to this node should have their index updated
+ ensureServiceVersion(t, s, ws, "service1", 3, 1)
// Check is gone
ws = memdb.NewWatchSet()