refactor: add read-write lock to ExtensionContextRegistry (#4245)

#### What type of PR is this?
/kind improvement
/area core
/area plugin
/milestone 2.8.x

#### What this PR does / why we need it:
修复由于多线程环境下导致的插件卸载时的路由异常问题

改动描述:
为了确保在多线程环境下访问 ExtensionContextRegistry 类的注册表时的线程安全。通过添加读写锁,可以保证在读取和写入PluginApplicationContext 时只有一个线程可以访问,从而避免了多个线程同时访问注册表时可能出现的竞态条件和数据不一致的问题。同时,更新了 register、remove、getByPluginId、containsContext 和 getPluginApplicationContexts 方法,以在访问注册表时获取和释放适当的锁,从而确保了线程安全。

问题原因:
当插件卸载时,卸载动作在 Reconciler 线程中执行而路由访问是在 reactor 的 NonBlockingThread 线程执行,当 PluginCompositeRouterFunction 的 routerFunctions() 方法从 ExtensionContextRegistry 中获取所有 PluginApplicationContext 并持有还未处理完成时由于 PluginReconciler 中执行了卸载插件逻辑而将某个 PluginApplicationContext 关闭从而让 PluginCompositeRouterFunction 中持有到的对象引用发生变化出现数据不一致问题导致出现 `PluginApplicationContext@14971c8e has been closed already` 异常。

解决方案:
所以此修改让读取和写入PluginApplicationContext 时只有一个线程可以访问来解决此问题

how to test it?
测试开发模式下卸载插件时是否会出现如 #4242 中所描述的异常信息
#### Which issue(s) this PR fixes:
Fixes #4242

#### Does this PR introduce a user-facing change?
```release-note
修复由于多线程环境下导致的插件卸载时的路由异常问题
```
pull/4210/head^2
guqing 2023-07-21 11:38:14 +08:00 committed by GitHub
parent 832c86071a
commit 3b03ed9570
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 122 additions and 41 deletions

View File

@ -1,9 +1,11 @@
package run.halo.app.plugin;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.springframework.lang.NonNull;
/**
* <p>Plugin application context registrar.</p>
@ -18,7 +20,8 @@ import java.util.concurrent.ConcurrentHashMap;
public class ExtensionContextRegistry {
private static final ExtensionContextRegistry INSTANCE = new ExtensionContextRegistry();
private final Map<String, PluginApplicationContext> registry = new ConcurrentHashMap<>();
private final Map<String, PluginApplicationContext> registry = new HashMap<>();
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public static ExtensionContextRegistry getInstance() {
return INSTANCE;
@ -27,35 +30,101 @@ public class ExtensionContextRegistry {
private ExtensionContextRegistry() {
}
public void register(String pluginId, PluginApplicationContext context) {
registry.put(pluginId, context);
/**
* Acquire the read lock when using getPluginApplicationContexts and getByPluginId.
*/
public void acquireReadLock() {
this.readWriteLock.readLock().lock();
}
public PluginApplicationContext remove(String pluginId) {
return registry.remove(pluginId);
/**
* Release the read lock after using getPluginApplicationContexts and getByPluginId.
*/
public void releaseReadLock() {
this.readWriteLock.readLock().unlock();
}
/**
* Register plugin application context to registry map.
*
* @param pluginId plugin id(name)
* @param context plugin application context
*/
public void register(String pluginId, PluginApplicationContext context) {
this.readWriteLock.writeLock().lock();
try {
registry.put(pluginId, context);
} finally {
this.readWriteLock.writeLock().unlock();
}
}
/**
* Remove plugin application context from registry map.
*
* @param pluginId plugin id
*/
public void remove(String pluginId) {
this.readWriteLock.writeLock().lock();
try {
PluginApplicationContext removed = registry.remove(pluginId);
if (removed != null) {
removed.close();
}
} finally {
this.readWriteLock.writeLock().unlock();
}
}
/**
* Gets plugin application context by plugin id from registry map.
* Note: ensure call {@link #containsContext(String)} after call this method.
*
* @param pluginId plugin id
* @return plugin application context
* @throws IllegalArgumentException if plugin id not found in registry
*/
@NonNull
public PluginApplicationContext getByPluginId(String pluginId) {
PluginApplicationContext context = registry.get(pluginId);
if (context == null) {
throw new IllegalArgumentException(
String.format("The plugin [%s] can not be found.", pluginId));
this.readWriteLock.readLock().lock();
try {
PluginApplicationContext context = registry.get(pluginId);
if (context == null) {
throw new IllegalArgumentException(
String.format("The plugin [%s] can not be found.", pluginId));
}
return context;
} finally {
this.readWriteLock.readLock().unlock();
}
return context;
}
/**
* Check whether the registry contains the plugin application context by plugin id.
*
* @param pluginId plugin id
* @return true if contains, otherwise false
*/
public boolean containsContext(String pluginId) {
return registry.containsKey(pluginId);
this.readWriteLock.readLock().lock();
try {
return registry.containsKey(pluginId);
} finally {
this.readWriteLock.readLock().unlock();
}
}
/**
* Gets all plugin application contexts from registry map.
*
* @return plugin application contexts
*/
public List<PluginApplicationContext> getPluginApplicationContexts() {
return new ArrayList<>(registry.values());
this.readWriteLock.readLock().lock();
try {
return new ArrayList<>(registry.values());
} finally {
this.readWriteLock.readLock().unlock();
}
}
}

View File

@ -27,12 +27,17 @@ public class PluginApplicationEventBridgeDispatcher
if (!isSharedEventAnnotationPresent(event.getClass())) {
return;
}
List<PluginApplicationContext> pluginApplicationContexts =
ExtensionContextRegistry.getInstance().getPluginApplicationContexts();
for (PluginApplicationContext pluginApplicationContext : pluginApplicationContexts) {
log.debug("Bridging broadcast event [{}] to plugin [{}]", event,
pluginApplicationContext.getPluginId());
pluginApplicationContext.publishEvent(event);
ExtensionContextRegistry.getInstance().acquireReadLock();
try {
List<PluginApplicationContext> pluginApplicationContexts =
ExtensionContextRegistry.getInstance().getPluginApplicationContexts();
for (PluginApplicationContext pluginApplicationContext : pluginApplicationContexts) {
log.debug("Bridging broadcast event [{}] to plugin [{}]", event,
pluginApplicationContext.getPluginId());
pluginApplicationContext.publishEvent(event);
}
} finally {
ExtensionContextRegistry.getInstance().releaseReadLock();
}
}

View File

@ -153,10 +153,7 @@ public class PluginApplicationInitializer {
public void contextDestroyed(String pluginId) {
Assert.notNull(pluginId, "pluginId must not be null");
PluginApplicationContext removed = contextRegistry.remove(pluginId);
if (removed != null) {
removed.close();
}
contextRegistry.remove(pluginId);
}
private Set<Class<?>> findCandidateComponents(String pluginId) {

View File

@ -4,6 +4,7 @@ import static run.halo.app.plugin.ExtensionContextRegistry.getInstance;
import java.util.ArrayList;
import java.util.List;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.HandlerFunction;
@ -48,27 +49,35 @@ public class PluginCompositeRouterFunction implements RouterFunction<ServerRespo
@SuppressWarnings("unchecked")
private List<RouterFunction<ServerResponse>> routerFunctions() {
var rawRouterFunctions = getInstance().getPluginApplicationContexts()
.stream()
.flatMap(applicationContext -> applicationContext
.getBeanProvider(RouterFunction.class)
.orderedStream())
.map(router -> (RouterFunction<ServerResponse>) router)
.toList();
var reverseProxies = reverseProxyRouterFunctionFactory.getRouterFunctions();
getInstance().acquireReadLock();
try {
List<PluginApplicationContext> contexts = getInstance().getPluginApplicationContexts()
.stream()
.filter(AbstractApplicationContext::isActive)
.toList();
var rawRouterFunctions = contexts
.stream()
.flatMap(applicationContext -> applicationContext
.getBeanProvider(RouterFunction.class)
.orderedStream())
.map(router -> (RouterFunction<ServerResponse>) router)
.toList();
var reverseProxies = reverseProxyRouterFunctionFactory.getRouterFunctions();
var endpointBuilder = new CustomEndpointsBuilder();
getInstance().getPluginApplicationContexts()
.forEach(context -> context.getBeanProvider(CustomEndpoint.class)
var endpointBuilder = new CustomEndpointsBuilder();
contexts.forEach(context -> context.getBeanProvider(CustomEndpoint.class)
.orderedStream()
.forEach(endpointBuilder::add));
var customEndpoint = endpointBuilder.build();
var customEndpoint = endpointBuilder.build();
List<RouterFunction<ServerResponse>> routerFunctions =
new ArrayList<>(rawRouterFunctions.size() + reverseProxies.size() + 1);
routerFunctions.addAll(rawRouterFunctions);
routerFunctions.addAll(reverseProxies);
routerFunctions.add(customEndpoint);
return routerFunctions;
List<RouterFunction<ServerResponse>> routerFunctions =
new ArrayList<>(rawRouterFunctions.size() + reverseProxies.size() + 1);
routerFunctions.addAll(rawRouterFunctions);
routerFunctions.addAll(reverseProxies);
routerFunctions.add(customEndpoint);
return routerFunctions;
} finally {
getInstance().releaseReadLock();
}
}
}

View File

@ -55,6 +55,7 @@ class PluginCompositeRouterFunctionTest {
@SuppressWarnings("unchecked")
void setUp() {
var fakeContext = mock(PluginApplicationContext.class);
when(fakeContext.isActive()).thenReturn(true);
ExtensionContextRegistry.getInstance().register("fake-plugin", fakeContext);
when(rawRouterFunctionsProvider.orderedStream()).thenReturn(Stream.empty());