Fix the problem that synchronizer might be started multiple times (#7642)

#### What type of PR is this?

/kind bug
/area core
/milestone 2.21.x

#### What this PR does / why we need it:

This PR fixes the problem of starting synchronizer multiple times while configuring multiple workers.

#### Does this PR introduce a user-facing change?

```release-note
None
```
pull/7645/head
John Niang 2025-07-29 14:49:50 +08:00 committed by GitHub
parent 177cfcc69c
commit ae9dd6f3d3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 11 additions and 10 deletions

View File

@ -135,6 +135,9 @@ public class DefaultController<R> implements Controller {
return;
}
this.started = true;
if (synchronizer != null) {
synchronizer.start();
}
log.info("Starting controller {}", name);
IntStream.range(0, getWorkerCount())
.mapToObj(i -> new Worker())
@ -162,9 +165,6 @@ public class DefaultController<R> implements Controller {
@Override
public void run() {
log.info("Controller worker {} started", this.name);
if (synchronizer != null) {
synchronizer.start();
}
while (!isDisposed() && !Thread.currentThread().isInterrupted()) {
try {
var entry = queue.take();

View File

@ -91,7 +91,7 @@ class DefaultControllerTest {
controller.new Worker().run();
verify(synchronizer, times(1)).start();
verify(synchronizer, never()).start();
verify(queue, times(2)).take();
verify(queue, times(0)).add(any());
verify(queue, times(1)).done(any());
@ -109,7 +109,7 @@ class DefaultControllerTest {
controller.new Worker().run();
verify(synchronizer, times(1)).start();
verify(synchronizer, never()).start();
verify(queue, times(2)).take();
verify(queue, times(1)).done(any());
verify(queue, times(1)).add(argThat(de ->
@ -129,7 +129,7 @@ class DefaultControllerTest {
controller.new Worker().run();
verify(synchronizer, times(1)).start();
verify(synchronizer, never()).start();
verify(queue, times(2)).take();
verify(queue, times(1)).done(any());
verify(queue, times(1)).add(argThat(de ->
@ -150,7 +150,7 @@ class DefaultControllerTest {
controller.new Worker().run();
verify(synchronizer).start();
verify(synchronizer, never()).start();
verify(queue, times(2)).take();
verify(queue).done(any());
verify(queue).add(argThat(de ->
@ -171,7 +171,7 @@ class DefaultControllerTest {
controller.new Worker().run();
verify(synchronizer).start();
verify(synchronizer, never()).start();
verify(queue, times(2)).take();
verify(queue).done(any());
@ -190,7 +190,7 @@ class DefaultControllerTest {
controller.new Worker().run();
verify(synchronizer, times(1)).start();
verify(synchronizer, never()).start();
verify(queue, times(2)).take();
verify(queue, times(1)).done(any());
verify(queue, times(1)).add(argThat(de ->
@ -211,7 +211,7 @@ class DefaultControllerTest {
controller.new Worker().run();
verify(synchronizer, times(1)).start();
verify(synchronizer, never()).start();
verify(queue, times(2)).take();
verify(queue, times(1)).done(any());
verify(queue, times(1)).add(argThat(de ->
@ -262,6 +262,7 @@ class DefaultControllerTest {
assertTrue(controller.isStarted());
assertFalse(controller.isDisposed());
verify(synchronizer).start();
verify(executor).execute(any(Runnable.class));
}