消息发送优化

pull/63/MERGE
awenes 2023-10-02 20:20:44 +08:00
parent 73c940111d
commit 578117466a
5 changed files with 9 additions and 390 deletions

View File

@ -23,8 +23,7 @@ import java.util.Map;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.data.elasticsearch.core.geo.GeoPoint;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContext;
@ -37,7 +36,6 @@ import com.google.common.collect.Maps;
import cn.topiam.employee.audit.entity.*;
import cn.topiam.employee.audit.enums.EventStatus;
import cn.topiam.employee.audit.event.type.EventType;
import cn.topiam.employee.audit.mq.AuditMessagePublisher;
import cn.topiam.employee.support.context.ServletContextHelp;
import cn.topiam.employee.support.geo.GeoLocationService;
import cn.topiam.employee.support.security.authentication.WebAuthenticationDetails;
@ -62,8 +60,6 @@ import static cn.topiam.employee.support.util.StringUtils.replaceBlank;
@AllArgsConstructor
public class AuditEventPublish {
private final Logger logger = LoggerFactory.getLogger(AuditEventPublish.class);
/**
*
*
@ -84,7 +80,7 @@ public class AuditEventPublish {
//封装操作人
Actor actor = getActor();
//Publish AuditEvent
auditMessagePublisher.sendAuditChangeMessage(new AuditEvent(TraceUtils.get(), ServletContextHelp.getSession().getId(), actor, event, userAgent, geoLocationModal, null));
applicationEventPublisher.publishEvent(new AuditEvent(TraceUtils.get(), ServletContextHelp.getSession().getId(), actor, event, userAgent, geoLocationModal, null));
//@formatter:on
}
@ -115,7 +111,7 @@ public class AuditEventPublish {
//封装操作人
Actor actor = getActor(authentication);
//Publish AuditEvent
auditMessagePublisher.sendAuditChangeMessage(new AuditEvent(TraceUtils.get(), ServletContextHelp.getSession().getId(), actor, event, userAgent, geoLocationModal, targets));
applicationEventPublisher.publishEvent(new AuditEvent(TraceUtils.get(), ServletContextHelp.getSession().getId(), actor, event, userAgent, geoLocationModal, targets));
//@formatter:on
}
@ -137,7 +133,7 @@ public class AuditEventPublish {
//封装用户代理
UserAgent userAgent = getUserAgent();
//Publish AuditEvent
auditMessagePublisher.sendAuditChangeMessage(new AuditEvent(TraceUtils.get(), ServletContextHelp.getSession().getId(), actor, event, userAgent, geoLocationModal, null));
applicationEventPublisher.publishEvent(new AuditEvent(TraceUtils.get(), ServletContextHelp.getSession().getId(), actor, event, userAgent, geoLocationModal, null));
//@formatter:on
}
@ -178,7 +174,7 @@ public class AuditEventPublish {
actor = getActor();
}
//Publish AuditEvent
auditMessagePublisher.sendAuditChangeMessage(new AuditEvent(TraceUtils.get(), ServletContextHelp.getSession().getId(), actor, event, userAgent, geoLocationModal, target));
applicationEventPublisher.publishEvent(new AuditEvent(TraceUtils.get(), ServletContextHelp.getSession().getId(), actor, event, userAgent, geoLocationModal, target));
//@formatter:on
}
@ -204,7 +200,7 @@ public class AuditEventPublish {
//封装操作人
Actor actor = getActor();
//Publish AuditEvent
auditMessagePublisher.sendAuditChangeMessage(new AuditEvent(TraceUtils.get(), ServletContextHelp.getSession().getId(), actor, event, userAgent, geoLocationModal, target));
applicationEventPublisher.publishEvent(new AuditEvent(TraceUtils.get(), ServletContextHelp.getSession().getId(), actor, event, userAgent, geoLocationModal, target));
//@formatter:on
}
@ -384,13 +380,13 @@ public class AuditEventPublish {
}
/**
* AuditMessagePublisher
* ApplicationEventPublisher
*/
private final AuditMessagePublisher auditMessagePublisher;
private final ApplicationEventPublisher applicationEventPublisher;
/**
*
*/
private final GeoLocationService geoLocationService;
private final GeoLocationService geoLocationService;
}

View File

@ -1,137 +0,0 @@
/*
* eiam-audit - Employee Identity and Access Management
* Copyright © 2022-Present Jinan Yuanchuang Network Technology Co., Ltd. (support@topiam.cn)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package cn.topiam.employee.audit.mq;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson2.JSONObject;
import com.rabbitmq.client.Channel;
import cn.topiam.employee.audit.entity.*;
import cn.topiam.employee.audit.event.AuditEvent;
import cn.topiam.employee.audit.repository.AuditElasticSearchRepository;
import cn.topiam.employee.audit.repository.AuditRepository;
import cn.topiam.employee.support.exception.TopIamException;
import cn.topiam.employee.support.trace.TraceUtils;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import static cn.topiam.employee.audit.mq.AuditMessagePublisher.AUDIT_TOPIC;
import static cn.topiam.employee.support.trace.TraceAspect.TRACE_ID;
/**
*
*
* @author TopIAM
* Created by support@topiam.cn on 2023/5/30 23:12
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class AuditMessageListener {
/**
*
*
* @param message {@link Message}
* @param channel {@link Channel}
*/
@SneakyThrows
@RabbitListener(queues = AUDIT_TOPIC, ackMode = "MANUAL")
@RabbitHandler()
public String onMessage(Message message, Channel channel, @Payload AuditEvent auditEvent,
@Headers Map<String, Object> headers) throws TopIamException {
try {
//设置TraceId
TraceUtils.put(String.valueOf(headers.get(TRACE_ID)));
log.info("接收审计事件入参: [{}]", message);
Event event = auditEvent.getEvent();
Actor actor = auditEvent.getActor();
List<Target> target = auditEvent.getTargets();
GeoLocation geoLocation = auditEvent.getGeoLocation();
UserAgent userAgent = auditEvent.getUserAgent();
//保存数据库
AuditEntity entity = new AuditEntity();
Optional<AuditEntity> optional = auditRepository
.findByRequestId(auditEvent.getRequestId());
if (optional.isEmpty()) {
entity.setRequestId(auditEvent.getRequestId());
entity.setSessionId(auditEvent.getSessionId());
//事件
entity.setEventType(event.getType());
entity.setEventContent(event.getContent());
entity.setEventParam(event.getParam());
entity.setEventStatus(event.getStatus());
entity.setEventResult(event.getResult());
entity.setEventTime(event.getTime());
//操作目标
entity.setTargets(target);
entity.setGeoLocation(geoLocation);
entity.setUserAgent(userAgent);
entity.setActorId(actor.getId());
entity.setActorType(actor.getType());
auditRepository.save(entity);
} else {
entity = optional.get();
}
if (!Objects.isNull(entity.getId())) {
//保存 Elasticsearch
AuditElasticSearchEntity audit = AuditElasticSearchEntity.builder().build();
audit.setRequestId(auditEvent.getRequestId());
audit.setSessionId(auditEvent.getSessionId());
audit.setId(entity.getId().toString());
audit.setEvent(event);
audit.setTargets(target);
audit.setGeoLocation(geoLocation);
audit.setUserAgent(userAgent);
audit.setActor(actor);
auditElasticSearchRepository.save(audit);
}
log.info("处理审计事件成功:[{}]", message.getMessageProperties().getDeliveryTag());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
return "处理审计事件成功";
} catch (Exception e) {
log.error("处理审计事件出现异常: MessageProperties: [{}], 审计内容:[{}]",
message.getMessageProperties(), JSONObject.toJSONString(auditEvent), e);
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
return "处理审计事件失败";
}
}
/**
* AuditRepository
*/
private final AuditRepository auditRepository;
/**
* AuditElasticSearchRepository
*/
private final AuditElasticSearchRepository auditElasticSearchRepository;
}

View File

@ -1,85 +0,0 @@
/*
* eiam-audit - Employee Identity and Access Management
* Copyright © 2022-Present Jinan Yuanchuang Network Technology Co., Ltd. (support@topiam.cn)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package cn.topiam.employee.audit.mq;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.RabbitConverterFuture;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson2.JSONObject;
import cn.topiam.employee.audit.event.AuditEvent;
import cn.topiam.employee.support.trace.TraceUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import jakarta.annotation.PostConstruct;
import static cn.topiam.employee.support.trace.TraceAspect.TRACE_ID;
/**
* MQ
*
* @author TopIAM
* Created by support@topiam.cn on 2023/5/30 23:12
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class AuditMessagePublisher {
private final AsyncRabbitTemplate asyncRabbitTemplate;
private final AmqpAdmin amqpAdmin;
public final static String AUDIT_TOPIC = "audit";
@PostConstruct
public void init() {
TopicExchange topicExchange = new TopicExchange(AUDIT_TOPIC);
Queue queue = new Queue(AUDIT_TOPIC, true);
amqpAdmin.declareExchange(topicExchange);
amqpAdmin.declareQueue(queue);
amqpAdmin.declareBinding(BindingBuilder.bind(queue).to(topicExchange).with(AUDIT_TOPIC));
}
/**
*
*
* @param data {@link String}
*/
public void sendAuditChangeMessage(AuditEvent data) {
String traceId = TraceUtils.get();
log.info("发送审计消息, 审计内容:[{}]", JSONObject.toJSONString(data));
RabbitConverterFuture<Object> future = asyncRabbitTemplate.convertSendAndReceive(
AUDIT_TOPIC, AUDIT_TOPIC,
MessageBuilder.withPayload(data).setHeader(TRACE_ID, traceId).build());
future.whenComplete((result, ex) -> {
if (ex == null) {
log.info("发送审计消息成功,处理结果为:[{}]", result);
} else {
log.error("发送审计消息异常,审计内容:[{}]", JSONObject.toJSONString(data), ex);
}
});
}
}

View File

@ -1,58 +0,0 @@
/*
* eiam-core - Employee Identity and Access Management
* Copyright © 2022-Present Jinan Yuanchuang Network Technology Co., Ltd. (support@topiam.cn)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package cn.topiam.employee.core.mq;
import java.io.IOException;
import java.util.Map;
import org.springframework.amqp.core.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import com.rabbitmq.client.Channel;
import cn.topiam.employee.support.trace.TraceUtils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import static cn.topiam.employee.support.trace.TraceAspect.TRACE_ID;
/**
*
*
* @author TopIAM
* Created by support@topiam.cn on 2023/5/30 23:12
*/
@Slf4j
@AllArgsConstructor
public abstract class AbstractMessageListener {
/**
*
*
* @param message {@link Message}
* @param channel {@link Channel}
* @param body {@link String}
* @param headers {@link Map}
*/
public void onMessage(Message message, Channel channel, @Payload String body,
@Headers Map<String, Object> headers) throws IOException {
// 设置TraceId
TraceUtils.put(String.valueOf(headers.get(TRACE_ID)));
}
}

View File

@ -1,97 +0,0 @@
/*
* eiam-core - Employee Identity and Access Management
* Copyright © 2022-Present Jinan Yuanchuang Network Technology Co., Ltd. (support@topiam.cn)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package cn.topiam.employee.core.mq;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.RabbitConverterFuture;
import org.springframework.messaging.support.MessageBuilder;
import cn.topiam.employee.support.constant.EiamConstants;
import cn.topiam.employee.support.trace.TraceUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import static cn.topiam.employee.support.trace.TraceAspect.TRACE_ID;
/**
*
*
* @author TopIAM
* Created by support@topiam.cn on 2023/6/15 23:12
*/
@Slf4j
@RequiredArgsConstructor
public abstract class AbstractMessagePublisher {
/**
*
*/
public final static String USER = "user";
/**
*
*/
public final static String USER_SAVE = USER + EiamConstants.POINT + "save";
/**
*
*/
public final static String USER_DELETE = USER + EiamConstants.POINT + "delete";
/**
* /
*/
public final static String NOTICE = "notice";
/**
*
*/
public final static String NOTICE_SMS = NOTICE + EiamConstants.POINT + "sms";
/**
*
*/
public final static String NOTICE_MAIL = NOTICE + EiamConstants.POINT + "mail";
/**
* AsyncRabbitTemplate
*/
private final AsyncRabbitTemplate asyncRabbitTemplate;
/**
* AmqpAdmin
*/
protected final AmqpAdmin amqpAdmin;
/**
* RocketMQ
*
* @param exchange {@link String}
* @param routingKey {@link String}
* @param message {@link String}
*/
public RabbitConverterFuture<Object> sendMessage(String exchange, String routingKey,
String message) {
// 获取traceId放入消息keys属性中
String traceId = TraceUtils.get();
RabbitConverterFuture<Object> future = asyncRabbitTemplate.convertSendAndReceive(exchange,
routingKey, MessageBuilder.withPayload(message).setHeader(TRACE_ID, traceId).build());
return future;
}
}