refactor: optimize the extension watch parameters to always be of real type (#6180)

#### What type of PR is this?
/kind improvement
/area core
/milestone 2.17.x

#### What this PR does / why we need it:
优化触发 Extension Watch 方法(onAdd/onUpdate/onDelete)时的参数始终为真实类型避免使用时进行类型转换

#### Does this PR introduce a user-facing change?
```release-note
None
```
pull/6185/head v2.17.0-alpha.2
guqing 2024-06-27 18:16:54 +08:00 committed by GitHub
parent fc35e69766
commit 68d94f6653
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 46 additions and 3 deletions

View File

@ -228,7 +228,7 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {
}) })
.map(converter::convertTo) .map(converter::convertTo)
.flatMap(extStore -> doCreate(extension, extStore.getName(), extStore.getData()) .flatMap(extStore -> doCreate(extension, extStore.getName(), extStore.getData())
.doOnNext(watchers::onAdd) .doOnNext(created -> watchers.onAdd(convertToRealExtension(created)))
) )
.retryWhen(Retry.backoff(3, Duration.ofMillis(100)) .retryWhen(Retry.backoff(3, Duration.ofMillis(100))
// retry when generateName is set // retry when generateName is set
@ -266,7 +266,9 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {
var store = this.converter.convertTo(newJsonExt); var store = this.converter.convertTo(newJsonExt);
var updated = doUpdate(extension, store.getName(), store.getVersion(), store.getData()); var updated = doUpdate(extension, store.getName(), store.getVersion(), store.getData());
if (!onlyStatusChanged) { if (!onlyStatusChanged) {
updated = updated.doOnNext(ext -> watchers.onUpdate(old, ext)); updated = updated.doOnNext(ext -> watchers.onUpdate(convertToRealExtension(old),
convertToRealExtension(ext))
);
} }
return updated; return updated;
}); });
@ -293,7 +295,7 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {
var extensionStore = converter.convertTo(extension); var extensionStore = converter.convertTo(extension);
return doUpdate(extension, extensionStore.getName(), return doUpdate(extension, extensionStore.getName(),
extensionStore.getVersion(), extensionStore.getData() extensionStore.getVersion(), extensionStore.getData()
).doOnNext(watchers::onDelete); ).doOnNext(updated -> watchers.onDelete(convertToRealExtension(extension)));
} }
@Override @Override

View File

@ -688,6 +688,47 @@ class ReactiveExtensionClientTest {
verify(watcher, times(1)).onDelete(any()); verify(watcher, times(1)).onDelete(any());
} }
@Test
void shouldWatchRealType() {
var extensionStore = createExtensionStore("/registry/fake.halo.run/fakes/fake");
var fake = createFakeExtension("fake", 1L);
var unstructured = Unstructured.OBJECT_MAPPER.convertValue(fake, Unstructured.class);
when(storeClient.fetchByName(extensionStore.getName()))
.thenReturn(Mono.just(extensionStore));
when(converter.convertTo(any())).thenReturn(extensionStore);
when(converter.convertFrom(same(Unstructured.class), any())).thenReturn(unstructured);
var indexer = mock(Indexer.class);
when(indexerFactory.getIndexer(eq(fake.groupVersionKind()))).thenReturn(indexer);
// on add
when(storeClient.create(any(), any())).thenReturn(Mono.just(extensionStore));
doNothing().when(watcher).onAdd(any(Extension.class));
StepVerifier.create(client.create(unstructured))
.expectNext(unstructured)
.verifyComplete();
verify(watcher, times(1)).onAdd(isA(FakeExtension.class));
// on update
when(storeClient.update(any(), any(), any())).thenReturn(Mono.just(extensionStore));
doNothing().when(watcher).onUpdate(any(), any());
StepVerifier.create(client.update(unstructured))
.expectNext(unstructured)
.verifyComplete();
verify(watcher, times(1))
.onUpdate(isA(FakeExtension.class), isA(FakeExtension.class));
// on delete
doNothing().when(watcher).onDelete(any());
StepVerifier.create(client.delete(unstructured))
.expectNext(unstructured)
.verifyComplete();
verify(watcher, times(1)).onDelete(isA(FakeExtension.class));
}
} }
} }