mirror of https://github.com/halo-dev/halo
Remove deprecated migrations (#7535)
#### What type of PR is this? /kind cleanup /area core /milestone 2.21.x #### What this PR does / why we need it: This PR removes deprecated Subscritpion and Thumbnail migrations. #### Does this PR introduce a user-facing change? ```release-note None ```pull/7543/head^2
parent
2cba28d21d
commit
4a3d35a900
|
@ -1,141 +0,0 @@
|
||||||
package run.halo.app.core.attachment;
|
|
||||||
|
|
||||||
import java.nio.file.Files;
|
|
||||||
import java.nio.file.Path;
|
|
||||||
import java.time.Duration;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.function.Function;
|
|
||||||
import lombok.RequiredArgsConstructor;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.boot.ApplicationArguments;
|
|
||||||
import org.springframework.boot.ApplicationRunner;
|
|
||||||
import org.springframework.dao.OptimisticLockingFailureException;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
import reactor.core.publisher.Flux;
|
|
||||||
import reactor.core.publisher.Mono;
|
|
||||||
import reactor.core.scheduler.Schedulers;
|
|
||||||
import reactor.util.retry.Retry;
|
|
||||||
import run.halo.app.core.attachment.extension.LocalThumbnail;
|
|
||||||
import run.halo.app.core.attachment.extension.Thumbnail;
|
|
||||||
import run.halo.app.extension.Extension;
|
|
||||||
import run.halo.app.extension.ListOptions;
|
|
||||||
import run.halo.app.extension.ReactiveExtensionClient;
|
|
||||||
import run.halo.app.infra.ReactiveExtensionPaginatedOperator;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* <p>TODO Remove this class in the next major version.</p>
|
|
||||||
* when this class is removed, the following code should be added:
|
|
||||||
* <pre>
|
|
||||||
* <code>
|
|
||||||
* schemeManager.register(LocalThumbnail.class, indexSpec -> {
|
|
||||||
* indexSpec.add(new IndexSpec()
|
|
||||||
* // mark the index as unique
|
|
||||||
* .setUnique(true)
|
|
||||||
* .setName(LocalThumbnail.UNIQUE_IMAGE_AND_SIZE_INDEX)
|
|
||||||
* .setIndexFunc(simpleAttribute(LocalThumbnail.class,
|
|
||||||
* LocalThumbnail::uniqueImageAndSize)
|
|
||||||
* )
|
|
||||||
* );
|
|
||||||
* // ...
|
|
||||||
* });
|
|
||||||
* schemeManager.register(Thumbnail.class, indexSpec -> {
|
|
||||||
* indexSpec.add(new IndexSpec()
|
|
||||||
* // mark the index as unique
|
|
||||||
* .setUnique(true)
|
|
||||||
* .setName(Thumbnail.ID_INDEX)
|
|
||||||
* .setIndexFunc(simpleAttribute(Thumbnail.class, Thumbnail::idIndexFunc))
|
|
||||||
* );
|
|
||||||
* // ...
|
|
||||||
* });
|
|
||||||
* </code>
|
|
||||||
* </pre>
|
|
||||||
*
|
|
||||||
* @see run.halo.app.infra.SchemeInitializer
|
|
||||||
* @since 2.20.9
|
|
||||||
*/
|
|
||||||
@Slf4j
|
|
||||||
@Component
|
|
||||||
@RequiredArgsConstructor
|
|
||||||
public class ThumbnailMigration implements ApplicationRunner {
|
|
||||||
private final LocalThumbnailService localThumbnailService;
|
|
||||||
private final ReactiveExtensionClient client;
|
|
||||||
private final ReactiveExtensionPaginatedOperator extensionPaginatedOperator;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run(ApplicationArguments args) throws Exception {
|
|
||||||
cleanupThumbnail(Thumbnail.class,
|
|
||||||
thumbnail -> new UniqueKey(thumbnail.getSpec().getImageUri(),
|
|
||||||
thumbnail.getSpec().getSize().name()))
|
|
||||||
.count()
|
|
||||||
.doOnNext(count -> log.info("Deleted {} duplicate thumbnail records", count))
|
|
||||||
.block();
|
|
||||||
|
|
||||||
cleanupThumbnail(LocalThumbnail.class,
|
|
||||||
thumbnail -> new UniqueKey(thumbnail.getSpec().getImageUri(),
|
|
||||||
thumbnail.getSpec().getSize().name()))
|
|
||||||
.flatMap(thumb -> {
|
|
||||||
var filePath = localThumbnailService.toFilePath(thumb.getSpec().getFilePath());
|
|
||||||
return deleteFile(filePath).thenReturn(thumb.getMetadata().getName());
|
|
||||||
})
|
|
||||||
.count()
|
|
||||||
.doOnNext(count -> log.info("Deleted {} duplicate local thumbnail records.", count))
|
|
||||||
.block();
|
|
||||||
log.info("Duplicate thumbnails have been cleaned up.");
|
|
||||||
}
|
|
||||||
|
|
||||||
private Mono<Void> deleteFile(Path path) {
|
|
||||||
return Mono.fromRunnable(
|
|
||||||
() -> {
|
|
||||||
try {
|
|
||||||
Files.deleteIfExists(path);
|
|
||||||
} catch (Exception e) {
|
|
||||||
// Ignore
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.subscribeOn(Schedulers.boundedElastic())
|
|
||||||
.then();
|
|
||||||
}
|
|
||||||
|
|
||||||
private <T extends Extension> Flux<T> cleanupThumbnail(Class<T> thumbClass,
|
|
||||||
Function<T, UniqueKey> keyFunction) {
|
|
||||||
var unique = new HashSet<UniqueKey>();
|
|
||||||
var duplicateThumbs = new ArrayList<T>();
|
|
||||||
|
|
||||||
var collectDuplicateMono = extensionPaginatedOperator.list(thumbClass, new ListOptions())
|
|
||||||
.doOnNext(thumbnail -> {
|
|
||||||
var key = keyFunction.apply(thumbnail);
|
|
||||||
if (unique.contains(key)) {
|
|
||||||
duplicateThumbs.add(thumbnail);
|
|
||||||
} else {
|
|
||||||
unique.add(key);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.then();
|
|
||||||
|
|
||||||
return Mono.when(collectDuplicateMono)
|
|
||||||
.thenMany(Flux.fromIterable(duplicateThumbs)
|
|
||||||
.flatMap(this::deleteThumbnail)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
private <T extends Extension> Mono<T> deleteThumbnail(T thumbnail) {
|
|
||||||
return client.delete(thumbnail)
|
|
||||||
.onErrorResume(OptimisticLockingFailureException.class,
|
|
||||||
e -> deleteThumbnail((Class<T>) thumbnail.getClass(),
|
|
||||||
thumbnail.getMetadata().getName())
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
private <T extends Extension> Mono<T> deleteThumbnail(Class<T> clazz, String name) {
|
|
||||||
return Mono.defer(() -> client.fetch(clazz, name)
|
|
||||||
.flatMap(client::delete)
|
|
||||||
)
|
|
||||||
.retryWhen(Retry.backoff(8, Duration.ofMillis(100))
|
|
||||||
.filter(OptimisticLockingFailureException.class::isInstance));
|
|
||||||
}
|
|
||||||
|
|
||||||
record UniqueKey(String imageUri, String size) {
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,152 +0,0 @@
|
||||||
package run.halo.app.notification;
|
|
||||||
|
|
||||||
import static run.halo.app.content.NotificationReasonConst.NEW_COMMENT_ON_PAGE;
|
|
||||||
import static run.halo.app.content.NotificationReasonConst.NEW_COMMENT_ON_POST;
|
|
||||||
import static run.halo.app.content.NotificationReasonConst.SOMEONE_REPLIED_TO_YOU;
|
|
||||||
import static run.halo.app.extension.index.query.QueryFactory.and;
|
|
||||||
import static run.halo.app.extension.index.query.QueryFactory.equal;
|
|
||||||
import static run.halo.app.extension.index.query.QueryFactory.in;
|
|
||||||
import static run.halo.app.extension.index.query.QueryFactory.isNull;
|
|
||||||
import static run.halo.app.extension.index.query.QueryFactory.startsWith;
|
|
||||||
|
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.function.Consumer;
|
|
||||||
import lombok.RequiredArgsConstructor;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.boot.ApplicationArguments;
|
|
||||||
import org.springframework.boot.ApplicationRunner;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
import reactor.core.publisher.Mono;
|
|
||||||
import run.halo.app.core.extension.User;
|
|
||||||
import run.halo.app.core.extension.notification.Subscription;
|
|
||||||
import run.halo.app.extension.ListOptions;
|
|
||||||
import run.halo.app.extension.ReactiveExtensionClient;
|
|
||||||
import run.halo.app.extension.router.selector.FieldSelector;
|
|
||||||
import run.halo.app.infra.AnonymousUserConst;
|
|
||||||
import run.halo.app.infra.ReactiveExtensionPaginatedOperator;
|
|
||||||
import run.halo.app.infra.ReactiveExtensionPaginatedOperatorImpl;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Subscription migration to adapt to the new expression subscribe mechanism.
|
|
||||||
*
|
|
||||||
* @author guqing
|
|
||||||
* @since 2.15.0
|
|
||||||
*/
|
|
||||||
@Slf4j
|
|
||||||
@Component
|
|
||||||
@RequiredArgsConstructor
|
|
||||||
public class SubscriptionMigration implements ApplicationRunner {
|
|
||||||
private final NotificationCenter notificationCenter;
|
|
||||||
private final ReactiveExtensionClient client;
|
|
||||||
private final SubscriptionService subscriptionService;
|
|
||||||
private final ReactiveExtensionPaginatedOperator paginatedOperator;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run(ApplicationArguments args) throws Exception {
|
|
||||||
handleAnonymousSubscription();
|
|
||||||
cleanupUserSubscription();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void cleanupUserSubscription() {
|
|
||||||
var listOptions = new ListOptions();
|
|
||||||
var query = isNull("metadata.deletionTimestamp");
|
|
||||||
listOptions.setFieldSelector(FieldSelector.of(query));
|
|
||||||
var iterator =
|
|
||||||
new ReactiveExtensionPaginatedOperatorImpl(client);
|
|
||||||
iterator.list(User.class, listOptions)
|
|
||||||
.map(user -> user.getMetadata().getName())
|
|
||||||
.flatMap(this::removeInternalSubscriptionForUser)
|
|
||||||
.then()
|
|
||||||
.doOnSuccess(unused -> log.info("Cleanup user subscription completed"))
|
|
||||||
.block();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void handleAnonymousSubscription() {
|
|
||||||
log.debug("Start to collating anonymous subscription...");
|
|
||||||
Set<String> anonymousSubscribers = new HashSet<>();
|
|
||||||
deleteAnonymousSubscription(subscription -> {
|
|
||||||
var name = subscription.getSpec().getSubscriber().getName();
|
|
||||||
anonymousSubscribers.add(name);
|
|
||||||
}).block();
|
|
||||||
if (anonymousSubscribers.isEmpty()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// anonymous only subscribe some-one-replied-to-you reason
|
|
||||||
for (String anonymousSubscriber : anonymousSubscribers) {
|
|
||||||
createSubscription(anonymousSubscriber,
|
|
||||||
SOMEONE_REPLIED_TO_YOU,
|
|
||||||
"props.repliedOwner == '%s'".formatted(anonymousSubscriber)).block();
|
|
||||||
}
|
|
||||||
log.info("Collating anonymous subscription completed.");
|
|
||||||
}
|
|
||||||
|
|
||||||
private Mono<Void> deleteAnonymousSubscription(Consumer<Subscription> consumer) {
|
|
||||||
var listOptions = new ListOptions();
|
|
||||||
var query = and(startsWith("spec.subscriber", AnonymousUserConst.PRINCIPAL),
|
|
||||||
isNull("spec.reason.expression"),
|
|
||||||
isNull("metadata.deletionTimestamp"),
|
|
||||||
in("spec.reason.reasonType", Set.of(NEW_COMMENT_ON_POST,
|
|
||||||
NEW_COMMENT_ON_PAGE,
|
|
||||||
SOMEONE_REPLIED_TO_YOU))
|
|
||||||
);
|
|
||||||
listOptions.setFieldSelector(FieldSelector.of(query));
|
|
||||||
return paginatedOperator.deleteInitialBatch(Subscription.class, listOptions)
|
|
||||||
.doOnNext(consumer)
|
|
||||||
.doOnNext(subscription -> log.debug("Deleted anonymous subscription: {}",
|
|
||||||
subscription.getMetadata().getName())
|
|
||||||
)
|
|
||||||
.then();
|
|
||||||
}
|
|
||||||
|
|
||||||
private Mono<Void> removeInternalSubscriptionForUser(String username) {
|
|
||||||
log.debug("Start to collating internal subscription for user: {}", username);
|
|
||||||
var subscriber = new Subscription.Subscriber();
|
|
||||||
subscriber.setName(username);
|
|
||||||
|
|
||||||
var listOptions = new ListOptions();
|
|
||||||
var fieldQuery = and(isNull("metadata.deletionTimestamp"),
|
|
||||||
equal("spec.subscriber", subscriber.toString()),
|
|
||||||
in("spec.reason.reasonType", Set.of(
|
|
||||||
NEW_COMMENT_ON_POST,
|
|
||||||
NEW_COMMENT_ON_PAGE,
|
|
||||||
SOMEONE_REPLIED_TO_YOU
|
|
||||||
))
|
|
||||||
);
|
|
||||||
listOptions.setFieldSelector(FieldSelector.of(fieldQuery));
|
|
||||||
|
|
||||||
return subscriptionService.removeBy(listOptions)
|
|
||||||
.map(subscription -> {
|
|
||||||
var name = subscription.getSpec().getSubscriber().getName();
|
|
||||||
var reason = subscription.getSpec().getReason();
|
|
||||||
String expression = switch (reason.getReasonType()) {
|
|
||||||
case NEW_COMMENT_ON_POST -> "props.postOwner == '%s'".formatted(name);
|
|
||||||
case NEW_COMMENT_ON_PAGE -> "props.pageOwner == '%s'".formatted(name);
|
|
||||||
case SOMEONE_REPLIED_TO_YOU -> "props.repliedOwner == '%s'".formatted(name);
|
|
||||||
// never happen
|
|
||||||
default -> null;
|
|
||||||
};
|
|
||||||
return new SubscriptionSummary(name, reason.getReasonType(), expression);
|
|
||||||
})
|
|
||||||
.distinct()
|
|
||||||
.flatMap(summary -> createSubscription(summary.subscriber(), summary.reasonType(),
|
|
||||||
summary.expression()))
|
|
||||||
.then()
|
|
||||||
.doOnSuccess(unused ->
|
|
||||||
log.debug("Collating internal subscription for user: {} completed", username));
|
|
||||||
}
|
|
||||||
|
|
||||||
Mono<Void> createSubscription(String name, String reasonType, String expression) {
|
|
||||||
var interestReason = new Subscription.InterestReason();
|
|
||||||
interestReason.setReasonType(reasonType);
|
|
||||||
interestReason.setExpression(expression);
|
|
||||||
var subscriber = new Subscription.Subscriber();
|
|
||||||
subscriber.setName(name);
|
|
||||||
log.debug("Create subscription for user: {} with reasonType: {}", name, reasonType);
|
|
||||||
return notificationCenter.subscribe(subscriber, interestReason).then();
|
|
||||||
}
|
|
||||||
|
|
||||||
record SubscriptionSummary(String subscriber, String reasonType, String expression) {
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue