mirror of https://gitee.com/y_project/RuoYi.git
Pre Merge pull request !494 from 吾日三省Java/master-traceid
commit
365960cf97
|
@ -0,0 +1,96 @@
|
|||
/*
|
||||
* All content copyright Terracotta, Inc., unless otherwise indicated. All rights reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
||||
* use this file except in compliance with the License. You may obtain a copy
|
||||
* of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*
|
||||
*/
|
||||
package com.ruoyi.web.core.config;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
|
||||
import org.quartz.Calendar;
|
||||
import org.quartz.JobDetail;
|
||||
import org.quartz.Scheduler;
|
||||
import org.quartz.Trigger;
|
||||
import org.springframework.beans.factory.ObjectProvider;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.flyway.FlywayAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.liquibase.LiquibaseAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.quartz.QuartzProperties;
|
||||
import org.springframework.boot.autoconfigure.quartz.SchedulerFactoryBeanCustomizer;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
|
||||
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
|
||||
/**
|
||||
* 复制于源码org.springframework.boot.autoconfigure.quartz.QuartzAutoConfiguration
|
||||
*
|
||||
* {@link EnableAutoConfiguration Auto-configuration} for Quartz Scheduler.
|
||||
*
|
||||
* @author Vedran Pavic
|
||||
* @author Stephane Nicoll
|
||||
* @since 2.0.0
|
||||
*/
|
||||
@Configuration(proxyBeanMethods = false)
|
||||
@ConditionalOnClass({ Scheduler.class, SchedulerFactoryBean.class, PlatformTransactionManager.class })
|
||||
@EnableConfigurationProperties(QuartzProperties.class)
|
||||
@AutoConfigureAfter({ DataSourceAutoConfiguration.class, HibernateJpaAutoConfiguration.class,
|
||||
LiquibaseAutoConfiguration.class, FlywayAutoConfiguration.class })
|
||||
public class QuartzAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public SchedulerFactoryBean quartzScheduler(QuartzProperties properties,
|
||||
ObjectProvider<SchedulerFactoryBeanCustomizer> customizers, ObjectProvider<JobDetail> jobDetails,
|
||||
Map<String, Calendar> calendars, ObjectProvider<Trigger> triggers, ApplicationContext applicationContext,
|
||||
ThreadPoolTaskExecutor threadPoolTaskExecutor) {
|
||||
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
|
||||
SpringBeanJobFactory jobFactory = new SpringBeanJobFactory();
|
||||
jobFactory.setApplicationContext(applicationContext);
|
||||
schedulerFactoryBean.setJobFactory(jobFactory);
|
||||
if (properties.getSchedulerName() != null) {
|
||||
schedulerFactoryBean.setSchedulerName(properties.getSchedulerName());
|
||||
}
|
||||
schedulerFactoryBean.setAutoStartup(properties.isAutoStartup());
|
||||
schedulerFactoryBean.setStartupDelay((int) properties.getStartupDelay().getSeconds());
|
||||
schedulerFactoryBean.setWaitForJobsToCompleteOnShutdown(properties.isWaitForJobsToCompleteOnShutdown());
|
||||
schedulerFactoryBean.setOverwriteExistingJobs(properties.isOverwriteExistingJobs());
|
||||
if (!properties.getProperties().isEmpty()) {
|
||||
schedulerFactoryBean.setQuartzProperties(asProperties(properties.getProperties()));
|
||||
}
|
||||
schedulerFactoryBean.setJobDetails(jobDetails.orderedStream().toArray(JobDetail[]::new));
|
||||
schedulerFactoryBean.setCalendars(calendars);
|
||||
schedulerFactoryBean.setTriggers(triggers.orderedStream().toArray(Trigger[]::new));
|
||||
schedulerFactoryBean.setTaskExecutor(threadPoolTaskExecutor); // 源码中增加了这行代码,使用自定义线程池执行任务
|
||||
customizers.orderedStream().forEach((customizer) -> customizer.customize(schedulerFactoryBean));
|
||||
return schedulerFactoryBean;
|
||||
}
|
||||
|
||||
private Properties asProperties(Map<String, String> source) {
|
||||
Properties properties = new Properties();
|
||||
properties.putAll(source);
|
||||
return properties;
|
||||
}
|
||||
|
||||
}
|
|
@ -3,7 +3,7 @@
|
|||
<!-- 日志存放路径 -->
|
||||
<property name="log.path" value="/home/ruoyi/logs" />
|
||||
<!-- 日志输出格式 -->
|
||||
<property name="log.pattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n" />
|
||||
<property name="log.pattern" value="%d{HH:mm:ss.SSS} [%X{trace-id}] [%thread] %-5level %logger{20} - [%method,%line] - %msg%n" />
|
||||
|
||||
<!-- 控制台输出 -->
|
||||
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
|
||||
|
|
|
@ -1,12 +1,15 @@
|
|||
package com.ruoyi.common.config.thread;
|
||||
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
|
||||
import com.ruoyi.common.config.thread.threadpool.CustomScheduledThreadPoolExecutor;
|
||||
import com.ruoyi.common.config.thread.threadpool.CustomThreadPoolTaskExecutor;
|
||||
import com.ruoyi.common.utils.Threads;
|
||||
|
||||
/**
|
||||
|
@ -32,7 +35,7 @@ public class ThreadPoolConfig
|
|||
@Bean(name = "threadPoolTaskExecutor")
|
||||
public ThreadPoolTaskExecutor threadPoolTaskExecutor()
|
||||
{
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
ThreadPoolTaskExecutor executor = new CustomThreadPoolTaskExecutor();
|
||||
executor.setMaxPoolSize(maxPoolSize);
|
||||
executor.setCorePoolSize(corePoolSize);
|
||||
executor.setQueueCapacity(queueCapacity);
|
||||
|
@ -48,7 +51,7 @@ public class ThreadPoolConfig
|
|||
@Bean(name = "scheduledExecutorService")
|
||||
protected ScheduledExecutorService scheduledExecutorService()
|
||||
{
|
||||
return new ScheduledThreadPoolExecutor(corePoolSize,
|
||||
return new CustomScheduledThreadPoolExecutor(corePoolSize,
|
||||
new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build(),
|
||||
new ThreadPoolExecutor.CallerRunsPolicy())
|
||||
{
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
package com.ruoyi.common.config.thread.threadpool;
|
||||
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.ruoyi.common.utils.MdcUtil;
|
||||
|
||||
/**
|
||||
* 自定义Schedule线程池
|
||||
*
|
||||
* @author JQ棣
|
||||
*/
|
||||
public class CustomScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
|
||||
private static final Logger log = LoggerFactory.getLogger(CustomScheduledThreadPoolExecutor.class);
|
||||
|
||||
public CustomScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) {
|
||||
super(corePoolSize, handler);
|
||||
}
|
||||
|
||||
public CustomScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory,
|
||||
RejectedExecutionHandler handler) {
|
||||
super(corePoolSize, threadFactory, handler);
|
||||
}
|
||||
|
||||
public CustomScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) {
|
||||
super(corePoolSize, threadFactory);
|
||||
}
|
||||
|
||||
public CustomScheduledThreadPoolExecutor(int corePoolSize) {
|
||||
super(corePoolSize);
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存任务开始执行的时间,当任务结束时,用任务结束时间减去开始时间计算任务执行时间
|
||||
*/
|
||||
private ThreadLocal<Long> timetl = new ThreadLocal<>();
|
||||
|
||||
@Override
|
||||
protected void beforeExecute(Thread t, Runnable r) {
|
||||
timetl.set(System.currentTimeMillis());
|
||||
super.beforeExecute(t, r);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void afterExecute(Runnable r, Throwable t) {
|
||||
super.afterExecute(r, t);
|
||||
Long start = timetl.get();
|
||||
timetl.remove();
|
||||
long diff = System.currentTimeMillis() - start;
|
||||
// 统计任务耗时、初始线程数、正在执行的任务数量、 已完成任务数量、任务总数、队列里缓存的任务数量、池中存在的最大线程数
|
||||
log.info("duration:{} ms,poolSize:{},active:{},completedTaskCount:{},taskCount:{},queue:{},largestPoolSize:{}",
|
||||
diff, this.getPoolSize(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(),
|
||||
this.getQueue().size(), this.getLargestPoolSize());
|
||||
MdcUtil.remove();
|
||||
}
|
||||
|
||||
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
|
||||
return super.schedule(new WrappedRunnable(command), delay, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
|
||||
return super.scheduleAtFixedRate(new WrappedRunnable(command), initialDelay, period, unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
|
||||
return super.scheduleWithFixedDelay(new WrappedRunnable(command), initialDelay, delay, unit);
|
||||
}
|
||||
|
||||
private static class WrappedRunnable implements Runnable {
|
||||
private final Runnable target;
|
||||
private final String traceId;
|
||||
|
||||
public WrappedRunnable(Runnable target) {
|
||||
this.target = target;
|
||||
this.traceId = MdcUtil.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
MdcUtil.put(traceId);
|
||||
target.run();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,103 @@
|
|||
package com.ruoyi.common.config.thread.threadpool;
|
||||
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.ruoyi.common.utils.MdcUtil;
|
||||
|
||||
/**
|
||||
* 自定义线程池
|
||||
*
|
||||
* @author JQ棣
|
||||
*/
|
||||
public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
|
||||
private static final Logger log = LoggerFactory.getLogger(CustomThreadPoolExecutor.class);
|
||||
|
||||
public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
|
||||
BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
|
||||
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
|
||||
}
|
||||
|
||||
public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
|
||||
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
|
||||
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
|
||||
}
|
||||
|
||||
public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
|
||||
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
|
||||
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
|
||||
}
|
||||
|
||||
public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
|
||||
BlockingQueue<Runnable> workQueue) {
|
||||
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存任务开始执行的时间,当任务结束时,用任务结束时间减去开始时间计算任务执行时间
|
||||
*/
|
||||
private ThreadLocal<Long> timetl = new ThreadLocal<>();
|
||||
|
||||
@Override
|
||||
protected void beforeExecute(Thread t, Runnable r) {
|
||||
timetl.set(System.currentTimeMillis());
|
||||
super.beforeExecute(t, r);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void afterExecute(Runnable r, Throwable t) {
|
||||
super.afterExecute(r, t);
|
||||
Long start = timetl.get();
|
||||
timetl.remove();
|
||||
long diff = System.currentTimeMillis() - start;
|
||||
// 统计任务耗时、初始线程数、正在执行的任务数量、 已完成任务数量、任务总数、队列里缓存的任务数量、池中存在的最大线程数
|
||||
log.info("duration:{} ms,poolSize:{},active:{},completedTaskCount:{},taskCount:{},queue:{},largestPoolSize:{}",
|
||||
diff, this.getPoolSize(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(),
|
||||
this.getQueue().size(), this.getLargestPoolSize());
|
||||
MdcUtil.remove();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable command) {
|
||||
super.execute(new WrappedRunnable(command));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<?> submit(Runnable task) {
|
||||
return super.submit(task);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Future<T> submit(Runnable task, T result) {
|
||||
return super.submit(task, result);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Future<T> submit(Callable<T> task) {
|
||||
return super.submit(task);
|
||||
}
|
||||
|
||||
private static class WrappedRunnable implements Runnable {
|
||||
private final Runnable target;
|
||||
private final String traceId;
|
||||
|
||||
public WrappedRunnable(Runnable target) {
|
||||
this.target = target;
|
||||
this.traceId = MdcUtil.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
MdcUtil.put(traceId);
|
||||
target.run();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,345 @@
|
|||
package com.ruoyi.common.config.thread.threadpool;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.RejectedExecutionHandler;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.springframework.core.task.TaskDecorator;
|
||||
import org.springframework.core.task.TaskRejectedException;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.ConcurrentReferenceHashMap;
|
||||
import org.springframework.util.concurrent.ListenableFuture;
|
||||
import org.springframework.util.concurrent.ListenableFutureTask;
|
||||
|
||||
/**
|
||||
* ThreadPoolTaskExecutor源码
|
||||
* <h4>只是将new ThreadPoolExecutor(...)改为new CustomThreadPoolExecutor(...)</h4>
|
||||
*
|
||||
* @author JQ棣
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
public class CustomThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
|
||||
|
||||
private final Object poolSizeMonitor = new Object();
|
||||
|
||||
private int corePoolSize = 1;
|
||||
|
||||
private int maxPoolSize = Integer.MAX_VALUE;
|
||||
|
||||
private int keepAliveSeconds = 60;
|
||||
|
||||
private int queueCapacity = Integer.MAX_VALUE;
|
||||
|
||||
private boolean allowCoreThreadTimeOut = false;
|
||||
|
||||
@Nullable
|
||||
private TaskDecorator taskDecorator;
|
||||
|
||||
@Nullable
|
||||
private ThreadPoolExecutor threadPoolExecutor;
|
||||
|
||||
// Runnable decorator to user-level FutureTask, if different
|
||||
private final Map<Runnable, Object> decoratedTaskMap = new ConcurrentReferenceHashMap<>(16,
|
||||
ConcurrentReferenceHashMap.ReferenceType.WEAK);
|
||||
|
||||
/**
|
||||
* Set the ThreadPoolExecutor's core pool size. Default is 1.
|
||||
* <p>
|
||||
* <b>This setting can be modified at runtime, for example through JMX.</b>
|
||||
*/
|
||||
public void setCorePoolSize(int corePoolSize) {
|
||||
synchronized (this.poolSizeMonitor) {
|
||||
this.corePoolSize = corePoolSize;
|
||||
if (this.threadPoolExecutor != null) {
|
||||
this.threadPoolExecutor.setCorePoolSize(corePoolSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the ThreadPoolExecutor's core pool size.
|
||||
*/
|
||||
public int getCorePoolSize() {
|
||||
synchronized (this.poolSizeMonitor) {
|
||||
return this.corePoolSize;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the ThreadPoolExecutor's maximum pool size. Default is
|
||||
* {@code Integer.MAX_VALUE}.
|
||||
* <p>
|
||||
* <b>This setting can be modified at runtime, for example through JMX.</b>
|
||||
*/
|
||||
public void setMaxPoolSize(int maxPoolSize) {
|
||||
synchronized (this.poolSizeMonitor) {
|
||||
this.maxPoolSize = maxPoolSize;
|
||||
if (this.threadPoolExecutor != null) {
|
||||
this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the ThreadPoolExecutor's maximum pool size.
|
||||
*/
|
||||
public int getMaxPoolSize() {
|
||||
synchronized (this.poolSizeMonitor) {
|
||||
return this.maxPoolSize;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the ThreadPoolExecutor's keep-alive seconds. Default is 60.
|
||||
* <p>
|
||||
* <b>This setting can be modified at runtime, for example through JMX.</b>
|
||||
*/
|
||||
public void setKeepAliveSeconds(int keepAliveSeconds) {
|
||||
synchronized (this.poolSizeMonitor) {
|
||||
this.keepAliveSeconds = keepAliveSeconds;
|
||||
if (this.threadPoolExecutor != null) {
|
||||
this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the ThreadPoolExecutor's keep-alive seconds.
|
||||
*/
|
||||
public int getKeepAliveSeconds() {
|
||||
synchronized (this.poolSizeMonitor) {
|
||||
return this.keepAliveSeconds;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the capacity for the ThreadPoolExecutor's BlockingQueue. Default is
|
||||
* {@code Integer.MAX_VALUE}.
|
||||
* <p>
|
||||
* Any positive value will lead to a LinkedBlockingQueue instance; any other
|
||||
* value will lead to a SynchronousQueue instance.
|
||||
*
|
||||
* @see java.util.concurrent.LinkedBlockingQueue
|
||||
* @see java.util.concurrent.SynchronousQueue
|
||||
*/
|
||||
public void setQueueCapacity(int queueCapacity) {
|
||||
this.queueCapacity = queueCapacity;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specify whether to allow core threads to time out. This enables dynamic
|
||||
* growing and shrinking even in combination with a non-zero queue (since
|
||||
* the max pool size will only grow once the queue is full).
|
||||
* <p>
|
||||
* Default is "false".
|
||||
*
|
||||
* @see java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)
|
||||
*/
|
||||
public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
|
||||
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specify a custom {@link TaskDecorator} to be applied to any
|
||||
* {@link Runnable} about to be executed.
|
||||
* <p>
|
||||
* Note that such a decorator is not necessarily being applied to the
|
||||
* user-supplied {@code Runnable}/{@code Callable} but rather to the actual
|
||||
* execution callback (which may be a wrapper around the user-supplied
|
||||
* task).
|
||||
* <p>
|
||||
* The primary use case is to set some execution context around the task's
|
||||
* invocation, or to provide some monitoring/statistics for task execution.
|
||||
*
|
||||
* @since 4.3
|
||||
*/
|
||||
public void setTaskDecorator(TaskDecorator taskDecorator) {
|
||||
this.taskDecorator = taskDecorator;
|
||||
}
|
||||
|
||||
/**
|
||||
* Note: This method exposes an {@link ExecutorService} to its base class
|
||||
* but stores the actual {@link ThreadPoolExecutor} handle internally. Do
|
||||
* not override this method for replacing the executor, rather just for
|
||||
* decorating its {@code ExecutorService} handle or storing custom state.
|
||||
*/
|
||||
@Override
|
||||
protected ExecutorService initializeExecutor(ThreadFactory threadFactory,
|
||||
RejectedExecutionHandler rejectedExecutionHandler) {
|
||||
|
||||
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
|
||||
|
||||
ThreadPoolExecutor executor;
|
||||
if (this.taskDecorator != null) {
|
||||
executor = new CustomThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds,
|
||||
TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) {
|
||||
@Override
|
||||
public void execute(Runnable command) {
|
||||
Runnable decorated = taskDecorator.decorate(command);
|
||||
if (decorated != command) {
|
||||
decoratedTaskMap.put(decorated, command);
|
||||
}
|
||||
super.execute(decorated);
|
||||
}
|
||||
};
|
||||
} else {
|
||||
executor = new CustomThreadPoolExecutor(this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds,
|
||||
TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
|
||||
|
||||
}
|
||||
|
||||
if (this.allowCoreThreadTimeOut) {
|
||||
executor.allowCoreThreadTimeOut(true);
|
||||
}
|
||||
|
||||
this.threadPoolExecutor = executor;
|
||||
return executor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the BlockingQueue to use for the ThreadPoolExecutor.
|
||||
* <p>
|
||||
* A LinkedBlockingQueue instance will be created for a positive capacity
|
||||
* value; a SynchronousQueue else.
|
||||
*
|
||||
* @param queueCapacity
|
||||
* the specified queue capacity
|
||||
* @return the BlockingQueue instance
|
||||
* @see java.util.concurrent.LinkedBlockingQueue
|
||||
* @see java.util.concurrent.SynchronousQueue
|
||||
*/
|
||||
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
|
||||
if (queueCapacity > 0) {
|
||||
return new LinkedBlockingQueue<>(queueCapacity);
|
||||
} else {
|
||||
return new SynchronousQueue<>();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the underlying ThreadPoolExecutor for native access.
|
||||
*
|
||||
* @return the underlying ThreadPoolExecutor (never {@code null})
|
||||
* @throws IllegalStateException
|
||||
* if the ThreadPoolTaskExecutor hasn't been initialized yet
|
||||
*/
|
||||
public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
|
||||
Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
|
||||
return this.threadPoolExecutor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the current pool size.
|
||||
*
|
||||
* @see java.util.concurrent.ThreadPoolExecutor#getPoolSize()
|
||||
*/
|
||||
public int getPoolSize() {
|
||||
if (this.threadPoolExecutor == null) {
|
||||
// Not initialized yet: assume core pool size.
|
||||
return this.corePoolSize;
|
||||
}
|
||||
return this.threadPoolExecutor.getPoolSize();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the number of currently active threads.
|
||||
*
|
||||
* @see java.util.concurrent.ThreadPoolExecutor#getActiveCount()
|
||||
*/
|
||||
public int getActiveCount() {
|
||||
if (this.threadPoolExecutor == null) {
|
||||
// Not initialized yet: assume no active threads.
|
||||
return 0;
|
||||
}
|
||||
return this.threadPoolExecutor.getActiveCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable task) {
|
||||
Executor executor = getThreadPoolExecutor();
|
||||
try {
|
||||
executor.execute(task);
|
||||
} catch (RejectedExecutionException ex) {
|
||||
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Runnable task, long startTimeout) {
|
||||
execute(task);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<?> submit(Runnable task) {
|
||||
ExecutorService executor = getThreadPoolExecutor();
|
||||
try {
|
||||
return executor.submit(task);
|
||||
} catch (RejectedExecutionException ex) {
|
||||
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Future<T> submit(Callable<T> task) {
|
||||
ExecutorService executor = getThreadPoolExecutor();
|
||||
try {
|
||||
return executor.submit(task);
|
||||
} catch (RejectedExecutionException ex) {
|
||||
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ListenableFuture<?> submitListenable(Runnable task) {
|
||||
ExecutorService executor = getThreadPoolExecutor();
|
||||
try {
|
||||
ListenableFutureTask<Object> future = new ListenableFutureTask<>(task, null);
|
||||
executor.execute(future);
|
||||
return future;
|
||||
} catch (RejectedExecutionException ex) {
|
||||
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
|
||||
ExecutorService executor = getThreadPoolExecutor();
|
||||
try {
|
||||
ListenableFutureTask<T> future = new ListenableFutureTask<>(task);
|
||||
executor.execute(future);
|
||||
return future;
|
||||
} catch (RejectedExecutionException ex) {
|
||||
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void cancelRemainingTask(Runnable task) {
|
||||
super.cancelRemainingTask(task);
|
||||
// Cancel associated user-level Future handle as well
|
||||
Object original = this.decoratedTaskMap.get(task);
|
||||
if (original instanceof Future) {
|
||||
((Future<?>) original).cancel(true);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This task executor prefers short-lived work units.
|
||||
*/
|
||||
@Override
|
||||
public boolean prefersShortLivedTasks() {
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
package com.ruoyi.common.utils;
|
||||
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.slf4j.MDC;
|
||||
|
||||
/**
|
||||
* MDC工具类
|
||||
*
|
||||
* @author JQ棣
|
||||
*/
|
||||
public class MdcUtil {
|
||||
|
||||
public final static String UNIQUE_KEY = "trace-id";
|
||||
|
||||
/**
|
||||
* 1.Random是线程安全的<br/>
|
||||
* 2.高并发情况下,单实例的性能不如每个线程持有一个实例<br/>
|
||||
* 3.经粗略测试,并发数少于200情况下性能是单实例优,按需来说项目目前并发量在200内<br/>
|
||||
*/
|
||||
private static Random random = new Random();
|
||||
|
||||
public static void put() {
|
||||
// 可修改traceid生成算法
|
||||
String traceid = "" + System.currentTimeMillis() + (1000 + random.nextInt(9000));
|
||||
MDC.put(UNIQUE_KEY, traceid);
|
||||
}
|
||||
|
||||
public static void put(String traceId) {
|
||||
if (StringUtils.isNotEmpty(traceId)) {
|
||||
MDC.put(UNIQUE_KEY, traceId);
|
||||
return;
|
||||
}
|
||||
put();
|
||||
}
|
||||
|
||||
public static String get() {
|
||||
return MDC.get(UNIQUE_KEY);
|
||||
}
|
||||
|
||||
public static void remove() {
|
||||
MDC.remove(UNIQUE_KEY);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
package com.ruoyi.framework.filter;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import javax.servlet.FilterChain;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.filter.OncePerRequestFilter;
|
||||
|
||||
import com.ruoyi.common.utils.MdcUtil;
|
||||
|
||||
/**
|
||||
* 对客户端请求添加MDC
|
||||
*
|
||||
* @author JQ棣
|
||||
*/
|
||||
@Component
|
||||
@Order(Ordered.HIGHEST_PRECEDENCE)
|
||||
public class MdcFilter extends OncePerRequestFilter {
|
||||
|
||||
static {
|
||||
System.setProperty("log4j2.isThreadContextMapInheritable", "true");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean shouldNotFilter(HttpServletRequest request) throws ServletException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain chain)
|
||||
throws IOException, ServletException {
|
||||
String traceId = request.getHeader(MdcUtil.UNIQUE_KEY);
|
||||
MdcUtil.put(traceId);
|
||||
try {
|
||||
chain.doFilter(request, response);
|
||||
} finally {
|
||||
MdcUtil.remove();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue