diff --git a/ruoyi-admin/src/main/java/com/ruoyi/web/core/config/QuartzAutoConfiguration.java b/ruoyi-admin/src/main/java/com/ruoyi/web/core/config/QuartzAutoConfiguration.java new file mode 100644 index 000000000..6e815b9be --- /dev/null +++ b/ruoyi-admin/src/main/java/com/ruoyi/web/core/config/QuartzAutoConfiguration.java @@ -0,0 +1,96 @@ +/* + * All content copyright Terracotta, Inc., unless otherwise indicated. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy + * of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + */ +package com.ruoyi.web.core.config; + +import java.util.Map; +import java.util.Properties; + +import org.quartz.Calendar; +import org.quartz.JobDetail; +import org.quartz.Scheduler; +import org.quartz.Trigger; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration; +import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; +import org.springframework.boot.autoconfigure.liquibase.LiquibaseAutoConfiguration; +import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration; +import org.springframework.boot.autoconfigure.quartz.QuartzProperties; +import org.springframework.boot.autoconfigure.quartz.SchedulerFactoryBeanCustomizer; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.scheduling.quartz.SchedulerFactoryBean; +import org.springframework.scheduling.quartz.SpringBeanJobFactory; +import org.springframework.transaction.PlatformTransactionManager; + +/** + * 复制于源码org.springframework.boot.autoconfigure.quartz.QuartzAutoConfiguration + * + * {@link EnableAutoConfiguration Auto-configuration} for Quartz Scheduler. + * + * @author Vedran Pavic + * @author Stephane Nicoll + * @since 2.0.0 + */ +@Configuration(proxyBeanMethods = false) +@ConditionalOnClass({ Scheduler.class, SchedulerFactoryBean.class, PlatformTransactionManager.class }) +@EnableConfigurationProperties(QuartzProperties.class) +@AutoConfigureAfter({ DataSourceAutoConfiguration.class, HibernateJpaAutoConfiguration.class, + LiquibaseAutoConfiguration.class, FlywayAutoConfiguration.class }) +public class QuartzAutoConfiguration { + + @Bean + @ConditionalOnMissingBean + public SchedulerFactoryBean quartzScheduler(QuartzProperties properties, + ObjectProvider customizers, ObjectProvider jobDetails, + Map calendars, ObjectProvider triggers, ApplicationContext applicationContext, + ThreadPoolTaskExecutor threadPoolTaskExecutor) { + SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean(); + SpringBeanJobFactory jobFactory = new SpringBeanJobFactory(); + jobFactory.setApplicationContext(applicationContext); + schedulerFactoryBean.setJobFactory(jobFactory); + if (properties.getSchedulerName() != null) { + schedulerFactoryBean.setSchedulerName(properties.getSchedulerName()); + } + schedulerFactoryBean.setAutoStartup(properties.isAutoStartup()); + schedulerFactoryBean.setStartupDelay((int) properties.getStartupDelay().getSeconds()); + schedulerFactoryBean.setWaitForJobsToCompleteOnShutdown(properties.isWaitForJobsToCompleteOnShutdown()); + schedulerFactoryBean.setOverwriteExistingJobs(properties.isOverwriteExistingJobs()); + if (!properties.getProperties().isEmpty()) { + schedulerFactoryBean.setQuartzProperties(asProperties(properties.getProperties())); + } + schedulerFactoryBean.setJobDetails(jobDetails.orderedStream().toArray(JobDetail[]::new)); + schedulerFactoryBean.setCalendars(calendars); + schedulerFactoryBean.setTriggers(triggers.orderedStream().toArray(Trigger[]::new)); + schedulerFactoryBean.setTaskExecutor(threadPoolTaskExecutor); // 源码中增加了这行代码,使用自定义线程池执行任务 + customizers.orderedStream().forEach((customizer) -> customizer.customize(schedulerFactoryBean)); + return schedulerFactoryBean; + } + + private Properties asProperties(Map source) { + Properties properties = new Properties(); + properties.putAll(source); + return properties; + } + +} diff --git a/ruoyi-admin/src/main/resources/logback.xml b/ruoyi-admin/src/main/resources/logback.xml index d69a57207..5def002bd 100644 --- a/ruoyi-admin/src/main/resources/logback.xml +++ b/ruoyi-admin/src/main/resources/logback.xml @@ -3,7 +3,7 @@ - + diff --git a/ruoyi-common/src/main/java/com/ruoyi/common/config/thread/ThreadPoolConfig.java b/ruoyi-common/src/main/java/com/ruoyi/common/config/thread/ThreadPoolConfig.java index 8a4c7ce2a..1b635ea94 100644 --- a/ruoyi-common/src/main/java/com/ruoyi/common/config/thread/ThreadPoolConfig.java +++ b/ruoyi-common/src/main/java/com/ruoyi/common/config/thread/ThreadPoolConfig.java @@ -1,12 +1,15 @@ package com.ruoyi.common.config.thread; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; + import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import com.ruoyi.common.config.thread.threadpool.CustomScheduledThreadPoolExecutor; +import com.ruoyi.common.config.thread.threadpool.CustomThreadPoolTaskExecutor; import com.ruoyi.common.utils.Threads; /** @@ -32,7 +35,7 @@ public class ThreadPoolConfig @Bean(name = "threadPoolTaskExecutor") public ThreadPoolTaskExecutor threadPoolTaskExecutor() { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + ThreadPoolTaskExecutor executor = new CustomThreadPoolTaskExecutor(); executor.setMaxPoolSize(maxPoolSize); executor.setCorePoolSize(corePoolSize); executor.setQueueCapacity(queueCapacity); @@ -48,7 +51,7 @@ public class ThreadPoolConfig @Bean(name = "scheduledExecutorService") protected ScheduledExecutorService scheduledExecutorService() { - return new ScheduledThreadPoolExecutor(corePoolSize, + return new CustomScheduledThreadPoolExecutor(corePoolSize, new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build(), new ThreadPoolExecutor.CallerRunsPolicy()) { diff --git a/ruoyi-common/src/main/java/com/ruoyi/common/config/thread/threadpool/CustomScheduledThreadPoolExecutor.java b/ruoyi-common/src/main/java/com/ruoyi/common/config/thread/threadpool/CustomScheduledThreadPoolExecutor.java new file mode 100644 index 000000000..48bcc211f --- /dev/null +++ b/ruoyi-common/src/main/java/com/ruoyi/common/config/thread/threadpool/CustomScheduledThreadPoolExecutor.java @@ -0,0 +1,92 @@ +package com.ruoyi.common.config.thread.threadpool; + +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.ruoyi.common.utils.MdcUtil; + +/** + * 自定义Schedule线程池 + * + * @author JQ棣 + */ +public class CustomScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { + private static final Logger log = LoggerFactory.getLogger(CustomScheduledThreadPoolExecutor.class); + + public CustomScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { + super(corePoolSize, handler); + } + + public CustomScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, threadFactory, handler); + } + + public CustomScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { + super(corePoolSize, threadFactory); + } + + public CustomScheduledThreadPoolExecutor(int corePoolSize) { + super(corePoolSize); + } + + /** + * 保存任务开始执行的时间,当任务结束时,用任务结束时间减去开始时间计算任务执行时间 + */ + private ThreadLocal timetl = new ThreadLocal<>(); + + @Override + protected void beforeExecute(Thread t, Runnable r) { + timetl.set(System.currentTimeMillis()); + super.beforeExecute(t, r); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + Long start = timetl.get(); + timetl.remove(); + long diff = System.currentTimeMillis() - start; + // 统计任务耗时、初始线程数、正在执行的任务数量、 已完成任务数量、任务总数、队列里缓存的任务数量、池中存在的最大线程数 + log.info("duration:{} ms,poolSize:{},active:{},completedTaskCount:{},taskCount:{},queue:{},largestPoolSize:{}", + diff, this.getPoolSize(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(), + this.getQueue().size(), this.getLargestPoolSize()); + MdcUtil.remove(); + } + + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return super.schedule(new WrappedRunnable(command), delay, unit); + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { + return super.scheduleAtFixedRate(new WrappedRunnable(command), initialDelay, period, unit); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + return super.scheduleWithFixedDelay(new WrappedRunnable(command), initialDelay, delay, unit); + } + + private static class WrappedRunnable implements Runnable { + private final Runnable target; + private final String traceId; + + public WrappedRunnable(Runnable target) { + this.target = target; + this.traceId = MdcUtil.get(); + } + + @Override + public void run() { + MdcUtil.put(traceId); + target.run(); + } + } +} diff --git a/ruoyi-common/src/main/java/com/ruoyi/common/config/thread/threadpool/CustomThreadPoolExecutor.java b/ruoyi-common/src/main/java/com/ruoyi/common/config/thread/threadpool/CustomThreadPoolExecutor.java new file mode 100644 index 000000000..4b18545a1 --- /dev/null +++ b/ruoyi-common/src/main/java/com/ruoyi/common/config/thread/threadpool/CustomThreadPoolExecutor.java @@ -0,0 +1,103 @@ +package com.ruoyi.common.config.thread.threadpool; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.ruoyi.common.utils.MdcUtil; + +/** + * 自定义线程池 + * + * @author JQ棣 + */ +public class CustomThreadPoolExecutor extends ThreadPoolExecutor { + private static final Logger log = LoggerFactory.getLogger(CustomThreadPoolExecutor.class); + + public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + BlockingQueue workQueue, RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler); + } + + public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + } + + public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + BlockingQueue workQueue, ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + } + + public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, + BlockingQueue workQueue) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); + } + + /** + * 保存任务开始执行的时间,当任务结束时,用任务结束时间减去开始时间计算任务执行时间 + */ + private ThreadLocal timetl = new ThreadLocal<>(); + + @Override + protected void beforeExecute(Thread t, Runnable r) { + timetl.set(System.currentTimeMillis()); + super.beforeExecute(t, r); + } + + @Override + protected void afterExecute(Runnable r, Throwable t) { + super.afterExecute(r, t); + Long start = timetl.get(); + timetl.remove(); + long diff = System.currentTimeMillis() - start; + // 统计任务耗时、初始线程数、正在执行的任务数量、 已完成任务数量、任务总数、队列里缓存的任务数量、池中存在的最大线程数 + log.info("duration:{} ms,poolSize:{},active:{},completedTaskCount:{},taskCount:{},queue:{},largestPoolSize:{}", + diff, this.getPoolSize(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(), + this.getQueue().size(), this.getLargestPoolSize()); + MdcUtil.remove(); + } + + @Override + public void execute(Runnable command) { + super.execute(new WrappedRunnable(command)); + } + + @Override + public Future submit(Runnable task) { + return super.submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + return super.submit(task, result); + } + + @Override + public Future submit(Callable task) { + return super.submit(task); + } + + private static class WrappedRunnable implements Runnable { + private final Runnable target; + private final String traceId; + + public WrappedRunnable(Runnable target) { + this.target = target; + this.traceId = MdcUtil.get(); + } + + @Override + public void run() { + MdcUtil.put(traceId); + target.run(); + } + } +} diff --git a/ruoyi-common/src/main/java/com/ruoyi/common/config/thread/threadpool/CustomThreadPoolTaskExecutor.java b/ruoyi-common/src/main/java/com/ruoyi/common/config/thread/threadpool/CustomThreadPoolTaskExecutor.java new file mode 100644 index 000000000..fc7ad5586 --- /dev/null +++ b/ruoyi-common/src/main/java/com/ruoyi/common/config/thread/threadpool/CustomThreadPoolTaskExecutor.java @@ -0,0 +1,345 @@ +package com.ruoyi.common.config.thread.threadpool; + +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.springframework.core.task.TaskDecorator; +import org.springframework.core.task.TaskRejectedException; +import org.springframework.lang.Nullable; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.util.Assert; +import org.springframework.util.ConcurrentReferenceHashMap; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureTask; + +/** + * ThreadPoolTaskExecutor源码 + *

只是将new ThreadPoolExecutor(...)改为new CustomThreadPoolExecutor(...)

+ * + * @author JQ棣 + */ +@SuppressWarnings("serial") +public class CustomThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { + + private final Object poolSizeMonitor = new Object(); + + private int corePoolSize = 1; + + private int maxPoolSize = Integer.MAX_VALUE; + + private int keepAliveSeconds = 60; + + private int queueCapacity = Integer.MAX_VALUE; + + private boolean allowCoreThreadTimeOut = false; + + @Nullable + private TaskDecorator taskDecorator; + + @Nullable + private ThreadPoolExecutor threadPoolExecutor; + + // Runnable decorator to user-level FutureTask, if different + private final Map decoratedTaskMap = new ConcurrentReferenceHashMap<>(16, + ConcurrentReferenceHashMap.ReferenceType.WEAK); + + /** + * Set the ThreadPoolExecutor's core pool size. Default is 1. + *

+ * This setting can be modified at runtime, for example through JMX. + */ + public void setCorePoolSize(int corePoolSize) { + synchronized (this.poolSizeMonitor) { + this.corePoolSize = corePoolSize; + if (this.threadPoolExecutor != null) { + this.threadPoolExecutor.setCorePoolSize(corePoolSize); + } + } + } + + /** + * Return the ThreadPoolExecutor's core pool size. + */ + public int getCorePoolSize() { + synchronized (this.poolSizeMonitor) { + return this.corePoolSize; + } + } + + /** + * Set the ThreadPoolExecutor's maximum pool size. Default is + * {@code Integer.MAX_VALUE}. + *

+ * This setting can be modified at runtime, for example through JMX. + */ + public void setMaxPoolSize(int maxPoolSize) { + synchronized (this.poolSizeMonitor) { + this.maxPoolSize = maxPoolSize; + if (this.threadPoolExecutor != null) { + this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize); + } + } + } + + /** + * Return the ThreadPoolExecutor's maximum pool size. + */ + public int getMaxPoolSize() { + synchronized (this.poolSizeMonitor) { + return this.maxPoolSize; + } + } + + /** + * Set the ThreadPoolExecutor's keep-alive seconds. Default is 60. + *

+ * This setting can be modified at runtime, for example through JMX. + */ + public void setKeepAliveSeconds(int keepAliveSeconds) { + synchronized (this.poolSizeMonitor) { + this.keepAliveSeconds = keepAliveSeconds; + if (this.threadPoolExecutor != null) { + this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS); + } + } + } + + /** + * Return the ThreadPoolExecutor's keep-alive seconds. + */ + public int getKeepAliveSeconds() { + synchronized (this.poolSizeMonitor) { + return this.keepAliveSeconds; + } + } + + /** + * Set the capacity for the ThreadPoolExecutor's BlockingQueue. Default is + * {@code Integer.MAX_VALUE}. + *

+ * Any positive value will lead to a LinkedBlockingQueue instance; any other + * value will lead to a SynchronousQueue instance. + * + * @see java.util.concurrent.LinkedBlockingQueue + * @see java.util.concurrent.SynchronousQueue + */ + public void setQueueCapacity(int queueCapacity) { + this.queueCapacity = queueCapacity; + } + + /** + * Specify whether to allow core threads to time out. This enables dynamic + * growing and shrinking even in combination with a non-zero queue (since + * the max pool size will only grow once the queue is full). + *

+ * Default is "false". + * + * @see java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean) + */ + public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) { + this.allowCoreThreadTimeOut = allowCoreThreadTimeOut; + } + + /** + * Specify a custom {@link TaskDecorator} to be applied to any + * {@link Runnable} about to be executed. + *

+ * Note that such a decorator is not necessarily being applied to the + * user-supplied {@code Runnable}/{@code Callable} but rather to the actual + * execution callback (which may be a wrapper around the user-supplied + * task). + *

+ * The primary use case is to set some execution context around the task's + * invocation, or to provide some monitoring/statistics for task execution. + * + * @since 4.3 + */ + public void setTaskDecorator(TaskDecorator taskDecorator) { + this.taskDecorator = taskDecorator; + } + + /** + * Note: This method exposes an {@link ExecutorService} to its base class + * but stores the actual {@link ThreadPoolExecutor} handle internally. Do + * not override this method for replacing the executor, rather just for + * decorating its {@code ExecutorService} handle or storing custom state. + */ + @Override + protected ExecutorService initializeExecutor(ThreadFactory threadFactory, + RejectedExecutionHandler rejectedExecutionHandler) { + + BlockingQueue queue = createQueue(this.queueCapacity); + + ThreadPoolExecutor executor; + if (this.taskDecorator != null) { + executor = new CustomThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, + TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) { + @Override + public void execute(Runnable command) { + Runnable decorated = taskDecorator.decorate(command); + if (decorated != command) { + decoratedTaskMap.put(decorated, command); + } + super.execute(decorated); + } + }; + } else { + executor = new CustomThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, + TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); + + } + + if (this.allowCoreThreadTimeOut) { + executor.allowCoreThreadTimeOut(true); + } + + this.threadPoolExecutor = executor; + return executor; + } + + /** + * Create the BlockingQueue to use for the ThreadPoolExecutor. + *

+ * A LinkedBlockingQueue instance will be created for a positive capacity + * value; a SynchronousQueue else. + * + * @param queueCapacity + * the specified queue capacity + * @return the BlockingQueue instance + * @see java.util.concurrent.LinkedBlockingQueue + * @see java.util.concurrent.SynchronousQueue + */ + protected BlockingQueue createQueue(int queueCapacity) { + if (queueCapacity > 0) { + return new LinkedBlockingQueue<>(queueCapacity); + } else { + return new SynchronousQueue<>(); + } + } + + /** + * Return the underlying ThreadPoolExecutor for native access. + * + * @return the underlying ThreadPoolExecutor (never {@code null}) + * @throws IllegalStateException + * if the ThreadPoolTaskExecutor hasn't been initialized yet + */ + public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException { + Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized"); + return this.threadPoolExecutor; + } + + /** + * Return the current pool size. + * + * @see java.util.concurrent.ThreadPoolExecutor#getPoolSize() + */ + public int getPoolSize() { + if (this.threadPoolExecutor == null) { + // Not initialized yet: assume core pool size. + return this.corePoolSize; + } + return this.threadPoolExecutor.getPoolSize(); + } + + /** + * Return the number of currently active threads. + * + * @see java.util.concurrent.ThreadPoolExecutor#getActiveCount() + */ + public int getActiveCount() { + if (this.threadPoolExecutor == null) { + // Not initialized yet: assume no active threads. + return 0; + } + return this.threadPoolExecutor.getActiveCount(); + } + + @Override + public void execute(Runnable task) { + Executor executor = getThreadPoolExecutor(); + try { + executor.execute(task); + } catch (RejectedExecutionException ex) { + throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); + } + } + + @Override + public void execute(Runnable task, long startTimeout) { + execute(task); + } + + @Override + public Future submit(Runnable task) { + ExecutorService executor = getThreadPoolExecutor(); + try { + return executor.submit(task); + } catch (RejectedExecutionException ex) { + throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); + } + } + + @Override + public Future submit(Callable task) { + ExecutorService executor = getThreadPoolExecutor(); + try { + return executor.submit(task); + } catch (RejectedExecutionException ex) { + throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); + } + } + + @Override + public ListenableFuture submitListenable(Runnable task) { + ExecutorService executor = getThreadPoolExecutor(); + try { + ListenableFutureTask future = new ListenableFutureTask<>(task, null); + executor.execute(future); + return future; + } catch (RejectedExecutionException ex) { + throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); + } + } + + @Override + public ListenableFuture submitListenable(Callable task) { + ExecutorService executor = getThreadPoolExecutor(); + try { + ListenableFutureTask future = new ListenableFutureTask<>(task); + executor.execute(future); + return future; + } catch (RejectedExecutionException ex) { + throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); + } + } + + @Override + protected void cancelRemainingTask(Runnable task) { + super.cancelRemainingTask(task); + // Cancel associated user-level Future handle as well + Object original = this.decoratedTaskMap.get(task); + if (original instanceof Future) { + ((Future) original).cancel(true); + } + } + + /** + * This task executor prefers short-lived work units. + */ + @Override + public boolean prefersShortLivedTasks() { + return true; + } +} diff --git a/ruoyi-common/src/main/java/com/ruoyi/common/utils/MdcUtil.java b/ruoyi-common/src/main/java/com/ruoyi/common/utils/MdcUtil.java new file mode 100644 index 000000000..7aa00ef06 --- /dev/null +++ b/ruoyi-common/src/main/java/com/ruoyi/common/utils/MdcUtil.java @@ -0,0 +1,45 @@ +package com.ruoyi.common.utils; + +import java.util.Random; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.MDC; + +/** + * MDC工具类 + * + * @author JQ棣 + */ +public class MdcUtil { + + public final static String UNIQUE_KEY = "trace-id"; + + /** + * 1.Random是线程安全的
+ * 2.高并发情况下,单实例的性能不如每个线程持有一个实例
+ * 3.经粗略测试,并发数少于200情况下性能是单实例优,按需来说项目目前并发量在200内
+ */ + private static Random random = new Random(); + + public static void put() { + // 可修改traceid生成算法 + String traceid = "" + System.currentTimeMillis() + (1000 + random.nextInt(9000)); + MDC.put(UNIQUE_KEY, traceid); + } + + public static void put(String traceId) { + if (StringUtils.isNotEmpty(traceId)) { + MDC.put(UNIQUE_KEY, traceId); + return; + } + put(); + } + + public static String get() { + return MDC.get(UNIQUE_KEY); + } + + public static void remove() { + MDC.remove(UNIQUE_KEY); + } +} \ No newline at end of file diff --git a/ruoyi-framework/src/main/java/com/ruoyi/framework/filter/MdcFilter.java b/ruoyi-framework/src/main/java/com/ruoyi/framework/filter/MdcFilter.java new file mode 100644 index 000000000..d21f724d6 --- /dev/null +++ b/ruoyi-framework/src/main/java/com/ruoyi/framework/filter/MdcFilter.java @@ -0,0 +1,46 @@ +package com.ruoyi.framework.filter; + +import java.io.IOException; + +import javax.servlet.FilterChain; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; +import org.springframework.web.filter.OncePerRequestFilter; + +import com.ruoyi.common.utils.MdcUtil; + +/** + * 对客户端请求添加MDC + * + * @author JQ棣 + */ +@Component +@Order(Ordered.HIGHEST_PRECEDENCE) +public class MdcFilter extends OncePerRequestFilter { + + static { + System.setProperty("log4j2.isThreadContextMapInheritable", "true"); + } + + @Override + protected boolean shouldNotFilter(HttpServletRequest request) throws ServletException { + return false; + } + + @Override + protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain chain) + throws IOException, ServletException { + String traceId = request.getHeader(MdcUtil.UNIQUE_KEY); + MdcUtil.put(traceId); + try { + chain.doFilter(request, response); + } finally { + MdcUtil.remove(); + } + } +} \ No newline at end of file