fix: resolve concurrency issue causing duplicate thumbnail generation (#7077)

#### What type of PR is this?
/kind improvement
/area core
/milestone 2.20.x

#### What this PR does / why we need it:
修复可能为因为并发调用缩略图生成导致多次创建缩略图的问题

此 PR 为 #7031 的补充,并且会清理以前重复生成的缩略图记录和文件

#### Does this PR introduce a user-facing change?

```release-note
修复可能为因为并发调用缩略图生成导致多次重复缩略图记录的问题
```
pull/7081/head
guqing 2024-11-26 11:26:28 +08:00 committed by GitHub
parent 2ed3bb6838
commit ec5c70f951
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 235 additions and 14 deletions

View File

@ -0,0 +1,144 @@
package run.halo.app.core.attachment;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.function.Function;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.dao.OptimisticLockingFailureException;
import org.springframework.lang.NonNull;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import run.halo.app.core.attachment.extension.LocalThumbnail;
import run.halo.app.core.attachment.extension.Thumbnail;
import run.halo.app.extension.Extension;
import run.halo.app.extension.ListOptions;
import run.halo.app.extension.ReactiveExtensionClient;
import run.halo.app.infra.ReactiveExtensionPaginatedOperator;
/**
* <p>TODO Remove this class in the next major version.</p>
* when this class is removed, the following code should be added:
* <pre>
* <code>
* schemeManager.register(LocalThumbnail.class, indexSpec -> {
* indexSpec.add(new IndexSpec()
* // mark the index as unique
* .setUnique(true)
* .setName(LocalThumbnail.UNIQUE_IMAGE_AND_SIZE_INDEX)
* .setIndexFunc(simpleAttribute(LocalThumbnail.class,
* LocalThumbnail::uniqueImageAndSize)
* )
* );
* // ...
* });
* schemeManager.register(Thumbnail.class, indexSpec -> {
* indexSpec.add(new IndexSpec()
* // mark the index as unique
* .setUnique(true)
* .setName(Thumbnail.ID_INDEX)
* .setIndexFunc(simpleAttribute(Thumbnail.class, Thumbnail::idIndexFunc))
* );
* // ...
* });
* </code>
* </pre>
*
* @see run.halo.app.infra.SchemeInitializer
* @since 2.20.9
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ThumbnailMigration {
private final LocalThumbnailService localThumbnailService;
private final ReactiveExtensionClient client;
private final ReactiveExtensionPaginatedOperator extensionPaginatedOperator;
@Async
@EventListener(ApplicationStartedEvent.class)
public void onApplicationEvent(@NonNull ApplicationStartedEvent event) {
cleanupThumbnail(Thumbnail.class,
thumbnail -> new UniqueKey(thumbnail.getSpec().getImageUri(),
thumbnail.getSpec().getSize().name()))
.count()
.doOnNext(count -> log.info("Deleted {} duplicate thumbnail records", count))
.block();
cleanupThumbnail(LocalThumbnail.class,
thumbnail -> new UniqueKey(thumbnail.getSpec().getImageUri(),
thumbnail.getSpec().getSize().name()))
.flatMap(thumb -> {
var filePath = localThumbnailService.toFilePath(thumb.getSpec().getFilePath());
return deleteFile(filePath).thenReturn(thumb.getMetadata().getName());
})
.count()
.doOnNext(count -> log.info("Deleted {} duplicate local thumbnail records.", count))
.block();
log.info("Duplicate thumbnails have been cleaned up.");
}
private Mono<Void> deleteFile(Path path) {
return Mono.fromRunnable(
() -> {
try {
Files.deleteIfExists(path);
} catch (Exception e) {
// Ignore
}
})
.subscribeOn(Schedulers.boundedElastic())
.then();
}
private <T extends Extension> Flux<T> cleanupThumbnail(Class<T> thumbClass,
Function<T, UniqueKey> keyFunction) {
var unique = new HashSet<UniqueKey>();
var duplicateThumbs = new ArrayList<T>();
var collectDuplicateMono = extensionPaginatedOperator.list(thumbClass, new ListOptions())
.doOnNext(thumbnail -> {
var key = keyFunction.apply(thumbnail);
if (unique.contains(key)) {
duplicateThumbs.add(thumbnail);
} else {
unique.add(key);
}
})
.then();
return Mono.when(collectDuplicateMono)
.thenMany(Flux.fromIterable(duplicateThumbs)
.flatMap(this::deleteThumbnail)
);
}
@SuppressWarnings("unchecked")
private <T extends Extension> Mono<T> deleteThumbnail(T thumbnail) {
return client.delete(thumbnail)
.onErrorResume(OptimisticLockingFailureException.class,
e -> deleteThumbnail((Class<T>) thumbnail.getClass(),
thumbnail.getMetadata().getName())
);
}
private <T extends Extension> Mono<T> deleteThumbnail(Class<T> clazz, String name) {
return Mono.defer(() -> client.fetch(clazz, name)
.flatMap(client::delete)
)
.retryWhen(Retry.backoff(8, Duration.ofMillis(100))
.filter(OptimisticLockingFailureException.class::isInstance));
}
record UniqueKey(String imageUri, String size) {
}
}

View File

@ -6,7 +6,9 @@ import static run.halo.app.extension.index.query.QueryFactory.startsWith;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Sort;
@ -38,9 +40,24 @@ public class ThumbnailServiceImpl implements ThumbnailService {
private final ExternalLinkProcessor externalLinkProcessor;
private final ThumbnailProvider thumbnailProvider;
private final LocalThumbnailService localThumbnailService;
private final Map<CacheKey, Mono<URI>> ongoingTasks = new ConcurrentHashMap<>();
@Override
public Mono<URI> generate(URI imageUri, ThumbnailSize size) {
var cacheKey = new CacheKey(imageUri, size);
// Combine caching to implement more elegant deduplication logic, ensure that only
// one thread executes the logic of create at the same time, and there is no global lock
// restriction
return ongoingTasks.computeIfAbsent(cacheKey, k -> doGenerate(imageUri, size)
// In the case of concurrency, doGenerate must return the same instance
.cache()
.doFinally(signalType -> ongoingTasks.remove(cacheKey)));
}
record CacheKey(URI imageUri, ThumbnailSize size) {
}
private Mono<URI> doGenerate(URI imageUri, ThumbnailSize size) {
var imageUrlOpt = toImageUrl(imageUri);
if (imageUrlOpt.isEmpty()) {
return Mono.empty();
@ -91,7 +108,7 @@ public class ThumbnailServiceImpl implements ThumbnailService {
return Optional.empty();
}
Mono<URI> create(URL imageUrl, ThumbnailSize size) {
protected Mono<URI> create(URL imageUrl, ThumbnailSize size) {
var context = ThumbnailContext.builder()
.imageUrl(imageUrl)
.size(size)
@ -112,8 +129,12 @@ public class ThumbnailServiceImpl implements ThumbnailService {
.setImageUri(imageUri.toASCIIString())
.setImageSignature(signatureFor(imageUri))
);
return client.create(thumb)
.thenReturn(uri);
// double check
return fetchThumbnail(imageUri, size)
.map(thumbnail -> URI.create(thumbnail.getSpec().getThumbnailUri()))
.switchIfEmpty(Mono.defer(() -> client.create(thumb)
.thenReturn(uri))
);
});
}

View File

@ -607,6 +607,8 @@ public class SchemeInitializer implements ApplicationListener<ApplicationContext
schemeManager.register(PolicyTemplate.class);
schemeManager.register(Thumbnail.class, indexSpec -> {
indexSpec.add(new IndexSpec()
// see run.halo.app.core.attachment.ThumbnailMigration
// .setUnique(true)
.setName(Thumbnail.ID_INDEX)
.setIndexFunc(simpleAttribute(Thumbnail.class, Thumbnail::idIndexFunc))
);
@ -614,7 +616,8 @@ public class SchemeInitializer implements ApplicationListener<ApplicationContext
schemeManager.register(LocalThumbnail.class, indexSpec -> {
// make sure image and size are unique
indexSpec.add(new IndexSpec()
.setUnique(true)
// see run.halo.app.core.attachment.ThumbnailMigration
// .setUnique(true)
.setName(LocalThumbnail.UNIQUE_IMAGE_AND_SIZE_INDEX)
.setIndexFunc(simpleAttribute(LocalThumbnail.class,
LocalThumbnail::uniqueImageAndSize)

View File

@ -7,6 +7,7 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static run.halo.app.extension.index.query.QueryFactory.equal;
@ -15,6 +16,11 @@ import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
@ -116,26 +122,28 @@ class ThumbnailServiceImplTest {
.thenReturn(insiteUri);
when(client.create(any())).thenReturn(Mono.empty());
when(client.listBy(eq(Thumbnail.class), any(), isA(PageRequest.class)))
.thenReturn(Mono.empty());
thumbnailService.create(url, ThumbnailSize.M)
.as(StepVerifier::create)
.expectNext(thumbUri)
.verifyComplete();
when(client.listBy(eq(Thumbnail.class), any(), isA(PageRequest.class)))
.thenReturn(Mono.empty());
thumbnailService.fetchThumbnail(url.toURI(), ThumbnailSize.M)
.as(StepVerifier::create)
.verifyComplete();
var hash = ThumbnailSigner.generateSignature(insiteUri.toString());
verify(client).listBy(eq(Thumbnail.class), assertArg(options -> {
var exceptOptions = ListOptions.builder()
.fieldQuery(equal(Thumbnail.ID_INDEX,
Thumbnail.idIndexFunc(hash, ThumbnailSize.M.name())
))
.build();
assertThat(options.toString()).isEqualTo(exceptOptions.toString());
}), isA(PageRequest.class));
verify(client, times(2)).listBy(eq(Thumbnail.class),
assertArg(options -> {
var exceptOptions = ListOptions.builder()
.fieldQuery(equal(Thumbnail.ID_INDEX,
Thumbnail.idIndexFunc(hash, ThumbnailSize.M.name())
))
.build();
assertThat(options.toString()).isEqualTo(exceptOptions.toString());
}), isA(PageRequest.class));
verify(localThumbnailProvider).generate(any());
@ -169,4 +177,49 @@ class ThumbnailServiceImplTest {
.as(StepVerifier::create)
.verifyComplete();
}
@Nested
class ThumbnailGenerateConcurrencyTest {
@Test
public void concurrentThumbnailGeneration() throws InterruptedException {
var spyThumbnailService = spy(thumbnailService);
URI imageUri = URI.create("http://localhost:8090/test.jpg");
doReturn(Mono.empty()).when(spyThumbnailService).fetchThumbnail(eq(imageUri), any());
var createdUri = URI.create("/test-thumb.jpg");
doReturn(Mono.just(createdUri)).when(spyThumbnailService).create(any(), any());
int threadCount = 10;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
var latch = new CountDownLatch(threadCount);
var results = new ConcurrentLinkedQueue<Mono<URI>>();
for (int i = 0; i < threadCount; i++) {
executor.submit(() -> {
try {
results.add(spyThumbnailService.generate(imageUri, ThumbnailSize.M));
} finally {
latch.countDown();
}
});
}
latch.await();
results.forEach(result -> {
StepVerifier.create(result)
.expectNext(createdUri)
.verifyComplete();
});
verify(spyThumbnailService).fetchThumbnail(eq(imageUri), eq(ThumbnailSize.M));
verify(spyThumbnailService).create(any(), eq(ThumbnailSize.M));
executor.shutdown();
}
}
}