refactor: add post-index build validation to ensure data count matches index (#6507)

#### What type of PR is this?
/kind improvement
/area core
/milestone 2.19.x

#### What this PR does / why we need it:
构建完索引后增加数据量一致性校验的步骤

#### Does this PR introduce a user-facing change?
```release-note
None
```
pull/6526/head
guqing 2024-08-26 17:05:14 +08:00 committed by GitHub
parent 3be91fcb6f
commit 8405a6376e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 34 additions and 1 deletions

View File

@ -480,7 +480,20 @@ public class ReactiveExtensionClientImpl implements ReactiveExtensionClient {
private void createIndexerFor(Scheme scheme) {
setIndexBuildingStateFor(scheme.groupVersionKind().groupKind(), true);
indexerFactory.createIndexerFor(scheme.type(), createExtensionIterator(scheme));
var iterator = createExtensionIterator(scheme);
indexerFactory.createIndexerFor(scheme.type(), iterator);
// ensure data count matches index count
var prefix = ExtensionStoreUtil.buildStoreNamePrefix(scheme);
var indexedSize = indexedQueryEngine.retrieveAll(scheme.groupVersionKind(),
new ListOptions(), Sort.unsorted()).size();
long count = client.countByNamePrefix(prefix).blockOptional().orElseThrow();
if (count != iterator.size() || count != indexedSize) {
log.error("indexedSize: {}, count in db: {}, iterate size: {}", indexedSize, count,
iterator.size());
throw new IllegalStateException("The number of indexed records is not equal to the "
+ "number of records in the database, this is a serious error, please submit "
+ "an issue to halo.");
}
setIndexBuildingStateFor(scheme.groupVersionKind().groupKind(), false);
}
}

View File

@ -21,6 +21,7 @@ public class DefaultExtensionIterator<E extends Extension> implements ExtensionI
private Pageable currentPageable;
private List<E> currentData;
private int currentIndex;
private long size;
public DefaultExtensionIterator(ExtensionPaginatedLister<E> lister) {
this(PageRequest.of(0, DEFAULT_PAGE_SIZE, Sort.by("name")), lister);
@ -61,6 +62,12 @@ public class DefaultExtensionIterator<E extends Extension> implements ExtensionI
if (!hasNext()) {
throw new NoSuchElementException();
}
this.size++;
return currentData.get(currentIndex++);
}
@Override
public long size() {
return this.size;
}
}

View File

@ -14,4 +14,10 @@ import run.halo.app.extension.Extension;
*/
public interface ExtensionIterator<E extends Extension> extends Iterator<E> {
/**
* Get the total size of the extensions that this iterator iterates over.
*
* @return the total size of the extensions.
*/
long size();
}

View File

@ -12,6 +12,8 @@ public interface ReactiveExtensionStoreClient {
Mono<Page<ExtensionStore>> listByNamePrefix(String prefix, Pageable pageable);
Mono<Long> countByNamePrefix(String prefix);
/**
* List stores by names and return data according to given names order.
*

View File

@ -34,6 +34,11 @@ public class ReactiveExtensionStoreClientImpl implements ReactiveExtensionStoreC
.map(p -> new PageImpl<>(p.getT1(), pageable, p.getT2()));
}
@Override
public Mono<Long> countByNamePrefix(String prefix) {
return this.repository.countByNameStartingWith(prefix);
}
@Override
public Flux<ExtensionStore> listByNames(List<String> names) {
ToIntFunction<ExtensionStore> comparator =