perf: replace concatMap to flatMapSequential to improve parallelism and efficiency (#6706)

#### What type of PR is this?
/kind improvement
/area core
/milestone 2.20.x

#### What this PR does / why we need it:
将 concatMap 替换为 flatMapSequential 以提高并行度和执行效率

可以看一下这个场景示例来模拟像文章列表 API 的数据组装
假如每个步骤的执行时间是 1s 有 4 个步骤 同时 Flux 发出 4 条数据:

```java
@Test  
void test() {  
    var startMs = System.currentTimeMillis();  
  
    var monoA = Mono.fromSupplier(  
            () -> {  
                sleep();  
                return "A";  
            })        .subscribeOn(Schedulers.boundedElastic());  
  
    var monoB = Mono.fromSupplier(  
            () -> {  
                sleep();  
                return "B";  
            })        .subscribeOn(Schedulers.boundedElastic());  
  
    var monoC = Mono.fromSupplier(  
            () -> {  
                sleep();  
                return "C";  
            })        .subscribeOn(Schedulers.boundedElastic());  
  
    var monoD = Mono.fromSupplier(  
            () -> {  
                sleep();  
                return "D";  
            })        .subscribeOn(Schedulers.boundedElastic());  
  
    var convert = Mono.when(monoA, monoB, monoC, monoD);  

    Flux.just("1", "2", "3", "4")
        // concatMap(convert::thenReturn)
        .flatMapSequential(convert::thenReturn)  
        .collectList()  
        .block(); 

    System.out.println("Time: " + (System.currentTimeMillis() - startMs));  
}

private static void sleep() {  
    try {  
        Thread.sleep(1000);  
    } catch (InterruptedException e) {  
        throw new RuntimeException(e);  
    }
}
```
**结果:**
1. 如果每个步骤没有加  subscribeOn 且使用 concatMap 耗时: 16362 ms
2. 每个步骤使用 subscribeOn 且使用 concatMap 耗时: 4174 ms
3. 每个步骤使用 subscribeOn 且使用 flatMapSequential 耗时: 1185 ms

#### Does this PR introduce a user-facing change?
```release-note
提升页面访问速度
```
pull/6817/head
guqing 2024-10-10 17:49:01 +08:00 committed by GitHub
parent 02c54846dc
commit 25c54d792e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 17 additions and 17 deletions

View File

@ -58,7 +58,7 @@ public class CommentServiceImpl extends AbstractCommentService implements Commen
commentQuery.toPageRequest())
.flatMap(comments -> Flux.fromStream(comments.get()
.map(this::toListedComment))
.concatMap(Function.identity())
.flatMapSequential(Function.identity())
.collectList()
.map(list -> new ListResult<>(comments.getPage(), comments.getSize(),
comments.getTotal(), list)

View File

@ -166,7 +166,7 @@ public class ReplyServiceImpl extends AbstractCommentService implements ReplySer
return client.listBy(Reply.class, query.toListOptions(), query.toPageRequest())
.flatMap(list -> Flux.fromStream(list.get()
.map(this::toListedReply))
.concatMap(Function.identity())
.flatMapSequential(Function.identity())
.collectList()
.map(listedReplies -> new ListResult<>(list.getPage(), list.getSize(),
list.getTotal(), listedReplies))

View File

@ -77,7 +77,7 @@ public class PostServiceImpl extends AbstractContentService implements PostServi
)
.flatMap(listResult -> Flux.fromStream(listResult.get())
.map(this::getListedPost)
.concatMap(Function.identity())
.flatMapSequential(Function.identity())
.collectList()
.map(listedPosts -> new ListResult<>(listResult.getPage(), listResult.getSize(),
listResult.getTotal(), listedPosts)
@ -175,7 +175,7 @@ public class PostServiceImpl extends AbstractContentService implements PostServi
return Flux.empty();
}
return Flux.fromIterable(usernames)
.concatMap(userService::getUserOrGhost)
.flatMapSequential(userService::getUserOrGhost)
.map(user -> {
Contributor contributor = new Contributor();
contributor.setName(user.getMetadata().getName());

View File

@ -88,7 +88,7 @@ public class SinglePageServiceImpl extends AbstractContentService implements Sin
public Mono<ListResult<ListedSinglePage>> list(SinglePageQuery query) {
return client.listBy(SinglePage.class, query.toListOptions(), query.toPageRequest())
.flatMap(listResult -> Flux.fromStream(listResult.get().map(this::getListedSinglePage))
.concatMap(Function.identity())
.flatMapSequential(Function.identity())
.collectList()
.map(listedSinglePages -> new ListResult<>(
listResult.getPage(),

View File

@ -267,7 +267,7 @@ public class PluginServiceImpl implements PluginService, InitializingBean, Dispo
});
var body = Flux.fromIterable(startedPlugins)
.sort(Comparator.comparing(PluginWrapper::getPluginId))
.concatMap(pluginWrapper -> {
.flatMapSequential(pluginWrapper -> {
var pluginId = pluginWrapper.getPluginId();
return Mono.<Resource>fromSupplier(
() -> BundleResourceUtils.getJsBundleResource(
@ -294,7 +294,7 @@ public class PluginServiceImpl implements PluginService, InitializingBean, Dispo
public Flux<DataBuffer> uglifyCssBundle() {
return Flux.fromIterable(pluginManager.getStartedPlugins())
.sort(Comparator.comparing(PluginWrapper::getPluginId))
.concatMap(pluginWrapper -> {
.flatMapSequential(pluginWrapper -> {
var pluginId = pluginWrapper.getPluginId();
var dataBufferFactory = DefaultDataBufferFactory.sharedInstance;
return Mono.<Resource>fromSupplier(() -> BundleResourceUtils.getJsBundleResource(

View File

@ -86,10 +86,10 @@ public class DefaultExtensionGetter implements ExtensionGetter {
}
var extensions = getExtensions(extensionPoint).cache();
return Flux.fromIterable(extensionDefNames)
.concatMap(extensionDefName ->
.flatMapSequential(extensionDefName ->
client.fetch(ExtensionDefinition.class, extensionDefName)
)
.concatMap(extensionDef -> {
.flatMapSequential(extensionDef -> {
var className = extensionDef.getSpec().getClassName();
return extensions.filter(
extension -> Objects.equals(extension.getClass().getName(),

View File

@ -86,7 +86,7 @@ public class CommentPublicQueryServiceImpl implements CommentPublicQueryService
return client.listBy(Comment.class, listOptions, pageRequest)
.flatMap(listResult -> Flux.fromStream(listResult.get())
.map(this::toCommentVo)
.concatMap(Function.identity())
.flatMapSequential(Function.identity())
.collectList()
.map(commentVos -> new ListResult<>(listResult.getPage(),
listResult.getSize(),
@ -102,7 +102,7 @@ public class CommentPublicQueryServiceImpl implements CommentPublicQueryService
public Mono<ListResult<CommentWithReplyVo>> convertToWithReplyVo(ListResult<CommentVo> comments,
int replySize) {
return Flux.fromIterable(comments.getItems())
.concatMap(commentVo -> {
.flatMapSequential(commentVo -> {
var commentName = commentVo.getMetadata().getName();
return listReply(commentName, 1, replySize)
.map(replyList -> CommentWithReplyVo.from(commentVo)
@ -135,7 +135,7 @@ public class CommentPublicQueryServiceImpl implements CommentPublicQueryService
.orElse(PageRequestImpl.ofSize(0));
return client.listBy(Reply.class, listOptions, pageRequest)
.flatMap(list -> Flux.fromStream(list.get().map(this::toReplyVo))
.concatMap(Function.identity())
.flatMapSequential(Function.identity())
.collectList()
.map(replyVos -> new ListResult<>(list.getPage(), list.getSize(),
list.getTotal(),

View File

@ -33,6 +33,6 @@ public class ContributorFinderImpl implements ContributorFinder {
return Flux.empty();
}
return Flux.fromIterable(names)
.concatMap(this::getContributor);
.flatMapSequential(this::getContributor);
}
}

View File

@ -291,7 +291,7 @@ public class PostFinderImpl implements PostFinder {
public Flux<ListedPostVo> listAll() {
return postPredicateResolver.getListOptions()
.flatMapMany(listOptions -> client.listAll(Post.class, listOptions, defaultSort()))
.concatMap(postPublicQueryService::convertToListedVo);
.flatMapSequential(postPublicQueryService::convertToListedVo);
}
static int pageNullSafe(Integer page) {

View File

@ -67,7 +67,7 @@ public class PostPublicQueryServiceImpl implements PostPublicQueryService {
})
.flatMap(listOptions -> client.listBy(Post.class, listOptions, page))
.flatMap(list -> Flux.fromStream(list.get())
.concatMap(post -> convertToListedVo(post)
.flatMapSequential(post -> convertToListedVo(post)
.flatMap(postVo -> populateStats(postVo)
.doOnNext(postVo::setStats).thenReturn(postVo)
)

View File

@ -140,7 +140,7 @@ public class SinglePageConversionServiceImpl implements SinglePageConversionServ
return client.listBy(SinglePage.class, rewroteListOptions, rewrotePageRequest)
.flatMap(list -> Flux.fromStream(list.get())
.concatMap(this::convertToListedVo)
.flatMapSequential(this::convertToListedVo)
.collectList()
.map(pageVos ->
new ListResult<>(list.getPage(), list.getSize(), list.getTotal(), pageVos)

View File

@ -48,7 +48,7 @@ public class TagFinderImpl implements TagFinder {
@Override
public Flux<TagVo> getByNames(List<String> names) {
return Flux.fromIterable(names)
.concatMap(this::getByName);
.flatMapSequential(this::getByName);
}
@Override