From cf0d29557a8d035c81dd8ac0892d8d26d2a3ba75 Mon Sep 17 00:00:00 2001 From: zhangdaiscott Date: Tue, 9 Mar 2021 18:27:34 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E5=86=99xxljob=E9=81=BF=E5=85=8D?= =?UTF-8?q?=E9=BB=98=E8=AE=A4=209999=E7=AB=AF=E5=8F=A3=E5=86=B2=E7=AA=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../xxl/job/core/executor/XxlJobExecutor.java | 228 ++++++++++++++++++ 1 file changed, 228 insertions(+) create mode 100644 jeecg-boot/jeecg-boot-starter/jeecg-boot-starter-job/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java diff --git a/jeecg-boot/jeecg-boot-starter/jeecg-boot-starter-job/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/jeecg-boot/jeecg-boot-starter/jeecg-boot-starter-job/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java new file mode 100644 index 00000000..d6cf990f --- /dev/null +++ b/jeecg-boot/jeecg-boot-starter/jeecg-boot-starter-job/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -0,0 +1,228 @@ +package com.xxl.job.core.executor; + +import com.xxl.job.core.biz.AdminBiz; +import com.xxl.job.core.biz.client.AdminBizClient; +import com.xxl.job.core.handler.IJobHandler; +import com.xxl.job.core.log.XxlJobFileAppender; +import com.xxl.job.core.server.EmbedServer; +import com.xxl.job.core.thread.JobLogFileCleanThread; +import com.xxl.job.core.thread.JobThread; +import com.xxl.job.core.thread.TriggerCallbackThread; +import com.xxl.job.core.util.IpUtil; +import com.xxl.job.core.util.NetUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * 重写目的修改默认端口9999为10000避免和网关冲突 + */ +public class XxlJobExecutor { + private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class); + + // ---------------------- param ---------------------- + private String adminAddresses; + private String accessToken; + private String appname; + private String address; + private String ip; + private int port; + private String logPath; + private int logRetentionDays; + + public void setAdminAddresses(String adminAddresses) { + this.adminAddresses = adminAddresses; + } + + public void setAccessToken(String accessToken) { + this.accessToken = accessToken; + } + + public void setAppname(String appname) { + this.appname = appname; + } + + public void setAddress(String address) { + this.address = address; + } + + public void setIp(String ip) { + this.ip = ip; + } + + public void setPort(int port) { + this.port = port; + } + + public void setLogPath(String logPath) { + this.logPath = logPath; + } + + public void setLogRetentionDays(int logRetentionDays) { + this.logRetentionDays = logRetentionDays; + } + + + // ---------------------- start + stop ---------------------- + public void start() throws Exception { + + // init logpath + XxlJobFileAppender.initLogPath(logPath); + + // init invoker, admin-client + initAdminBizList(adminAddresses, accessToken); + + + // init JobLogFileCleanThread + JobLogFileCleanThread.getInstance().start(logRetentionDays); + + // init TriggerCallbackThread + TriggerCallbackThread.getInstance().start(); + + // init executor-server + initEmbedServer(address, ip, port, appname, accessToken); + } + + public void destroy() { + // destory executor-server + stopEmbedServer(); + + // destory jobThreadRepository + if (jobThreadRepository.size() > 0) { + for (Map.Entry item : jobThreadRepository.entrySet()) { + JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job."); + // wait for job thread push result to callback queue + if (oldJobThread != null) { + try { + oldJobThread.join(); + } catch (InterruptedException e) { + logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e); + } + } + } + jobThreadRepository.clear(); + } + jobHandlerRepository.clear(); + + + // destory JobLogFileCleanThread + JobLogFileCleanThread.getInstance().toStop(); + + // destory TriggerCallbackThread + TriggerCallbackThread.getInstance().toStop(); + + } + + + // ---------------------- admin-client (rpc invoker) ---------------------- + private static List adminBizList; + + private void initAdminBizList(String adminAddresses, String accessToken) throws Exception { + if (adminAddresses != null && adminAddresses.trim().length() > 0) { + for (String address : adminAddresses.trim().split(",")) { + if (address != null && address.trim().length() > 0) { + + AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken); + + if (adminBizList == null) { + adminBizList = new ArrayList(); + } + adminBizList.add(adminBiz); + } + } + } + } + + public static List getAdminBizList() { + return adminBizList; + } + + // ---------------------- executor-server (rpc provider) ---------------------- + private EmbedServer embedServer = null; + + private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception { + + // fill ip port 修改默认端口 + port = port > 0 ? port : NetUtil.findAvailablePort(10000); + ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp(); + + // generate address + if (address == null || address.trim().length() == 0) { + String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null + address = "http://{ip_port}/".replace("{ip_port}", ip_port_address); + } + + // accessToken + if (accessToken == null || accessToken.trim().length() == 0) { + logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken."); + } + + // start + embedServer = new EmbedServer(); + embedServer.start(address, port, appname, accessToken); + } + + private void stopEmbedServer() { + // stop provider factory + if (embedServer != null) { + try { + embedServer.stop(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + } + + + // ---------------------- job handler repository ---------------------- + private static ConcurrentMap jobHandlerRepository = new ConcurrentHashMap(); + + public static IJobHandler loadJobHandler(String name) { + return jobHandlerRepository.get(name); + } + + public static IJobHandler registJobHandler(String name, IJobHandler jobHandler) { + logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler); + return jobHandlerRepository.put(name, jobHandler); + } + + + // ---------------------- job thread repository ---------------------- + private static ConcurrentMap jobThreadRepository = new ConcurrentHashMap(); + + public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason) { + JobThread newJobThread = new JobThread(jobId, handler); + newJobThread.start(); + logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler}); + + JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!! + if (oldJobThread != null) { + oldJobThread.toStop(removeOldReason); + oldJobThread.interrupt(); + } + + return newJobThread; + } + + public static JobThread removeJobThread(int jobId, String removeOldReason) { + JobThread oldJobThread = jobThreadRepository.remove(jobId); + if (oldJobThread != null) { + oldJobThread.toStop(removeOldReason); + oldJobThread.interrupt(); + + return oldJobThread; + } + return null; + } + + public static JobThread loadJobThread(int jobId) { + JobThread jobThread = jobThreadRepository.get(jobId); + return jobThread; + } + +}