使用自定义线程池执行定时任务,避免程序OOM

pull/170/head
dqjdda 2019-10-31 18:20:56 +08:00
parent cc1a4c5695
commit 4b355067eb
7 changed files with 175 additions and 7 deletions

View File

@ -0,0 +1,58 @@
package me.zhengjie.config.thread;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 线
* @author https://juejin.im/entry/5abb8f6951882555677e9da2
* @date 2019103115:06:18
*/
@Slf4j
@Configuration
public class AsyncTaskExecutePool implements AsyncConfigurer {
//注入配置类
private final AsyncTaskProperties config;
public AsyncTaskExecutePool(AsyncTaskProperties config) {
this.config = config;
}
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心线程池大小
executor.setCorePoolSize(config.getCorePoolSize());
//最大线程数
executor.setMaxPoolSize(config.getMaxPoolSize());
//队列容量
executor.setQueueCapacity(config.getQueueCapacity());
//活跃时间
executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
//线程名字前缀
executor.setThreadNamePrefix("el-executor-");
// setRejectedExecutionHandler当pool已经达到max size的时候如何处理新任务
// CallerRunsPolicy不在新线程中执行任务而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncUncaughtExceptionHandler() {
@Override
public void handleUncaughtException(Throwable throwable, Method method, Object... objects) {
log.error("=========================="+throwable.getMessage()+"=======================", throwable);
log.error("exception method:"+method.getName());
}
};
}
}

View File

@ -0,0 +1,24 @@
package me.zhengjie.config.thread;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* 线
* @author https://juejin.im/entry/5abb8f6951882555677e9da2
* @date 2019103114:58:18
*/
@Data
@Component
@ConfigurationProperties(prefix = "task.pool")
public class AsyncTaskProperties {
private int corePoolSize;
private int maxPoolSize;
private int keepAliveSeconds;
private int queueCapacity;
}

View File

@ -0,0 +1,48 @@
package me.zhengjie.config.thread;
import org.springframework.stereotype.Component;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 线
* @author Zheng Jie
* @date 2019103117:49:55
*/
@Component
public class TheadFactoryName implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public TheadFactoryName() {
this("el-pool");
}
private TheadFactoryName(String name){
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
//此时namePrefix就是 name + 第几个用这个工厂创建线程池的
this.namePrefix = name +
poolNumber.getAndIncrement();
}
@Override
public Thread newThread(Runnable r) {
//此时线程的名字 就是 namePrefix + -thread- + 这个线程池中第几个执行的线程
Thread t = new Thread(group, r,
namePrefix + "-thread-"+threadNumber.getAndIncrement(),
0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}

View File

@ -0,0 +1,27 @@
package me.zhengjie.config.thread;
import me.zhengjie.utils.SpringContextHolder;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 线
* @author Zheng Jie
* @date 2019103118:16:47
*/
public class ThreadPoolExecutorUtil {
public static ThreadPoolExecutor getPoll(){
AsyncTaskProperties properties = SpringContextHolder.getBean(AsyncTaskProperties.class);
return new ThreadPoolExecutor(
properties.getCorePoolSize(),
properties.getMaxPoolSize(),
properties.getKeepAliveSeconds(),
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(properties.getQueueCapacity()),
new TheadFactoryName()
);
}
}

View File

@ -11,7 +11,7 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
public class TestTask {
public void run(){
log.info("执行成功");
}

View File

@ -1,5 +1,7 @@
package me.zhengjie.modules.quartz.utils;
import me.zhengjie.config.thread.TheadFactoryName;
import me.zhengjie.config.thread.ThreadPoolExecutorUtil;
import me.zhengjie.modules.quartz.domain.QuartzJob;
import me.zhengjie.modules.quartz.domain.QuartzLog;
import me.zhengjie.modules.quartz.repository.QuartzLogRepository;
@ -11,9 +13,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.*;
/**
* https://gitee.com/renrenio/renren-security
@ -25,8 +25,8 @@ public class ExecutionJob extends QuartzJobBean {
private Logger logger = LoggerFactory.getLogger(this.getClass());
// 建议自定义线程池实现方式,该处仅供参考
private ExecutorService executorService = Executors.newSingleThreadExecutor();
// 该处仅供参考
private final static ThreadPoolExecutor executor = ThreadPoolExecutorUtil.getPoll();
@Override
@SuppressWarnings("unchecked")
@ -48,7 +48,7 @@ public class ExecutionJob extends QuartzJobBean {
logger.info("任务准备执行,任务名称:{}", quartzJob.getJobName());
QuartzRunnable task = new QuartzRunnable(quartzJob.getBeanName(), quartzJob.getMethodName(),
quartzJob.getParams());
Future<?> future = executorService.submit(task);
Future<?> future = executor.submit(task);
future.get();
long times = System.currentTimeMillis() - startTime;
log.setTime(times);

View File

@ -29,6 +29,17 @@ spring:
#连接超时时间
timeout: 5000
task:
pool:
# 核心线程池大小
core-pool-size: 10
# 最大线程数
max-pool-size: 30
# 活跃时间
keep-alive-seconds: 60
# 队列容量
queue-capacity: 50
#七牛云
qiniu:
# 文件大小 /M