Refactor visit event

pull/146/head
johnniang 2019-04-24 23:31:04 +08:00
parent 93eb1250f7
commit 4695868fa7
7 changed files with 195 additions and 161 deletions

View File

@ -0,0 +1,34 @@
package run.halo.app.event.post;
import org.springframework.context.ApplicationEvent;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;
/**
* Visit event.
*
* @author johnniang
* @date 19-4-22
*/
public abstract class AbstractVisitEvent extends ApplicationEvent {
private final Integer id;
/**
* Create a new ApplicationEvent.
*
* @param source the object on which the event initially occurred (never {@code null})
* @param id id
*/
public AbstractVisitEvent(@NonNull Object source, @NonNull Integer id) {
super(source);
Assert.notNull(id, "Id must not be null");
this.id = id;
}
@NonNull
public Integer getId() {
return id;
}
}

View File

@ -1,6 +1,12 @@
package run.halo.app.event.post; package run.halo.app.event.post;
import cn.hutool.core.lang.Assert;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import run.halo.app.service.base.BasePostService;
import java.util.Map;
import java.util.concurrent.*;
/** /**
* Abstract visit event listener. * Abstract visit event listener.
@ -11,48 +17,102 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
public abstract class AbstractVisitEventListener { public abstract class AbstractVisitEventListener {
// private final Map<Integer, BlockingQueue<Integer>> postVisitQueueMap; private final Map<Integer, BlockingQueue<Integer>> postVisitQueueMap;
//
// private final Map<Integer, PostVisitEventListener.PostVisitTask> postVisitTaskMap; private final Map<Integer, PostVisitTask> postVisitTaskMap;
//
// protected final BasePostRepository basePostRepository; private final BasePostService basePostService;
//
// protected AbstractVisitEventListener(BasePostRepository basePostRepository) { private final ExecutorService executor;
// this.basePostRepository = basePostRepository;
// } protected AbstractVisitEventListener(BasePostService basePostService) {
// this.basePostService = basePostService;
//
// /** int initCapacity = 8;
// * Post visit task.
// */ long count = basePostService.count();
// private class PostVisitTask implements Runnable {
// if (count < initCapacity) {
// private final Integer postId; initCapacity = (int) count;
// }
// private PostVisitTask(Integer postId) {
// this.postId = postId; postVisitQueueMap = new ConcurrentHashMap<>(initCapacity << 1);
// } postVisitTaskMap = new ConcurrentHashMap<>(initCapacity << 1);
//
// @Override this.executor = Executors.newCachedThreadPool();
// public void run() { }
// while (!Thread.currentThread().isInterrupted()) {
// try { /**
// BlockingQueue<Integer> postVisitQueue = postVisitQueueMap.get(postId); * Handle visit event.
// Integer postId = postVisitQueue.take(); *
// * @param event visit event must not be null
// log.debug("Took a new visit for post id: [{}]", postId); * @throws InterruptedException
// */
// // Increase the visit protected void handleVisitEvent(@NonNull AbstractVisitEvent event) throws InterruptedException {
// postService.increaseVisit(postId); Assert.notNull(event, "Visit event must not be null");
//
// log.debug("Increased visits for post id: [{}]", postId); // Get post id
// } catch (InterruptedException e) { Integer id = event.getId();
// log.debug("Post visit task: " + Thread.currentThread().getName() + " was interrupted", e);
// // Ignore this exception log.debug("Received a visit event, post id: [{}]", id);
// }
// } // Get post visit queue
// BlockingQueue<Integer> postVisitQueue = postVisitQueueMap.computeIfAbsent(id, this::createEmptyQueue);
// log.debug("Thread: [{}] has been interrupted", Thread.currentThread().getName());
// } postVisitTaskMap.computeIfAbsent(id, this::createPostVisitTask);
// }
// Put a visit for the post
postVisitQueue.put(id);
}
private PostVisitTask createPostVisitTask(Integer postId) {
// Create new post visit task
PostVisitTask postVisitTask = new PostVisitTask(postId);
// Start a post visit task
executor.execute(postVisitTask);
log.debug("Created a new post visit task for post id: [{}]", postId);
return postVisitTask;
}
private BlockingQueue<Integer> createEmptyQueue(Integer postId) {
// Create a new queue
return new LinkedBlockingQueue<>();
}
/**
* Post visit task.
*/
private class PostVisitTask implements Runnable {
private final Integer postId;
private PostVisitTask(Integer postId) {
this.postId = postId;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
BlockingQueue<Integer> postVisitQueue = postVisitQueueMap.get(postId);
Integer postId = postVisitQueue.take();
log.debug("Took a new visit for post id: [{}]", postId);
// Increase the visit
basePostService.increaseVisit(postId);
log.debug("Increased visits for post id: [{}]", postId);
} catch (InterruptedException e) {
log.debug("Post visit task: " + Thread.currentThread().getName() + " was interrupted", e);
// Ignore this exception
}
}
log.debug("Thread: [{}] has been interrupted", Thread.currentThread().getName());
}
}
} }

View File

@ -1,34 +1,20 @@
package run.halo.app.event.post; package run.halo.app.event.post;
import org.springframework.context.ApplicationEvent;
import org.springframework.lang.NonNull;
import org.springframework.util.Assert;
/** /**
* Visit event. * Post visit event.
* *
* @author johnniang * @author johnniang
* @date 19-4-22 * @date 19-4-22
*/ */
public class PostVisitEvent extends ApplicationEvent { public class PostVisitEvent extends AbstractVisitEvent {
private final Integer postId;
/** /**
* Create a new ApplicationEvent. * Create a new ApplicationEvent.
* *
* @param source the object on which the event initially occurred (never {@code null}) * @param source the object on which the event initially occurred (never {@code null})
* @param postId post id * @param postId post id must not be null
*/ */
public PostVisitEvent(@NonNull Object source, @NonNull Integer postId) { public PostVisitEvent(Object source, Integer postId) {
super(source); super(source, postId);
Assert.notNull(postId, "Post id must not be null");
this.postId = postId;
}
@NonNull
public Integer getPostId() {
return postId;
} }
} }

View File

@ -6,10 +6,6 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import run.halo.app.service.PostService; import run.halo.app.service.PostService;
import javax.annotation.PreDestroy;
import java.util.Map;
import java.util.concurrent.*;
/** /**
* Visit event listener. * Visit event listener.
* *
@ -18,103 +14,15 @@ import java.util.concurrent.*;
*/ */
@Slf4j @Slf4j
@Component @Component
public class PostVisitEventListener { public class PostVisitEventListener extends AbstractVisitEventListener {
private final Map<Integer, BlockingQueue<Integer>> postVisitQueueMap;
private final Map<Integer, PostVisitTask> postVisitTaskMap;
private final PostService postService;
private final ExecutorService executor;
public PostVisitEventListener(PostService postService) { public PostVisitEventListener(PostService postService) {
this.postService = postService; super(postService);
int initCapacity = 8;
long count = postService.count();
if (count < initCapacity) {
initCapacity = (int) count;
}
postVisitQueueMap = new ConcurrentHashMap<>(initCapacity << 1);
postVisitTaskMap = new ConcurrentHashMap<>(initCapacity << 1);
this.executor = Executors.newCachedThreadPool();
} }
@Async @Async
@EventListener @EventListener
public void onApplicationEvent(PostVisitEvent event) throws InterruptedException { public void onPostVisitEvent(PostVisitEvent event) throws InterruptedException {
// Get post id handleVisitEvent(event);
Integer postId = event.getPostId();
log.debug("Received a visit event, post id: [{}]", postId);
// Get post visit queue
BlockingQueue<Integer> postVisitQueue = postVisitQueueMap.computeIfAbsent(postId, this::createEmptyQueue);
postVisitTaskMap.computeIfAbsent(postId, this::createPostVisitTask);
// Put a visit for the post
postVisitQueue.put(postId);
// TODO Attempt to manage the post visit tasks
}
private PostVisitTask createPostVisitTask(Integer postId) {
// Create new post visit task
PostVisitTask postVisitTask = new PostVisitTask(postId);
// Start a post visit task
executor.execute(postVisitTask);
log.debug("Created a new post visit task for post id: [{}]", postId);
return postVisitTask;
}
@PreDestroy
protected void preDestroy() {
executor.shutdownNow();
}
private BlockingQueue<Integer> createEmptyQueue(Integer postId) {
// Create a new queue
return new LinkedBlockingQueue<>();
}
/**
* Post visit task.
*/
private class PostVisitTask implements Runnable {
private final Integer postId;
private PostVisitTask(Integer postId) {
this.postId = postId;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
BlockingQueue<Integer> postVisitQueue = postVisitQueueMap.get(postId);
Integer postId = postVisitQueue.take();
log.debug("Took a new visit for post id: [{}]", postId);
// Increase the visit
postService.increaseVisit(postId);
log.debug("Increased visits for post id: [{}]", postId);
} catch (InterruptedException e) {
log.debug("Post visit task: " + Thread.currentThread().getName() + " was interrupted", e);
// Ignore this exception
}
}
log.debug("Thread: [{}] has been interrupted", Thread.currentThread().getName());
}
} }
} }

View File

@ -0,0 +1,20 @@
package run.halo.app.event.post;
/**
* Sheet visit event.
*
* @author johnniang
* @date 19-4-24
*/
public class SheetVisitEvent extends AbstractVisitEvent {
/**
* Create a new ApplicationEvent.
*
* @param source the object on which the event initially occurred (never {@code null})
* @param sheetId sheet id must not be null
*/
public SheetVisitEvent(Object source, Integer sheetId) {
super(source, sheetId);
}
}

View File

@ -0,0 +1,26 @@
package run.halo.app.event.post;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import run.halo.app.service.SheetService;
/**
* Sheet visit event listener.
*
* @author johnniang
* @date 19-4-24
*/
public class SheetVisitEventListener extends AbstractVisitEventListener {
protected SheetVisitEventListener(SheetService sheetService) {
super(sheetService);
}
@Async
@EventListener
public void onSheetVisitEvent(SheetVisitEvent event) throws InterruptedException {
handleVisitEvent(event);
}
}

View File

@ -5,6 +5,7 @@ import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import run.halo.app.event.post.SheetVisitEvent;
import run.halo.app.model.dto.post.SheetDetailDTO; import run.halo.app.model.dto.post.SheetDetailDTO;
import run.halo.app.model.dto.post.SheetListDTO; import run.halo.app.model.dto.post.SheetListDTO;
import run.halo.app.model.entity.Sheet; import run.halo.app.model.entity.Sheet;
@ -72,8 +73,7 @@ public class SheetServiceImpl extends BasePostServiceImpl<Sheet> implements Shee
if (PostStatus.PUBLISHED.equals(status)) { if (PostStatus.PUBLISHED.equals(status)) {
// Log it // Log it
// TODO Fatal bug here eventPublisher.publishEvent(new SheetVisitEvent(this, sheet.getId()));
// eventPublisher.publishEvent(new PostVisitEvent(this, sheet.getId()));
} }
return sheet; return sheet;